diff --git a/Cargo.lock b/Cargo.lock index 16b20f7ab..33aef7bd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1216,6 +1216,7 @@ dependencies = [ "floresta-common", "floresta-compact-filters", "floresta-mempool", + "hintsfile", "metrics", "rand 0.9.4", "rustls", @@ -1505,6 +1506,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3011d1213f159867b13cfd6ac92d2cd5f1345762c63be3554e84092d85a50bbd" +[[package]] +name = "hintsfile" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "554a58f64ca02ec89bb21ae10f675092647b94c7f8c846efef11c28633b5092b" + [[package]] name = "http" version = "1.4.0" @@ -2623,8 +2630,7 @@ dependencies = [ [[package]] name = "rustreexo" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8353cd48bea30340eced2a11770e47bb6b83f0e7e679742301f3332e6ec1f6ab" +source = "git+https://github.com/Davidson-Souza/rustreexo.git?branch=swift-sync#d86c31981f42344d739000eb1c2a3f6d09c9b30d" dependencies = [ "bitcoin-io 0.3.0", "bitcoin_hashes 0.20.0", diff --git a/Cargo.toml b/Cargo.toml index c7f3cbbb2..96e794e5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,3 +92,6 @@ opt-level = 3 # maximum optimization level strip = true # strip symbol tables and metadata lto = true # enable link-time optimizations codegen-units = 1 # compile a single codegen unit + +[patch.crates-io] +rustreexo = { git = "https://github.com/Davidson-Souza/rustreexo.git", branch = "swift-sync" } diff --git a/bin/florestad/src/main.rs b/bin/florestad/src/main.rs index 66280b3c4..32d2b8300 100644 --- a/bin/florestad/src/main.rs +++ b/bin/florestad/src/main.rs @@ -122,7 +122,6 @@ fn main() { let _rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .worker_threads(4) - .max_blocking_threads(2) .thread_keep_alive(Duration::from_secs(60)) .thread_name("florestad") .build() diff --git a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs index b50838180..687431a3a 100644 --- a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs +++ b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs @@ -1256,23 +1256,29 @@ impl UpdatableChainstate for ChainState Result { - let mut curr_header = self.get_disk_block_header(&assumed_hash)?; + let assumed_header = self.get_disk_block_header(&assumed_hash)?; - while let Ok(header) = self.get_disk_block_header(&curr_header.block_hash()) { + let mut header = assumed_header; + let mut hash = assumed_hash; + loop { if self.is_genesis(&header) { break; } let height = header.try_height()?; - self.update_header(&DiskBlockHeader::FullyValid(*header, height))?; - curr_header = self.get_ancestor(&header)?; - } + self.update_header_and_index( + &DiskBlockHeader::FullyValid(*header, height), + hash, + height, + )?; - self.update_view(curr_header.try_height()?, &curr_header, acc.clone())?; + // Move to the previous block + header = self.get_ancestor(&header)?; + hash = header.block_hash(); + } - let mut guard = write_lock!(self); - guard.best_block.validation_index = assumed_hash; - guard.acc = acc; + // Update the tip and accumulator data with our assumed tip + self.update_view(assumed_header.try_height()?, &assumed_header, acc)?; Ok(true) } diff --git a/crates/floresta-chain/src/pruned_utreexo/consensus.rs b/crates/floresta-chain/src/pruned_utreexo/consensus.rs index ac25939fc..e206c23a6 100644 --- a/crates/floresta-chain/src/pruned_utreexo/consensus.rs +++ b/crates/floresta-chain/src/pruned_utreexo/consensus.rs @@ -569,10 +569,25 @@ impl Consensus { height: u32, unspent_indexes: HashSet, salt: &SipHashKeys, - ) -> Result<(SwiftSyncAgg, Amount), BlockchainError> { + ) -> Result<(SwiftSyncAgg, Amount, Vec), BlockchainError> { let txids = self.check_block(block, height)?; - Consensus::verify_block_transactions_swiftsync(height, block, txids, unspent_indexes, salt) + let utreexo_adds = udata::proof_util::get_block_adds_with_hints( + block, + &txids, + height, + block.block_hash(), + &unspent_indexes, + ); + let (agg, amount) = Consensus::verify_block_transactions_swiftsync( + height, + block, + txids, + unspent_indexes, + salt, + )?; + + Ok((agg, amount, utreexo_adds)) } /// Returns the TxOut being spent by the given input. @@ -729,7 +744,7 @@ impl Consensus { let adds = udata::proof_util::get_block_adds(block, height, block_hash); // Update the accumulator - let acc = acc.modify(&adds, &del_hashes, &proof)?.0; + let acc = acc.modify(&adds, &del_hashes, &proof)?; Ok(acc) } @@ -1566,7 +1581,7 @@ mod tests { // We add the only TxOut in this block to the aggregator (spent later). 9 => { let unspent_indexes = HashSet::new(); - let (agg_blk_9, amount) = consensus + let (agg_blk_9, amount, _) = consensus .process_block_swiftsync(block, 9, unspent_indexes, &salt) .unwrap(); @@ -1580,7 +1595,7 @@ mod tests { // This block spends the TxOut that was added to the aggregator in block 9. 170 => { let unspent_indexes = HashSet::from_iter(vec![0, 1, 2]); - let (agg_blk_170, amount) = consensus + let (agg_blk_170, amount, _) = consensus .process_block_swiftsync(block, 170, unspent_indexes, &salt) .unwrap(); @@ -1593,7 +1608,7 @@ mod tests { } i => { let unspent_indexes = default_unspent_idx.clone(); - let (agg_i, amount) = consensus + let (agg_i, amount, _) = consensus .process_block_swiftsync(block, i as u32, unspent_indexes, &salt) .unwrap(); @@ -1626,10 +1641,10 @@ mod tests { let block_9 = &mainnet_blocks[9]; let block_170 = &mainnet_blocks[170]; - let (agg_9, _) = consensus + let (agg_9, _, _) = consensus .process_block_swiftsync(block_9, 9, HashSet::new(), &salt) .unwrap(); - let (agg_170, _) = consensus + let (agg_170, _, _) = consensus .process_block_swiftsync(block_170, 170, HashSet::from_iter(vec![0, 1, 2]), &salt) .unwrap(); diff --git a/crates/floresta-chain/src/pruned_utreexo/udata.rs b/crates/floresta-chain/src/pruned_utreexo/udata.rs index 616f51adb..081474c08 100644 --- a/crates/floresta-chain/src/pruned_utreexo/udata.rs +++ b/crates/floresta-chain/src/pruned_utreexo/udata.rs @@ -212,6 +212,7 @@ pub mod proof_util { use crate::BlockchainError; use crate::CompactLeafData; use crate::ScriptPubKeyKind; + use crate::extensions::Bip30UnspendableExt; use crate::prelude::*; use crate::pruned_utreexo::consensus::Consensus; use crate::pruned_utreexo::consensus::UTREEXO_TAG_V1; @@ -348,6 +349,14 @@ pub mod proof_util { sha256::Hash::from_byte_array(leaf_hash.into()) } + fn get_input_prevouts(txdata: &[Transaction]) -> HashSet { + txdata + .iter() + .flat_map(|tx| tx.input.iter()) + .map(|i| i.previous_output) + .collect() + } + /// From a block, gets the roots that will be included on the acc, certifying /// that any utxo will not be spent in the same block. pub fn get_block_adds( @@ -357,12 +366,7 @@ pub mod proof_util { ) -> Vec { // Get inputs from the block, we'll need this HashSet to check if an output is spent // in the same block. If it is, we don't need to add it to the accumulator. - let mut spent = HashSet::new(); - for tx in &block.txdata { - for input in &tx.input { - spent.insert((input.previous_output.txid, input.previous_output.vout)); - } - } + let spent = get_input_prevouts(&block.txdata); // Get all leaf hashes that will be added to the accumulator let mut adds = Vec::new(); @@ -371,7 +375,7 @@ pub mod proof_util { let is_cb = tx.is_coinbase(); for (vout, output) in tx.output.iter().enumerate() { - let utxo_id = (txid, vout as u32); + let utxo_id = OutPoint::new(txid, vout as u32); if Consensus::is_unspendable(&output.script_pubkey) || spent.contains(&utxo_id) { // Do not add unspendable nor already spent utxos @@ -387,6 +391,64 @@ pub mod proof_util { adds } + pub fn get_block_adds_with_hints( + block: &Block, + txids: &[Txid], + height: u32, + block_hash: BlockHash, + unspent_indexes: &HashSet, + ) -> Vec { + let transactions = &block.txdata; + assert_eq!(transactions.len(), txids.len()); + + let mut output_index = 0; + + // Get inputs from the block, we'll need this HashSet to check if an output is spent + // in the same block. If it is, we don't need to add it to the accumulator. + let spent = get_input_prevouts(transactions); + + // Get all leaf hashes that will be added to the accumulator + let mut adds = Vec::new(); + for (tx, txid) in transactions.iter().zip(txids) { + let is_cb = tx.is_coinbase(); + + for (vout, output) in tx.output.iter().enumerate() { + // Special case: unspendable outputs do not count for the block `output_index` + if Consensus::is_unspendable(&output.script_pubkey) { + continue; + } + + let utxo_id = OutPoint::new(*txid, vout as u32); + if spent.contains(&utxo_id) { + output_index += 1; + // Do not add UTXOs spent in the same block + continue; + } + + let is_bip30_unspendable = is_cb && block.is_bip30_unspendable(height); + + if is_bip30_unspendable || unspent_indexes.contains(&output_index) { + // Add unspent outputs to the accumulator, according to the hints + let leaf_hash = + get_leaf_hashes(*txid, is_cb, vout as u32, output, height, block_hash); + adds.push(BitcoinNodeHash::Some(leaf_hash.to_byte_array())); + } else { + // Hinted as spent: add empty leaf hash + adds.push(BitcoinNodeHash::Empty); + } + + // BIP-30 unspendable outputs do not count for the block `output_index` + if is_bip30_unspendable { + continue; + } + + output_index += 1; + } + } + + adds + } + /// A hash map that provides the UTXO data given the outpoint. We will get this data /// from either our own cache or the Utreexo proofs, and use it to validate blocks /// and transactions. diff --git a/crates/floresta-wire/Cargo.toml b/crates/floresta-wire/Cargo.toml index 9c0b34d64..583a17797 100644 --- a/crates/floresta-wire/Cargo.toml +++ b/crates/floresta-wire/Cargo.toml @@ -20,6 +20,7 @@ categories = ["cryptography::cryptocurrencies", "network-programming"] bip324 = { version = "=0.10.0", features = [ "tokio" ] } bitcoin = { workspace = true } dns-lookup = { workspace = true } +hintsfile = "0.1.1" rand = { workspace = true } rustls = { version = "=0.23.40", default-features = false, features = ["ring", "std", "tls12"] } rustreexo = { workspace = true } diff --git a/crates/floresta-wire/src/p2p_wire/mod.rs b/crates/floresta-wire/src/p2p_wire/mod.rs index 747ccf5a7..e7a96df51 100644 --- a/crates/floresta-wire/src/p2p_wire/mod.rs +++ b/crates/floresta-wire/src/p2p_wire/mod.rs @@ -96,6 +96,7 @@ pub mod node_context; pub mod node_interface; pub mod peer; pub mod socks; +mod stump_updater; #[cfg(test)] #[doc(hidden)] pub mod tests; diff --git a/crates/floresta-wire/src/p2p_wire/node/blocks.rs b/crates/floresta-wire/src/p2p_wire/node/blocks.rs index a6fab9285..f2e4d0d66 100644 --- a/crates/floresta-wire/src/p2p_wire/node/blocks.rs +++ b/crates/floresta-wire/src/p2p_wire/node/blocks.rs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 +use std::sync::Arc; use std::time::Instant; use bitcoin::Block; @@ -43,12 +44,15 @@ pub(crate) struct InflightBlock { pub peer: PeerId, /// The block itself. - pub block: Block, + pub block: Arc, /// Auxiliary data needed for validating this block. Currently, it includes utreexo /// leaf data (previous UTXOs spent in the block), the corresponding accumulator /// inclusion proof, and the peer id that provided them. pub aux_data: Option, + + /// If this block is currently being processed in a worker, this is the start time. + pub processing_since: Option, } impl InflightBlock { @@ -57,7 +61,7 @@ impl InflightBlock { /// If the block doesn't spend any output (i.e., coinbase transaction only) this method adds /// empty auxiliary data, which marks this inflight block as ready to process. Blocks with /// transactions require [`UtreexoData`] (see [`InflightBlock::add_utreexo_data`]). - fn new(block: Block, peer: PeerId) -> Self { + fn new(block: Arc, peer: PeerId) -> Self { let aux_data = match block.txdata.len() { 1 => Some((Vec::new(), Proof::default(), peer)), _ => None, // we need auxiliary data for the txs @@ -67,6 +71,7 @@ impl InflightBlock { peer, block, aux_data, + processing_since: None, } } @@ -82,6 +87,29 @@ where Chain: ChainBackend + 'static, WireError: From, { + /// Returns `true` only if we can request `BLOCKS_PER_GETDATA` without exceeding the maximum + /// unprocessed blocks allowed. + pub(crate) fn can_request_more_blocks(&self) -> bool { + let max_inflight_blocks = T::BLOCKS_PER_GETDATA * T::MAX_CONCURRENT_GETDATA; + + // If we do a GETDATA request, this will be the new unprocessed count + let next_unprocessed = self.unprocessed_blocks() + T::BLOCKS_PER_GETDATA; + + next_unprocessed <= max_inflight_blocks + } + + /// Returns the number of blocks awaiting processing (in memory or requested). + pub(crate) fn unprocessed_blocks(&self) -> usize { + let blocks_in_mem = self.blocks.len(); + let requested_blocks = self + .inflight + .keys() + .filter(|inflight| matches!(inflight, InflightRequests::Blocks(_))) + .count(); + + blocks_in_mem + requested_blocks + } + pub(crate) fn request_blocks(&mut self, blocks: Vec) -> Result<(), WireError> { let should_request = |block: &BlockHash| { let is_inflight = self @@ -98,8 +126,8 @@ where return Ok(()); } - let peer = - self.send_to_fast_peer(NodeRequest::GetBlock(blocks.clone()), ServiceFlags::NETWORK)?; + let block_req = NodeRequest::GetBlock(blocks.clone(), self.witnessless); + let peer = self.send_to_fast_peer(block_req, ServiceFlags::NETWORK)?; for block in blocks.iter() { self.inflight @@ -126,7 +154,7 @@ where debug!("Received block {block_hash} from peer {peer}, with {txdata_len} txs"); self.blocks - .insert(block_hash, InflightBlock::new(block, peer)); + .insert(block_hash, InflightBlock::new(Arc::new(block), peer)); // We only need auxiliary utreexo data if there are non-coinbase transactions if txdata_len != 1 { @@ -311,7 +339,7 @@ where } /// Returns the inner [`BlockValidationErrors`] of this chain error, if any. - fn block_validation_err(e: BlockchainError) -> Option { + pub(crate) fn block_validation_err(e: BlockchainError) -> Option { match e { BlockchainError::TransactionError(tx_err) => Some(tx_err.error), BlockchainError::BlockValidation(block_err) => Some(block_err), @@ -329,7 +357,7 @@ where fn handle_validation_errors( &mut self, e: BlockValidationErrors, - block: Block, + block: Arc, block_peer: PeerId, utreexo_peer: PeerId, ) -> Option { diff --git a/crates/floresta-wire/src/p2p_wire/node/chain_selector_ctx.rs b/crates/floresta-wire/src/p2p_wire/node/chain_selector_ctx.rs index b5d4bd86c..6666d2aeb 100644 --- a/crates/floresta-wire/src/p2p_wire/node/chain_selector_ctx.rs +++ b/crates/floresta-wire/src/p2p_wire/node/chain_selector_ctx.rs @@ -46,6 +46,7 @@ use std::collections::HashMap; use std::collections::HashSet; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -438,7 +439,7 @@ where peer: PeerId, block_hash: BlockHash, ) -> Result { - self.send_to_peer(peer, NodeRequest::GetBlock(vec![block_hash]))?; + self.send_to_peer(peer, NodeRequest::GetBlock(vec![block_hash], false))?; let timeout = Instant::now() + Duration::from_secs(60); let mut block = None; @@ -502,8 +503,9 @@ where return Ok(InflightBlock { peer, - block, + block: Arc::new(block), aux_data: Some((uproof.leaf_data, proof, peer)), + processing_since: None, }); } _ => {} @@ -935,6 +937,10 @@ where NodeNotification::FromUser(request, responder) => { self.perform_user_request(request, responder).await; } + + NodeNotification::FromWorker(msg) => { + error!("Received a notification from the worker thread {msg:?}"); + } } } @@ -982,6 +988,10 @@ where NodeNotification::DnsSeedAddresses(addresses) => { self.address_man.push_addresses(&addresses); } + + NodeNotification::FromWorker(msg) => { + error!("Received a notification from the worker thread {msg:?}"); + } } Ok(()) } diff --git a/crates/floresta-wire/src/p2p_wire/node/mod.rs b/crates/floresta-wire/src/p2p_wire/node/mod.rs index 0933d3097..4000617d4 100644 --- a/crates/floresta-wire/src/p2p_wire/node/mod.rs +++ b/crates/floresta-wire/src/p2p_wire/node/mod.rs @@ -9,6 +9,7 @@ pub mod chain_selector_ctx; mod conn; mod peer_man; pub mod running_ctx; +pub mod swift_sync_ctx; pub mod sync_ctx; mod user_req; @@ -22,13 +23,16 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; +use bitcoin::Amount; use bitcoin::BlockHash; use bitcoin::Network; use bitcoin::Txid; use bitcoin::p2p::ServiceFlags; use bitcoin::p2p::address::AddrV2Message; pub(crate) use blocks::InflightBlock; +use floresta_chain::BlockchainError; use floresta_chain::ChainBackend; +use floresta_chain::swift_sync_agg::SwiftSyncAgg; use floresta_common::Ema; use floresta_common::try_and_log; use floresta_common::try_and_warn; @@ -37,6 +41,7 @@ use floresta_compact_filters::network_filters::NetworkFilters; use floresta_mempool::Mempool; pub use peer_man::AddedPeerInfo; use running_ctx::RunningNode; +use rustreexo::node_hash::BitcoinNodeHash; use serde::Deserialize; use serde::Serialize; use tokio::sync::Mutex; @@ -62,18 +67,23 @@ use crate::node_context::PeerId; /// As per BIP 155, limit the number of addresses to 1,000 pub const MAX_ADDRV2_ADDRESSES: usize = 1_000; +type WorkerResult = Result<(SwiftSyncAgg, Amount, Vec), BlockchainError>; + #[derive(Debug)] pub enum NodeNotification { DnsSeedAddresses(Vec), FromPeer(u32, PeerMessages, Instant), FromUser(UserRequest, oneshot::Sender), + /// Returns the validation result with the delta SwiftSync aggregator and the total unspent + /// amount sum, together with the block hash and height. + FromWorker((WorkerResult, BlockHash, u32)), } #[derive(Debug, Clone, PartialEq, Hash)] /// Sent from node to peers, usually to request something pub enum NodeRequest { /// Request the full block data for one or more blocks - GetBlock(Vec), + GetBlock(Vec, bool), /// Asks peer for headers GetHeaders(Vec), @@ -294,6 +304,7 @@ pub struct NodeCommon { pub(crate) datadir: PathBuf, pub(crate) network: Network, pub(crate) kill_signal: Arc>, + pub(crate) witnessless: bool, } /// The main node that operates while florestad is up. @@ -395,6 +406,7 @@ where config, kill_signal, added_peers: Vec::new(), + witnessless: false, }, context: T::default(), }) diff --git a/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs b/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs index fc0352ac9..046a95fa4 100644 --- a/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs +++ b/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs @@ -39,6 +39,7 @@ use crate::node::NodeRequest; use crate::node::UtreexoNode; use crate::node::chain_selector_ctx::ChainSelector; use crate::node::periodic_job; +use crate::node::swift_sync_ctx::SwiftSync; use crate::node::sync_ctx::SyncNode; use crate::node_context::LoopControl; use crate::node_context::NodeContext; @@ -122,11 +123,28 @@ where /// proofs, this means the last 100 blocks, and for assumeutreexo, this means however many /// blocks from the hard-coded value in the config file. pub async fn catch_up(self) -> Result { - let sync = UtreexoNode { + let swift_sync = UtreexoNode { common: self.common, + context: SwiftSync::default(), + }; + + let swift_sync = swift_sync.run(|_| {}).await; + let swift_sync_failed = swift_sync.was_aborted(); + + // Finish IBD with regular utreexo sync + let mut sync = UtreexoNode { + common: swift_sync.common, context: SyncNode::default(), }; + // If SwiftSync couldn't complete, we need to validate all blocks from scratch + if swift_sync_failed { + // Clear the inflight requests and in-memory blocks to start from genesis + sync.inflight.clear(); + sync.blocks.clear(); + assert_eq!(sync.unprocessed_blocks(), 0); + } + let sync = sync.run(|_| {}).await; Ok(UtreexoNode { @@ -769,7 +787,7 @@ where self.send_to_peer( peer, - NodeRequest::GetBlock(vec![header.block_hash()]), + NodeRequest::GetBlock(vec![header.block_hash()], false), )?; self.inflight.insert( @@ -832,6 +850,10 @@ where ), } } + + NodeNotification::FromWorker(msg) => { + error!("Received a notification from the worker thread {msg:?}"); + } } Ok(()) } diff --git a/crates/floresta-wire/src/p2p_wire/node/swift_sync_ctx.rs b/crates/floresta-wire/src/p2p_wire/node/swift_sync_ctx.rs new file mode 100644 index 000000000..505569beb --- /dev/null +++ b/crates/floresta-wire/src/p2p_wire/node/swift_sync_ctx.rs @@ -0,0 +1,688 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! A node that downloads and validates the blockchain, but skips utreexo proofs as they aren't +//! needed to validate the UTXO set with the SwiftSync method. + +use std::collections::HashSet; +use std::fs::File; +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +use bitcoin::Amount; +use bitcoin::BlockHash; +use bitcoin::Network; +use bitcoin::block::Header as BlockHeader; +use bitcoin::p2p::ServiceFlags; +use floresta_chain::BlockValidationErrors; +use floresta_chain::BlockchainError; +use floresta_chain::ThreadSafeChain; +use floresta_chain::pruned_utreexo::consensus::Consensus; +use floresta_chain::swift_sync_agg::SipHashKeys; +use floresta_chain::swift_sync_agg::SwiftSyncAgg; +use floresta_common::service_flags; +use hintsfile::Hintsfile; +use rand::RngCore; +use rustreexo::node_hash::BitcoinNodeHash; +use rustreexo::proof::Proof; +use rustreexo::stump::Stump; +use tokio::time; +use tokio::time::MissedTickBehavior; +use tracing::debug; +use tracing::error; +use tracing::info; +use tracing::warn; + +use crate::node::ConnectionKind; +use crate::node::InflightBlock; +use crate::node::InflightRequests; +use crate::node::NodeNotification; +use crate::node::UtreexoNode; +use crate::node::WorkerResult; +use crate::node::oneshot::error::TryRecvError; +use crate::node::periodic_job; +use crate::node::try_and_log; +use crate::node_context::LoopControl; +use crate::node_context::NodeContext; +use crate::node_context::PeerId; +use crate::p2p_wire::error::WireError; +use crate::p2p_wire::peer::PeerMessages; +use crate::p2p_wire::stump_updater::StumpUpdate; +use crate::p2p_wire::stump_updater::StumpUpdater; +use crate::p2p_wire::stump_updater::StumpUpdaterHandle; + +/// [`SwiftSync`] is a node that downloads and validates the blockchain but skips utreexo +/// proofs by using SwiftSync. +/// +/// This node implements: +/// - `NodeContext` +/// - `UtreexoNode` +#[derive(Default)] +pub struct SwiftSync { + stump_updater: Option, + + /// The `TxOut` aggregator. + agg: SwiftSyncAgg, + + /// The secret salt used to compute the aggregator element hashes. + salt: Arc, + + /// The total unspent amount. Once we reach the SwiftSync stop height, this must be less or + /// equal than the theoretical supply limit at that height. + supply: Amount, + + /// The target height for the currently used SwiftSync hints. + stop_height: u32, + + /// Height at which SwiftSync was aborted, if any. + /// + /// We abort when either the hints are found to be invalid or the current chain is invalid (we + /// may find an invalid block or, at the end, a violation of the maximum supply limit). + abort_height: Option, +} + +impl NodeContext for SwiftSync { + fn get_required_services(&self) -> bitcoin::p2p::ServiceFlags { + ServiceFlags::WITNESS | service_flags::UTREEXO.into() | ServiceFlags::NETWORK + } + + const TRY_NEW_CONNECTION: u64 = 15; // We want to be well-connected early on + const REQUEST_TIMEOUT: u64 = 2 * 60; // 2 minutes (5 blocks should reach us much faster) + const MAX_INFLIGHT_REQUESTS: usize = 100; // double the default + const MAX_OUTGOING_PEERS: usize = 30; + const MAX_CONCURRENT_GETDATA: usize = 40; // 40 * 5 = 200 blocks in parallel + const ASSUME_STALE: u64 = 2 * 60; // Two minutes without blocks while in IBD is very suspicious + + // A more conservative value than the default of 1 second, since we'll have many peer messages + const MAINTENANCE_TICK: Duration = Duration::from_secs(5); +} + +// This is more than enough to avoid CPU from ever becoming a bottleneck +const MAX_PARALLEL_WORKERS: usize = 6; + +/// Node methods for a [`UtreexoNode`] where its Context is [`SwiftSync`]. +/// See [node](crates/floresta-wire/src/p2p_wire/node.rs) for more information. +impl UtreexoNode +where + Chain: ThreadSafeChain, + WireError: From, +{ + /// Parses the SwiftSync hints file and returns an in-memory [`Hintsfile`] representation. + fn parse_hints_file(datadir: impl AsRef, network: Network) -> Option { + let path = datadir.as_ref().join(format!("{network}.hints")); + + let mut file = File::open(path).ok()?; + Some(Hintsfile::from_reader(&mut file).expect("couldn't read hints file")) + } + + /// Generates a random salt for this SwiftSync session. + fn generate_salt() -> Arc { + let mut rng = rand::rng(); + + Arc::new(SipHashKeys::new( + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + )) + } + + /// Returns `true` if SwiftSync failed, due to the hints being invalid or the current chain + /// being invalid (below the SwiftSync stop height). + pub(crate) fn was_aborted(&self) -> bool { + self.context.abort_height.is_some() + } + + /// Computes the next blocks to request, and sends a GETDATA request, advancing + /// `last_block_request` up to the SwiftSync hints `stop_height`. + fn get_blocks_to_download(&mut self) { + // If this request would make our inflight queue too long, postpone it + if !self.can_request_more_blocks() || self.was_aborted() { + return; + } + + let prev_last_request = self.last_block_request; + let mut blocks = Vec::with_capacity(SwiftSync::BLOCKS_PER_GETDATA); + + for _ in 0..SwiftSync::BLOCKS_PER_GETDATA { + let next_height = self.last_block_request + 1; + + if next_height > self.context.stop_height { + // We need to reach it but not exceed it + break; + } + + let Ok(next_block) = self.chain.get_block_hash(next_height) else { + // Likely end of chain (e.g., `BlockNotPresent`) + break; + }; + + blocks.push(next_block); + self.last_block_request += 1; + } + + if let Err(e) = self.request_blocks(blocks) { + // Rollback so we can retry the same heights next time. + error!("Failed to request blocks: {e:?}"); + self.last_block_request = prev_last_request; + } + // If `request_blocks` succeeds, we will keep track of the requests in `self.inflight`, + // so even if the remote peer disconnects, we can still re-request them. + } + + /// Starts SwiftSync processing for up to `MAX_PARALLEL_WORKERS` pending blocks. + fn pump_swiftsync(&mut self, hints: &mut Hintsfile) -> Result<(), WireError> { + let processing = self + .blocks + .values() + .filter(|b| b.processing_since.is_some()) + .count(); + + let free = MAX_PARALLEL_WORKERS.saturating_sub(processing); + if free == 0 { + return Ok(()); + } + + // Collect hashes first (can't mutate the map while iterating it) + let to_process: Vec = self + .blocks + .iter() + .filter(|(_, b)| b.processing_since.is_none()) + .take(free) // We don't exceed MAX_PARALLEL_WORKERS + .map(|(h, _)| *h) + .collect(); + + for hash in to_process { + // Prefer storing height in the entry to avoid repeated chain lookups + let height = self + .chain + .get_block_height(&hash)? + // NOTE: if a previous block was invalid, we will get this error + .ok_or(BlockchainError::OrphanOrInvalidBlock)?; + + self.start_processing_swiftsync(hash, height, hints)?; + } + + Ok(()) + } + + /// Spawns a blocking task to process a block with the provided SwiftSync hints. + fn start_processing_swiftsync( + &mut self, + block_hash: BlockHash, + block_height: u32, + hints: &mut Hintsfile, + ) -> Result<(), WireError> { + debug!("processing block {block_hash}"); + let entry = self + .blocks + .get_mut(&block_hash) + .ok_or(WireError::BlockNotFound)?; + + if entry.processing_since.is_some() { + return Ok(()); // already being processed + } + + let Some(block_hints) = hints.take_indices(block_height) else { + error!("We tried processing block {block_height} but its hints are missing"); + return Ok(()); + }; + let unspent_indexes: HashSet = block_hints.into_iter().collect(); + + // Start the processing timer + entry.processing_since = Some(Instant::now()); + + let block = Arc::clone(&entry.block); + let consensus = Consensus::from(self.network); + let salt = Arc::clone(&self.context.salt); + + // If we find a very cheap block (e.g., ~10μs), it's faster to process it directly + if block.txdata.len() == 1 { + let result = + consensus.process_block_swiftsync(&block, block_height, unspent_indexes, &salt); + + self.handle_worker_notification(result, block_hash, block_height, hints)?; + return Ok(()); + } + + let node_sender = self.node_tx.clone(); + tokio::task::spawn_blocking(move || { + let result = + consensus.process_block_swiftsync(&block, block_height, unspent_indexes, &salt); + + let notification = NodeNotification::FromWorker((result, block_hash, block_height)); + let _ = node_sender.send(notification); + }); + + Ok(()) + } + + /// Starts the SwiftSync node by updating the last block requested and starting the main loop. + /// This loop to the following tasks, in order: + /// - Receives messages from our peers through the node_tx channel, and handles them. + /// - Checks if the kill signal is set, and if so breaks the loop. + /// - Checks if we have downloaded and processed all blocks, and verifies that the aggregator + /// is zero. If so, we are done. + /// - Checks if our last validation update was long ago and creates an extra connection. + /// - Handles timeouts for inflight requests. + /// - If we are low on inflights, requests new blocks to validate. + pub async fn run(mut self, done_cb: impl FnOnce(&Chain)) -> Self { + let Some(mut hints) = Self::parse_hints_file(&self.datadir, self.network) else { + return self; + }; + + let validation_idx = self.chain.get_validation_index().unwrap(); + if validation_idx >= hints.stop_height() { + return self; + } + + self.witnessless = true; // enable witnessless sync + self.context.stop_height = hints.stop_height(); + + assert_eq!( + validation_idx, 0, + "Validation index should be 0 at the start of SwiftSync" + ); + self.last_block_request = 0; + + // Initialize the accumulator updater task that will work in parallel to block validation + self.context.stump_updater = + Some(StumpUpdater::spawn(Stump::new(), 0, hints.stop_height())); + + // Generate the random salt and kick off SwiftSync! + self.context.salt = Self::generate_salt(); + + info!("Performing SwiftSync up to height {}", hints.stop_height()); + + let mut ticker = time::interval(SwiftSync::MAINTENANCE_TICK); + // If we fall behind, don't "catch up" by running maintenance repeatedly + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + tokio::select! { + biased; + + // Maintenance runs only on tick but has priority + _ = ticker.tick() => match self.maintenance_tick(&mut hints).await { + LoopControl::Continue => {}, + LoopControl::Break => break, + }, + + // Handle messages as soon as we find any, otherwise sleep until maintenance + msg = self.node_rx.recv() => { + let Some(msg) = msg else { + break; + }; + // We only update the aggregator when reading responses from the workers + try_and_log!(self.handle_message(msg, &mut hints).await); + + // Drain all queued messages + while let Ok(msg) = self.node_rx.try_recv() { + try_and_log!(self.handle_message(msg, &mut hints).await); + } + if *self.kill_signal.read().await { + break; + } + } + } + } + + done_cb(&self.chain); + self + } + + /// Performs the periodic maintenance tasks, including checking for the cancel signal, peer + /// connections, and inflight request timeouts. + /// + /// Returns `LoopControl::Break` if we need to break the main `SwiftSync` loop, which may + /// happen if the kill signal was set, we successfully finished SwiftSync, or we need to abort + /// operation due to a validation error. + async fn maintenance_tick(&mut self, hints: &mut Hintsfile) -> LoopControl { + if *self.kill_signal.read().await { + return LoopControl::Break; + } + + if let Some(invalid_h) = self.context.abort_height { + // All our progress is lost since the hints refer to an invalid chain, and we don't + // know if the current UTXO set is correct. We need to start from genesis. + error!("Aborting SwiftSync: the most PoW chain is invalid at height {invalid_h}"); + return LoopControl::Break; + } + + // If we have reached the SwiftSync stop height, and we have added all the utreexo leaves + // to the accumulator, we have finished. + if let Some(final_acc) = self.swift_sync_finished() { + self.handle_stop_height_reached(final_acc); + return LoopControl::Break; + } + + // Checks if we need to open a new connection + periodic_job!( + self.last_connection => self.maybe_open_connection(ServiceFlags::NETWORK), + SwiftSync::TRY_NEW_CONNECTION, + ); + + // Open new feeler connection periodically + periodic_job!( + self.last_feeler => self.open_feeler_connection(), + SwiftSync::FEELER_INTERVAL, + ); + + // Re-request blocks that haven't arrived in `SwiftSync::REQUEST_TIMEOUT` seconds + try_and_log!(self.check_for_timeout()); + + let assume_stale = Instant::now() + .duration_since(self.common.last_tip_update) + .as_secs() + > SwiftSync::ASSUME_STALE; + + if assume_stale { + try_and_log!(self.create_connection(ConnectionKind::Extra)); + self.last_tip_update = Instant::now(); + return LoopControl::Continue; + } + + try_and_log!(self.pump_swiftsync(hints)); + + self.get_blocks_to_download(); + LoopControl::Continue + } + + /// Returns true if we have requested all blocks up to the stop height, we have received and + /// processed all of them, and we have added all utreexo leaves to the accumulator. + fn swift_sync_finished(&mut self) -> Option { + let requesting_blocks = self.last_block_request != self.context.stop_height; + + // We are still requesting or processing blocks + if requesting_blocks || self.unprocessed_blocks() != 0 { + return None; + } + + // Try to get the result from the stump builder task, else keep waiting for it to finish + let updater = self.context.stump_updater.as_mut().expect("initialized"); + + match updater.done.try_recv() { + Ok(Ok(stump)) => Some(stump), + + // All blocks have been processed, but the stump builder task hasn't finished yet + Err(TryRecvError::Empty) => None, + + // These two cases should never happen! Proofless utreexo addition should never fail + Err(TryRecvError::Closed) => panic!("Stump builder task was closed without result"), + Ok(Err(e)) => panic!("Stump builder task failed with error: {e:?}"), + } + } + + /// Called when we process the last SwiftSync block. Verifies that the produced aggregator is + /// zero and supply is correct. On success marks the chain assumed and exits IBD. + /// + /// If one of the two invariants fails, it sets the `abort_height` field. + fn handle_stop_height_reached(&mut self, final_acc: Stump) { + let stop_height = self.context.stop_height; + let final_agg = self.context.agg; + let final_supply = self.context.supply; + + // Disable witnessless mode, since we are done with SwiftSync. UtreexoSync requires the + // witness data in order to reconstruct P2WPKH and P2WSH outputs. + self.witnessless = false; + + if !final_agg.is_zero() { + error!("SwiftSync failed with the provided hints file; end aggregator is not zero"); + + self.context.abort_height = Some(stop_height); + return; + } + + let consensus = Consensus::from(self.network); + if final_supply > consensus.max_supply_at_height(stop_height) { + error!("Aborting SwiftSync: most PoW chain has excess supply ({final_supply})"); + + self.context.abort_height = Some(stop_height); + return; + } + + info!("SwiftSync is finished, switching to normal operation mode"); + let tip_hash = self.chain.get_block_hash(stop_height).unwrap(); + + info!("SwiftSync produced the following accumulator for {tip_hash}: \n{final_acc:?}"); + + self.chain + .mark_chain_as_assumed(final_acc, tip_hash) + .unwrap(); + self.chain.toggle_ibd(false); + } + + /// Process a message from a peer and handle it accordingly between the variants of [`PeerMessages`]. + async fn handle_message( + &mut self, + msg: NodeNotification, + hints: &mut Hintsfile, + ) -> Result<(), WireError> { + match msg { + NodeNotification::FromUser(request, responder) => { + self.perform_user_request(request, responder).await; + } + + NodeNotification::DnsSeedAddresses(addresses) => { + self.address_man.push_addresses(&addresses); + } + + NodeNotification::FromPeer(peer, notification, time) => { + self.register_message_time(¬ification, peer, time); + + let Some(unhandled) = self.handle_peer_msg_common(notification, peer)? else { + return Ok(()); + }; + + match unhandled { + PeerMessages::Block(block) => { + let hash = block.block_hash(); + if self.blocks.contains_key(&hash) { + debug!( + "Received block {hash} from peer {peer}, but we already have it" + ); + return Ok(()); + } + + let Some(_) = self.inflight.remove(&InflightRequests::Blocks(hash)) else { + warn!("Received block {hash}, but we didn't ask for it"); + self.increase_banscore(peer, 5)?; + + return Ok(()); + }; + + // Reply and return early if it's a user-requested block. Else continue handling it. + let Some(block) = self.check_is_user_block_and_reply(block)? else { + return Ok(()); + }; + + let inflight_block = InflightBlock { + peer, + block: Arc::new(block), + // Since this is AV-SwiftSync, we don't need proofs nor leaves (UTXOs) + // TODO: once we implement full validation we'll need the spent UTXOs + aux_data: None, + processing_since: None, + }; + self.blocks.insert(hash, inflight_block); + + self.pump_swiftsync(hints)?; + self.get_blocks_to_download(); + } + + PeerMessages::Ready(version) => { + try_and_log!(self.handle_peer_ready(peer, version)); + } + + PeerMessages::Disconnected(idx) => { + try_and_log!(self.handle_disconnection(peer, idx)); + } + + PeerMessages::UtreexoProof(_) => { + warn!( + "Utreexo proof received from peer {peer}, but we didn't ask (SwiftSync)" + ); + self.increase_banscore(peer, 5)?; + } + + _ => {} + } + } + + NodeNotification::FromWorker((result, block_hash, height)) => { + self.handle_worker_notification(result, block_hash, height, hints)?; + } + } + + Ok(()) + } + + fn handle_worker_notification( + &mut self, + result: WorkerResult, + block_hash: BlockHash, + height: u32, + hints: &mut Hintsfile, + ) -> Result<(), WireError> { + // This block has already been processed: open space for a new worker + let block = self + .blocks + .remove(&block_hash) + .ok_or(WireError::BlockNotFound)?; + + // Immediately replace the finished worker with a new one + self.pump_swiftsync(hints)?; + + match result { + Ok((agg_re, unspent_amount, utreexo_adds)) => { + self.context.agg += agg_re; + self.context.supply += unspent_amount; + self.pump_utreexo_adds(height, utreexo_adds); + + self.handle_valid_worker_block(block_hash, height, block); + } + Err(e) => { + let header = block.block.header; + self.handle_invalid_block(e, header, height, block.peer)?; + } + }; + Ok(()) + } + + /// Handles sending new utreexo leaves to add. This should only be called when we know the + /// stump builder task is running (i.e., when there are still blocks to process). + fn pump_utreexo_adds(&self, height: u32, adds: Vec) { + let updater = self.context.stump_updater.as_ref().expect("initialized"); + + // Since SwiftSync is proofless, with implicit deletion, we only add leaves + let update = StumpUpdate { + adds, + deletes: Vec::new(), + proof: Proof::default(), + }; + + updater + .tx + .send((height, update)) + .expect("addition-only doesn't fail (proofless), updater should be alive"); + } + + fn handle_invalid_block( + &mut self, + chain_err: BlockchainError, + header: BlockHeader, + height: u32, + peer: PeerId, + ) -> Result<(), WireError> { + error!("Invalid block {header:?} received by peer {peer} reason: {chain_err:?}"); + let block_hash = header.block_hash(); + + // Return early if the error is not from block validation (e.g., a database error) + let Some(e) = Self::block_validation_err(chain_err) else { + return Ok(()); + }; + + match e { + // Abort SwiftSync if the block is truly invalid + BlockValidationErrors::InvalidCoinbase(_) + | BlockValidationErrors::UtxoNotFound(_) + | BlockValidationErrors::ScriptValidationError(_) + | BlockValidationErrors::NullPrevOut + | BlockValidationErrors::EmptyInputs + | BlockValidationErrors::EmptyOutputs + | BlockValidationErrors::ScriptError + | BlockValidationErrors::BlockTooBig + | BlockValidationErrors::NotEnoughPow + | BlockValidationErrors::TooManyCoins + | BlockValidationErrors::NotEnoughMoney + | BlockValidationErrors::FirstTxIsNotCoinbase + | BlockValidationErrors::BadCoinbaseOutValue + | BlockValidationErrors::EmptyBlock + | BlockValidationErrors::BadBip34 + | BlockValidationErrors::BIP94TimeWarp + | BlockValidationErrors::UnspendableUTXO + | BlockValidationErrors::CoinbaseNotMatured => { + self.context.abort_height = Some(height); + try_and_log!(self.chain.invalidate_block(block_hash)); + } + + // This block's txdata doesn't match the txid or wtxid merkle root. This can be a + // mutated block, so we can't invalidate it since the original txdata may be valid. + BlockValidationErrors::BadMerkleRoot | BlockValidationErrors::BadWitnessCommitment => {} + + // No proofs involved in SwiftSync + BlockValidationErrors::InvalidUtreexoProof => {} + + BlockValidationErrors::BlockExtendsAnOrphanChain + | BlockValidationErrors::BlockDoesntExtendTip => { + // The SwiftSync blocks are from our best chain, so this should never happen. + error!("BUG: block {block_hash} from peer {peer} returned: {e:?}"); + return Ok(()); + } + } + + warn!("Block {block_hash} from peer {peer} is invalid, banning peer"); + self.disconnect_and_ban(peer)?; + + Err(WireError::PeerMisbehaving) + } + + /// This method is currently just about updating metrics, but may be changed to persist the + /// SwiftSync progress. + fn handle_valid_worker_block( + &mut self, + block_hash: BlockHash, + height: u32, + block: InflightBlock, + ) { + // TODO should we update header and block index (similar to `self.chain.update_view`)? + info!( + "SwiftSync block: block_hash={block_hash} height={height} tx_count={}", + block.block.txdata.len(), + ); + + // TODO should we flush on SwiftSync? + // TODO notify the block + self.last_tip_update = Instant::now(); + + // Update metrics + let elapsed = block + .processing_since + .expect("Block was processed, this field is `Some`") + .elapsed() + .as_secs_f64(); + + self.block_sync_avg.add(elapsed); + + #[cfg(feature = "metrics")] + { + use metrics::get_metrics; + + let avg = self.block_sync_avg.value().expect("at least one sample"); + let metrics = get_metrics(); + metrics.block_height.set(height.into()); + metrics.avg_block_processing_time.set(avg); + } + } +} diff --git a/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs b/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs index 753c22a1f..3d2e547b9 100644 --- a/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs +++ b/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs @@ -15,10 +15,10 @@ use rand::seq::IteratorRandom; use tokio::time; use tokio::time::MissedTickBehavior; use tracing::debug; +use tracing::error; use tracing::info; use crate::node::ConnectionKind; -use crate::node::InflightRequests; use crate::node::NodeNotification; use crate::node::NodeRequest; use crate::node::UtreexoNode; @@ -77,44 +77,26 @@ where /// TODO: Be smarter when selecting peers to send, like taking in consideration /// already inflight blocks and latency. fn get_blocks_to_download(&mut self) { - let max_inflight_blocks = SyncNode::BLOCKS_PER_GETDATA * SyncNode::MAX_CONCURRENT_GETDATA; - let inflight_blocks = self - .inflight - .keys() - .filter(|inflight| matches!(inflight, InflightRequests::Blocks(_))) - .count(); - - let unprocessed_blocks = inflight_blocks + self.blocks.len(); - - // if we do a request, this will be the new inflight blocks count - let next_unprocessed_count = unprocessed_blocks + SyncNode::BLOCKS_PER_GETDATA; - - // if this request would make our inflight queue too long, postpone it - if next_unprocessed_count > max_inflight_blocks { + // If this request would make our inflight queue too long, postpone it + if !self.can_request_more_blocks() { return; } let mut blocks = Vec::with_capacity(SyncNode::BLOCKS_PER_GETDATA); for _ in 0..SyncNode::BLOCKS_PER_GETDATA { - let next_block = self.last_block_request + 1; + let next_height = self.last_block_request + 1; let validation_index = self.chain.get_validation_index().unwrap(); - if next_block <= validation_index { + if next_height <= validation_index { self.last_block_request = validation_index; } - let next_block = self.chain.get_block_hash(next_block); - match next_block { - Ok(next_block) => { - blocks.push(next_block); - self.last_block_request += 1; - } + let Ok(next_block) = self.chain.get_block_hash(next_height) else { + // Likely end of chain (e.g., `BlockNotPresent`) + break; + }; - Err(_) => { - // this is likely because we've reached the end of the chain - // and we've got a `BlockNotPresent` error. - break; - } - } + blocks.push(next_block); + self.last_block_request += 1; } try_and_log!(self.request_blocks(blocks)); @@ -343,6 +325,10 @@ where _ => {} } } + + NodeNotification::FromWorker(msg) => { + error!("Received a notification from the worker thread {msg:?}"); + } } Ok(()) diff --git a/crates/floresta-wire/src/p2p_wire/node/user_req.rs b/crates/floresta-wire/src/p2p_wire/node/user_req.rs index a286896d1..004eb7637 100644 --- a/crates/floresta-wire/src/p2p_wire/node/user_req.rs +++ b/crates/floresta-wire/src/p2p_wire/node/user_req.rs @@ -77,7 +77,7 @@ where return; } - UserRequest::Block(block_hash) => NodeRequest::GetBlock(vec![block_hash]), + UserRequest::Block(block_hash) => NodeRequest::GetBlock(vec![block_hash], false), UserRequest::UtreexoProof(block_hash) => { NodeRequest::GetBlockProof((block_hash, Bitmap::default(), Bitmap::default())) diff --git a/crates/floresta-wire/src/p2p_wire/peer.rs b/crates/floresta-wire/src/p2p_wire/peer.rs index 2183a3403..0a8e661a5 100644 --- a/crates/floresta-wire/src/p2p_wire/peer.rs +++ b/crates/floresta-wire/src/p2p_wire/peer.rs @@ -356,10 +356,13 @@ impl Peer { assert_eq!(self.state, State::Connected); debug!("Handling node request: {request:?}"); match request { - NodeRequest::GetBlock(block_hashes) => { + NodeRequest::GetBlock(block_hashes, witnessless) => { let inv = block_hashes .iter() - .map(|block| Inventory::WitnessBlock(*block)) + .map(|block| match witnessless { + false => Inventory::WitnessBlock(*block), + true => Inventory::Block(*block), + }) .collect(); let _ = self.write(NetworkMessage::GetData(inv)).await; diff --git a/crates/floresta-wire/src/p2p_wire/stump_updater.rs b/crates/floresta-wire/src/p2p_wire/stump_updater.rs new file mode 100644 index 000000000..1cb3672af --- /dev/null +++ b/crates/floresta-wire/src/p2p_wire/stump_updater.rs @@ -0,0 +1,228 @@ +use std::collections::BTreeMap; + +use rustreexo::node_hash::BitcoinNodeHash; +use rustreexo::proof::Proof; +use rustreexo::stump::StumpError; +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +use crate::rustreexo::stump::Stump; + +/// Pending additions, deletions, and proof for a single accumulator update. +pub struct StumpUpdate { + pub adds: Vec, + pub deletes: Vec, + pub proof: Proof, +} + +pub type StumpResult = Result; + +/// Handle for interacting with a running [`StumpUpdater`] task. +/// +/// The caller must send exactly one update for each height in `initial_height + 1..=stop_height`. +/// Sending stale, duplicate, or out-of-range heights, or dropping `tx` before `stop_height` is +/// reached, is invalid usage and will close `done` without a result. +pub struct StumpUpdaterHandle { + /// Sender side for feeding `(height, update_data)` into the updater task. + pub tx: mpsc::UnboundedSender<(u32, StumpUpdate)>, + + /// Receiver for the final accumulator at `stop_height`, or any early update error. + pub done: oneshot::Receiver, +} + +/// The `StumpUpdater` struct is responsible for managing the state and updates for an utreexo +/// [`Stump`] accumulator, applying updates sequentially. +/// +/// This type enables out-of-order block processing, since we decouple accumulator updates from +/// block processing. It will cache all the data needed to update the accumulator (adds, deletes, +/// proofs) and consume it sequentially. +/// +/// The channel will be used to send the final accumulator to the consumer, if successful, or to +/// notify accumulator update failures. +pub struct StumpUpdater { + /// The accumulator for `last_height`. + last_acc: Stump, + + /// The last height we have processed. This is always incremented by 1, iff we have the update + /// data for the next height. + last_height: u32, + + /// Pending additions, deletions, and proofs to apply to the accumulator, mapped to the height + /// at which they must be applied. + pending_updates: BTreeMap, +} + +impl StumpUpdater { + pub fn spawn(initial_acc: Stump, initial_height: u32, stop_height: u32) -> StumpUpdaterHandle { + assert!( + initial_height < stop_height, + "initial `StumpUpdater` height must be less than `stop_height`", + ); + + let (tx, rx) = mpsc::unbounded_channel(); + let (done_tx, done_rx) = oneshot::channel(); + + // Initial state and empty updates cache + let updater = Self { + last_acc: initial_acc, + last_height: initial_height, + pending_updates: BTreeMap::new(), + }; + + tokio::task::spawn_blocking(move || { + let result = updater.run(rx, stop_height); + let _ = done_tx.send(result); + }); + + StumpUpdaterHandle { tx, done: done_rx } + } + + /// Queues one future update, rejecting stale, out-of-range, or duplicate heights. + fn queue_update(&mut self, height: u32, update: StumpUpdate, stop_height: u32) { + let last_height = self.last_height; + + // Sanity check: we shouldn't receive updates for already-processed heights + if height <= last_height || height > stop_height { + panic!("got update height {height}, but last={last_height}, stop={stop_height}"); + } + + // When we insert the new pending update, it shouldn't be duplicated + if self.pending_updates.insert(height, update).is_some() { + panic!("duplicate update data at height {height}"); + } + } + + fn run( + mut self, + mut rx: mpsc::UnboundedReceiver<(u32, StumpUpdate)>, + stop_height: u32, + ) -> StumpResult { + while self.last_height < stop_height { + // Wait until a new state update arrives + let Some((height, update)) = rx.blocking_recv() else { + panic!( + "updater channel closed at height {} before {stop_height}", + self.last_height, + ) + }; + + self.queue_update(height, update, stop_height); + self.try_next()?; + } + + // If we exit the while loop, we have reached the stop height + Ok(self.last_acc) + } + + /// Loops over all pending updates that we can sequentially apply, consuming the data and + /// updating `last_acc` and `last_height`. + /// + /// Returns on the first missing update data that is next in the sequence, or on update errors. + fn try_next(&mut self) -> Result<(), StumpError> { + loop { + let next_height = self.last_height + 1; + + // Since `pending_updates` is ordered by height, the first entry is the only + // update that can advance the accumulator. If it is not `next_height`, + // there is a gap, so we must wait for more update data. + let StumpUpdate { + adds, + deletes, + proof, + } = match self.pending_updates.first_entry() { + Some(entry) if *entry.key() == next_height => entry.remove(), + _ => break, + }; + + self.last_acc = self.last_acc.modify(&adds, &deletes, &proof)?; + self.last_height = next_height; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use floresta_common::assert_err; + use tokio::time::timeout; + + use super::*; + + fn dummy_update() -> StumpUpdate { + StumpUpdate { + adds: Vec::new(), + deletes: Vec::new(), + proof: Proof::default(), + } + } + + async fn assert_worker_closed(done: oneshot::Receiver) { + let done_result = timeout(Duration::from_secs(1), done).await.unwrap(); + assert_err!(done_result); + } + + #[tokio::test] + async fn run_closes_done_if_channel_closes_before_stop_height() { + let StumpUpdaterHandle { tx, done } = StumpUpdater::spawn(Stump::new(), 0, 1); + + drop(tx); + + assert_worker_closed(done).await; + } + + #[tokio::test] + async fn run_closes_done_if_height_is_equal_to_last_height() { + let StumpUpdaterHandle { tx, done } = StumpUpdater::spawn(Stump::new(), 10, 12); + + tx.send((10, dummy_update())).unwrap(); + + assert_worker_closed(done).await; + } + + #[tokio::test] + async fn run_closes_done_if_height_is_lower_than_last_height() { + let StumpUpdaterHandle { tx, done } = StumpUpdater::spawn(Stump::new(), 10, 12); + + tx.send((9, dummy_update())).unwrap(); + + assert_worker_closed(done).await; + } + + #[tokio::test] + async fn run_closes_done_if_height_is_above_stop_height() { + let StumpUpdaterHandle { tx, done } = StumpUpdater::spawn(Stump::new(), 10, 12); + + tx.send((13, dummy_update())).unwrap(); + + assert_worker_closed(done).await; + } + + #[tokio::test] + async fn run_closes_done_on_duplicate_height() { + let StumpUpdaterHandle { tx, done } = StumpUpdater::spawn(Stump::new(), 0, 3); + + tx.send((2, dummy_update())).unwrap(); + tx.send((2, dummy_update())).unwrap(); + + assert_worker_closed(done).await; + } + + #[test] + #[should_panic] + fn spawn_panics_if_initial_height_is_not_below_stop_height() { + for h in 0..5 { + let _ = StumpUpdater::spawn(Stump::new(), h, 5); + } + + let _ = StumpUpdater::spawn(Stump::new(), 5, 5); + } + + #[test] + #[should_panic] + fn spawn_panics_if_initial_height_is_above_stop_height() { + let _ = StumpUpdater::spawn(Stump::new(), 6, 5); + } +} diff --git a/crates/floresta-wire/src/p2p_wire/tests/chain_selector.rs b/crates/floresta-wire/src/p2p_wire/tests/chain_selector.rs index c1d5d74ea..8455f019f 100644 --- a/crates/floresta-wire/src/p2p_wire/tests/chain_selector.rs +++ b/crates/floresta-wire/src/p2p_wire/tests/chain_selector.rs @@ -11,8 +11,9 @@ mod tests { use rustreexo::stump::Stump; use crate::p2p_wire::tests::utils::PeerData; + use crate::p2p_wire::tests::utils::SetupNodeArgs; use crate::p2p_wire::tests::utils::create_false_acc; - use crate::p2p_wire::tests::utils::setup_node; + use crate::p2p_wire::tests::utils::setup_sync_node; use crate::p2p_wire::tests::utils::signet_blocks; use crate::p2p_wire::tests::utils::signet_headers; use crate::p2p_wire::tests::utils::signet_roots; @@ -44,8 +45,9 @@ mod tests { PeerData::new(headers.clone(), blocks.clone(), true_accs), PeerData::new(headers.clone(), blocks, false_accs), ]; + let args = SetupNodeArgs::new(peers, true, Network::Signet, datadir, NUM_BLOCKS); - let chain = setup_node(peers, true, Network::Signet, &datadir, NUM_BLOCKS).await; + let chain = setup_sync_node(args).await; let best_block = chain.get_best_block().unwrap(); assert_eq!(best_block.1, headers[NUM_BLOCKS].block_hash()); @@ -93,8 +95,9 @@ mod tests { } peers.push(PeerData::new(headers.clone(), blocks, true_accs)); + let args = SetupNodeArgs::new(peers, true, Network::Signet, datadir, NUM_BLOCKS); - let chain = setup_node(peers, true, Network::Signet, &datadir, NUM_BLOCKS).await; + let chain = setup_sync_node(args).await; let best_block = chain.get_best_block().unwrap(); assert_eq!(best_block.1, headers[NUM_BLOCKS].block_hash()); diff --git a/crates/floresta-wire/src/p2p_wire/tests/mod.rs b/crates/floresta-wire/src/p2p_wire/tests/mod.rs index 080bcabbd..64ed91f2c 100644 --- a/crates/floresta-wire/src/p2p_wire/tests/mod.rs +++ b/crates/floresta-wire/src/p2p_wire/tests/mod.rs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 mod chain_selector; +mod swift_sync; mod sync_node; mod utils; diff --git a/crates/floresta-wire/src/p2p_wire/tests/swift_sync.rs b/crates/floresta-wire/src/p2p_wire/tests/swift_sync.rs new file mode 100644 index 000000000..880b50943 --- /dev/null +++ b/crates/floresta-wire/src/p2p_wire/tests/swift_sync.rs @@ -0,0 +1,141 @@ +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use bitcoin::Block; + use bitcoin::BlockHash; + use bitcoin::Network; + use bitcoin::consensus::encode::deserialize_hex; + use floresta_chain::pruned_utreexo::BlockchainInterface; + use floresta_common::bhash; + + use crate::p2p_wire::tests::utils::PeerData; + use crate::p2p_wire::tests::utils::SetupNodeArgs; + use crate::p2p_wire::tests::utils::mainnet_headers; + use crate::p2p_wire::tests::utils::mutate_block; + use crate::p2p_wire::tests::utils::setup_swiftsync; + + const NUM_BLOCKS: usize = 175; + + fn read_blocks_txt() -> HashMap { + include_str!("../../../../floresta-chain/testdata/mainnet_blocks.txt") + .lines() + .skip(1) + .map(|b| deserialize_hex(b).unwrap()) + .map(|b: Block| (b.block_hash(), b)) + .collect() + } + + #[tokio::test] + async fn test_swift_sync_valid_blocks() { + let datadir = format!("./tmp-db/{}.swift_sync_node", rand::random::()); + std::fs::create_dir_all(&datadir).unwrap(); + // We need the hints in the datadir + std::fs::copy( + "./src/p2p_wire/tests/test_data/bitcoin.hints", + format!("{datadir}/bitcoin.hints"), + ) + .unwrap(); + + let headers = mainnet_headers(); + let blocks = read_blocks_txt(); + assert_eq!(blocks.len(), NUM_BLOCKS); + + let peer = vec![PeerData::new(Vec::new(), blocks, HashMap::new())]; + let args = SetupNodeArgs::new(peer, false, Network::Bitcoin, datadir, NUM_BLOCKS); + + let chain = setup_swiftsync(args).await; + + assert_eq!(chain.get_validation_index().unwrap(), NUM_BLOCKS as u32); + let best_block = chain.get_best_block().unwrap(); + let expected = ( + 175, + bhash!("00000000fd4afcc15f0fdda9b24be4c62068d8cf82fe6277730fd096712d9d08"), + ); + + assert_eq!(best_block.1, headers[NUM_BLOCKS].block_hash()); + assert_eq!(best_block, expected); + assert!(!chain.is_in_ibd()); + } + + #[tokio::test] + async fn test_swift_sync_mutated_block() { + let datadir = format!("./tmp-db/{}.swift_sync_node", rand::random::()); + std::fs::create_dir_all(&datadir).unwrap(); + // We need the hints in the datadir + std::fs::copy( + "./src/p2p_wire/tests/test_data/bitcoin.hints", + format!("{datadir}/bitcoin.hints"), + ) + .unwrap(); + + let headers = mainnet_headers(); + let mut blocks = read_blocks_txt(); + assert_eq!(blocks.len(), NUM_BLOCKS); + + // Replace the height 151 block with an invalid one + if let Some(block) = blocks.get_mut(&headers[151].block_hash()) { + mutate_block(block); + } + + // We will have 9 peers sending mutated blocks, only one with the original txdata + let mut peers = vec![PeerData::new(Vec::new(), blocks, HashMap::new()); 9]; + peers.push(PeerData::new(Vec::new(), read_blocks_txt(), HashMap::new())); + + let args = SetupNodeArgs::new(peers, false, Network::Bitcoin, datadir, NUM_BLOCKS); + let chain = setup_swiftsync(args).await; + + assert_eq!(chain.get_validation_index().unwrap(), NUM_BLOCKS as u32); + let best_block = chain.get_best_block().unwrap(); + let expected = ( + 175, + bhash!("00000000fd4afcc15f0fdda9b24be4c62068d8cf82fe6277730fd096712d9d08"), + ); + + assert_eq!(best_block.1, headers[NUM_BLOCKS].block_hash()); + assert_eq!(best_block, expected); + assert!(!chain.is_in_ibd()); + } + + // TODO add invalid block test + /* #[tokio::test] + async fn test_swift_sync_invalid_block() { + let datadir = format!("./tmp-db/{}.swift_sync_node", rand::random::()); + std::fs::create_dir_all(&datadir).unwrap(); + // We need the hints in the datadir + std::fs::copy( + "./src/p2p_wire/tests/test_data/bitcoin.hints", + format!("{datadir}/bitcoin.hints"), + ) + .unwrap(); + + let headers = mainnet_headers(); + let mut blocks = read_blocks_txt(); + assert_eq!(blocks.len(), NUM_BLOCKS); + + // Replace the height 151 block with an invalid one + if let Some(block) = blocks.get_mut(&headers[151].block_hash()) { + mutate_block(block); + } + + // The first peer to send the invalid block is banned, then we switch to `SyncNode`, we + // advance the validation index up to block 150, and finally ban the peer sending us the + // invalid block again. + // + // NOTE: we need `MAX_OUTGOING_PEERS` for `ChainSelector` to start and move to `SwiftSync`. + let peers: Vec<_> = (0..SwiftSync::MAX_OUTGOING_PEERS) + .map(|_| PeerData::new(Vec::new(), blocks.clone(), HashMap::new())) + .collect(); + + let args = SetupNodeArgs::new(peers, false, Network::Bitcoin, datadir, NUM_BLOCKS); + + // Running node ensures we switch from `SwiftSync` to `SyncNode`, as we can't verify the + // SwiftSync hints since the chain is invalid. + let chain = setup_running_node(args).await; + + // Block at height 151 was invalidated when connecting it to the chain + assert_eq!(chain.get_validation_index().unwrap(), 150); + assert_eq!(chain.get_best_block().unwrap().1, headers[150].block_hash()); + assert!(!chain.is_in_ibd()); + }*/ +} diff --git a/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs b/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs index db6951304..dcc445695 100644 --- a/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs +++ b/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs @@ -8,8 +8,9 @@ mod tests { use floresta_chain::pruned_utreexo::BlockchainInterface; use crate::p2p_wire::tests::utils::PeerData; - use crate::p2p_wire::tests::utils::mutated_block_h7; - use crate::p2p_wire::tests::utils::setup_node; + use crate::p2p_wire::tests::utils::SetupNodeArgs; + use crate::p2p_wire::tests::utils::mutate_block; + use crate::p2p_wire::tests::utils::setup_sync_node; use crate::p2p_wire::tests::utils::signet_blocks; use crate::p2p_wire::tests::utils::signet_headers; @@ -22,7 +23,9 @@ mod tests { let blocks = signet_blocks(); let peer = vec![PeerData::new(Vec::new(), blocks, HashMap::new())]; - let chain = setup_node(peer, false, Network::Signet, &datadir, NUM_BLOCKS).await; + let args = SetupNodeArgs::new(peer, false, Network::Signet, datadir, NUM_BLOCKS); + + let chain = setup_sync_node(args).await; assert_eq!(chain.get_validation_index().unwrap(), 9); assert_eq!(chain.get_best_block().unwrap().1, headers[9].block_hash()); @@ -35,14 +38,17 @@ mod tests { let headers = signet_headers(); let mut blocks = signet_blocks(); - // Replace the height 7 block with a mutated block - blocks.insert(headers[7].block_hash(), mutated_block_h7()); + // Replace the height 7 block with an invalid one + if let Some(block) = blocks.get_mut(&headers[7].block_hash()) { + mutate_block(block); + } // We will have 9 peers sending mutated blocks, only one with the original txdata let mut peers = vec![PeerData::new(Vec::new(), blocks, HashMap::new()); 9]; peers.push(PeerData::new(Vec::new(), signet_blocks(), HashMap::new())); - let chain = setup_node(peers, false, Network::Signet, &datadir, NUM_BLOCKS).await; + let args = SetupNodeArgs::new(peers, false, Network::Signet, datadir, NUM_BLOCKS); + let chain = setup_sync_node(args).await; // We were able to find the original block and sync assert_eq!(chain.get_validation_index().unwrap(), 9); diff --git a/crates/floresta-wire/src/p2p_wire/tests/test_data/bitcoin.hints b/crates/floresta-wire/src/p2p_wire/tests/test_data/bitcoin.hints new file mode 100644 index 000000000..581baa52e Binary files /dev/null and b/crates/floresta-wire/src/p2p_wire/tests/test_data/bitcoin.hints differ diff --git a/crates/floresta-wire/src/p2p_wire/tests/utils.rs b/crates/floresta-wire/src/p2p_wire/tests/utils.rs index 528730c8b..0c8a239bd 100644 --- a/crates/floresta-wire/src/p2p_wire/tests/utils.rs +++ b/crates/floresta-wire/src/p2p_wire/tests/utils.rs @@ -29,6 +29,7 @@ use floresta_mempool::Mempool; use rand::RngCore; use rand::TryRngCore; use rand::rngs::OsRng; +use rand::seq::IndexedMutRandom; use serde::Deserialize; use serde::Serialize; use tokio::sync::Mutex; @@ -49,7 +50,10 @@ use crate::node::NodeNotification; use crate::node::NodeRequest; use crate::node::PeerStatus; use crate::node::UtreexoNode; +use crate::node::running_ctx::RunningNode; +use crate::node::swift_sync_ctx::SwiftSync; use crate::node::sync_ctx::SyncNode; +use crate::node_context::NodeContext; use crate::p2p_wire::block_proof::UtreexoProof; use crate::p2p_wire::peer::PeerMessages; use crate::p2p_wire::peer::Version; @@ -121,7 +125,7 @@ impl SimulatedPeer { .send(NodeNotification::FromPeer(self.peer_id, peer_msg, now)) .unwrap(); } - NodeRequest::GetBlock(hashes) => { + NodeRequest::GetBlock(hashes, _) => { for hash in hashes { let block = self.blocks.get(&hash).unwrap().clone(); @@ -161,15 +165,18 @@ impl SimulatedPeer { } } -pub fn create_peer( - headers: Vec
, - blocks: HashMap, - accs: HashMap>, +pub fn spawn_peer( + peer_data: PeerData, node_sender: UnboundedSender, - sender: UnboundedSender, - node_rcv: UnboundedReceiver, peer_id: u32, ) -> LocalPeerView { + let (sender, node_rcv) = unbounded_channel(); + let PeerData { + headers, + blocks, + accs, + } = peer_data; + let mut peer = SimulatedPeer::new(headers, blocks, accs, node_sender, node_rcv, peer_id); task::spawn(async move { peer.run().await; @@ -247,6 +254,20 @@ pub fn signet_headers() -> Vec
{ headers } +pub fn mainnet_headers() -> Vec
{ + let mut headers: Vec
= Vec::new(); + + let file = include_bytes!("../../../../floresta-chain/testdata/headers.zst"); + let uncompressed: Vec = zstd::decode_all(std::io::Cursor::new(file)).unwrap(); + let mut buffer = uncompressed.as_slice(); + + while let Ok(header) = Header::consensus_decode(&mut buffer) { + headers.push(header); + } + + headers +} + /// Returns the first 121 signet blocks, including genesis pub fn signet_blocks() -> HashMap { let file = include_str!("./test_data/blocks.json"); @@ -281,11 +302,16 @@ pub fn signet_roots() -> HashMap> { accs } -/// Returns a mutated signet block at height 7 -pub fn mutated_block_h7() -> Block { - deserialize_hex( - "00000020daf3b60d374b19476461f97540498dcfa2eb7016238ec6b1d022f82fb60100007a7ae65b53cb988c2ec92d2384996713821d5645ffe61c9acea60da75cd5edfa1a944d5fae77031e9dbb050001010000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff025751feffffff0200f2052a01000000160014ef2dceae02e35f8137de76768ae3345d99ca68860000000000000000776a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf94c4fecc7daa2490047304402202b3f946d6447f9bf17d00f3696cede7ee70b785495e5498274ee682a493befd5022045fc0bcf9331073168b5d35507175f9f374a8eba2336873885d12aada67ea5f601000120000000000000000000000000000000000000000000000000000000000000000000000000" - ).unwrap() +/// Modifies a block to have an invalid output script (txdata is tampered with) +pub fn mutate_block(block: &mut Block) { + let mut rng = rand::rng(); + + let tx = block.txdata.choose_mut(&mut rng).unwrap(); + let out = tx.output.choose_mut(&mut rng).unwrap(); + let spk = out.script_pubkey.as_mut_bytes(); + let byte = spk.choose_mut(&mut rng).unwrap(); + + *byte += 1; } #[derive(Clone, Constructor)] @@ -296,52 +322,48 @@ pub struct PeerData { accs: HashMap>, } -pub async fn setup_node( +#[derive(Constructor)] +/// The arguments needed to set up the test `UtreexoNode` +pub struct SetupNodeArgs { peers: Vec, pow_fraud_proofs: bool, network: Network, - datadir: impl AsRef, + datadir: String, num_blocks: usize, -) -> Arc> { - let config = FlatChainStoreConfig::new(&datadir); +} - let chainstore = FlatChainStore::new(config).unwrap(); - let mempool = Arc::new(Mutex::new(Mempool::new(1000))); - let chain = ChainState::open(chainstore, network, AssumeValidArg::Disabled).unwrap(); - let chain = Arc::new(chain); +type Chain = Arc>; + +pub fn setup_node(args: SetupNodeArgs) -> UtreexoNode +where + T: 'static + Default + NodeContext, +{ + let net = args.network; + let datadir = args.datadir; - let mut headers = signet_headers(); - headers.remove(0); - headers.truncate(num_blocks); - for header in headers { + // Create `ChainState` and add headers to it + let chainstore = FlatChainStore::new(FlatChainStoreConfig::new(datadir.clone())).unwrap(); + let chain = Arc::new(ChainState::open(chainstore, net, AssumeValidArg::Disabled).unwrap()); + + let headers = match net { + Network::Signet => signet_headers(), + Network::Bitcoin => mainnet_headers(), + _ => panic!("unavailable headers for net: {net}"), + }; + for header in headers.into_iter().skip(1).take(args.num_blocks) { chain.accept_header(header).unwrap(); } - let config = get_node_config(&datadir, network, pow_fraud_proofs); + // Create `UtreexoNode` and spawn the simulated peers + let config = get_node_config(datadir, net, args.pow_fraud_proofs); + let mempool = Arc::new(Mutex::new(Mempool::new(1000))); let kill_signal = Arc::new(RwLock::new(false)); - let mut node = UtreexoNode::>, SyncNode>::new( - config, - chain.clone(), - mempool, - None, - kill_signal.clone(), - AddressMan::new(None, &[]), - ) - .unwrap(); - - for (i, peer) in peers.into_iter().enumerate() { - let (sender, receiver) = unbounded_channel(); - let peer_id = i as u32; + let addr_man = AddressMan::new(None, &[]); + let mut node = UtreexoNode::new(config, chain, mempool, None, kill_signal, addr_man).unwrap(); - let peer = create_peer( - peer.headers, - peer.blocks, - peer.accs, - node.node_tx.clone(), - sender.clone(), - receiver, - peer_id, - ); + for (i, peer_data) in args.peers.into_iter().enumerate() { + let peer_id = i as u32; + let peer = spawn_peer(peer_data, node.node_tx.clone(), peer_id); // Add a fixed peer to avoid opening real P2P connections if i == 0 { @@ -368,9 +390,46 @@ pub async fn setup_node( ); } - timeout(Duration::from_secs(100), node.run(|_| {})) - .await - .unwrap(); + node +} + +const NODE_TIMEOUT: Duration = Duration::from_secs(100); + +pub async fn setup_sync_node(args: SetupNodeArgs) -> Arc> { + let node = setup_node::(args); + let chain = node.chain.clone(); + + timeout(NODE_TIMEOUT, node.run(|_| {})).await.unwrap(); + + chain +} + +pub async fn setup_swiftsync(args: SetupNodeArgs) -> Arc> { + let node = setup_node::(args); + let chain = node.chain.clone(); + + timeout(NODE_TIMEOUT, node.run(|_| {})).await.unwrap(); + + chain +} + +#[allow(unused)] +pub async fn setup_running_node(args: SetupNodeArgs) -> Arc> { + let node = setup_node::(args); + let kill_signal = node.kill_signal.clone(); + let chain = node.chain.clone(); + + // Sends a kill signal to the `RunningNode` after 20 seconds + let killer = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(20)).await; + *kill_signal.write().await = true; + }); + + let (sender, receiver) = tokio::sync::oneshot::channel::<()>(); + timeout(NODE_TIMEOUT, node.run(sender)).await.unwrap(); + + receiver.await.unwrap(); + killer.abort(); chain } @@ -385,15 +444,54 @@ fn to_addr_v2(addr: IpAddr) -> AddrV2 { #[cfg(test)] mod tests { + use std::fs::File; + use bitcoin::BlockHash; use bitcoin::consensus::deserialize; use bitcoin::hashes::Hash; + use floresta_common::bhash; + use hintsfile::Hintsfile; - use super::mutated_block_h7; + use super::mutate_block; use super::signet_blocks; use super::signet_headers; use super::signet_roots; + fn load_test_hints() -> Hintsfile { + let mut file = File::open("./src/p2p_wire/tests/test_data/bitcoin.hints").unwrap(); + Hintsfile::from_reader(&mut file).unwrap() + } + + #[test] + #[should_panic] + fn test_hints_file_genesis() { + let hints = load_test_hints(); + hints.indices_at_height(0).unwrap(); + } + + #[test] + #[should_panic] + fn test_hints_file_after_stop_height() { + let hints = load_test_hints(); + hints.indices_at_height(176).unwrap(); + } + + #[test] + fn test_hints_file_shape() { + let hints = load_test_hints(); + assert_eq!(hints.stop_height(), 175); + + for height in 1..=175 { + let unspent_indices = match height { + 9 => Vec::new(), // The single UTXO in this block is spent later + 170 => vec![0, 1, 2], // Contains the transaction spending the height-9 UTXO + _ => vec![0], // Other blocks have just a coinbase output (here unspent) + }; + + assert_eq!(hints.indices_at_height(height).unwrap(), unspent_indices); + } + } + #[test] fn test_get_headers_and_blocks() { let headers = signet_headers(); @@ -423,16 +521,23 @@ mod tests { #[test] fn test_get_mutated_block() { - let mutated_block = mutated_block_h7(); - assert!(!mutated_block.txdata.is_empty(), "at least one tx"); + let hash = bhash!("000002c45c8ea9e553d4b0ee5d50324e56fc76f13019873fe707ff44fc56183f"); + let blocks = signet_blocks(); + + let mut block_25 = blocks.get(&hash).unwrap().clone(); + mutate_block(&mut block_25); - assert!(!mutated_block.check_merkle_root(), "invalid merkle root"); + assert!(!block_25.txdata.is_empty(), "at least one tx"); + assert!( + !block_25.check_merkle_root(), + "invalid merkle root (txdata was tampered with)", + ); let headers = signet_headers(); assert_eq!( - mutated_block.header.prev_blockhash, - headers[6].block_hash(), - "invalid block is at height 7", + block_25.header.prev_blockhash, + headers[24].block_hash(), + "block is at height 25", ); }