From 9f56f17f21c91991a6550878110be3a5e89f94e2 Mon Sep 17 00:00:00 2001 From: Davidson Souza Date: Mon, 29 Jan 2024 13:01:25 -0300 Subject: [PATCH] add chain_selector mod --- .../src/pruned_utreexo/chain_state.rs | 56 +- .../src/pruned_utreexo/chainparams.rs | 45 + .../floresta-chain/src/pruned_utreexo/mod.rs | 10 +- .../src/pruned_utreexo/partial_chain.rs | 6 +- crates/floresta-wire/src/lib.rs | 2 + .../src/p2p_wire/chain_selector.rs | 265 +++++ crates/floresta-wire/src/p2p_wire/mod.rs | 2 + crates/floresta-wire/src/p2p_wire/node.rs | 1043 ++--------------- .../src/p2p_wire/node_context.rs | 51 +- crates/floresta-wire/src/p2p_wire/peer.rs | 4 +- .../src/p2p_wire/running_node.rs | 639 ++++++++++ crates/floresta/examples/node.rs | 2 +- florestad/src/main.rs | 5 +- 13 files changed, 1125 insertions(+), 1005 deletions(-) create mode 100644 crates/floresta-wire/src/p2p_wire/chain_selector.rs create mode 100644 crates/floresta-wire/src/p2p_wire/running_node.rs diff --git a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs index bc4e7f5f..fe56cda4 100644 --- a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs +++ b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs @@ -762,6 +762,11 @@ impl ChainState { })?; Ok(()) } + + fn get_assumeutreexo_index(&self) -> (BlockHash, u32) { + let guard = read_lock!(self); + guard.consensus.parameters.assumeutreexo_index + } } impl BlockchainInterface for ChainState { @@ -886,6 +891,48 @@ impl BlockchainInterface for ChainState UpdatableChainstate for ChainState { + fn mark_chain_as_valid(&self, _height: u32, hash: BlockHash) -> Result { + let (assume_utreexo_hash, _) = self.get_assumeutreexo_index(); + + let mut assumed_hash = hash; + + // Walks the chain until finding our assumeutxo block. + // Since this block was passed in before starting florestad, this value should be + // lesser than or equal our current tip. If we don't find that block, it means the + // assumeutxo block was reorged out (or never was in the main chain). That's weird, but we + // should take precoution against it + while let Ok(header) = self.get_block_header(&assumed_hash) { + if header.block_hash() == assume_utreexo_hash { + break; + } + // We've reached genesis and didn't our block + if self.is_genesis(&header) { + break; + } + assumed_hash = self.get_ancestor(&header)?.block_hash(); + } + + // The assumeutreexo value passed is **not** in the main chain, start validaton from geneis + if assumed_hash != assume_utreexo_hash { + warn!("We are in a diffenrent chain than our defualt or provided assumeutreexo value. Restarting from genesis"); + + let mut guard = write_lock!(self); + + guard.best_block.validation_index = assumed_hash; // Should be equal to genesis + guard.acc = Stump::new(); + + return Ok(false); + } + + // The assumeutreexo value passed is inside our main chain, start from that point + let mut guard = write_lock!(self); + let acc = guard.consensus.parameters.network_roots.clone(); + guard.best_block.validation_index = assumed_hash; + guard.acc = acc; + + Ok(true) + } + fn invalidate_block(&self, block: BlockHash) -> Result<(), BlockchainError> { let height = self.get_disk_block_header(&block)?.height(); if height.is_none() { @@ -997,12 +1044,12 @@ impl UpdatableChainstate for ChainState Result<(), BlockchainError> { + fn accept_header(&self, header: BlockHeader) -> Result { trace!("Accepting header {header:?}"); let _header = self.get_disk_block_header(&header.block_hash()); if _header.is_ok() { self.maybe_reindex(&_header?); - return Ok(()); // We already have this header + return Ok(true); // We already have this header } // The best block we know of let best_block = self.get_best_block()?; @@ -1027,11 +1074,14 @@ impl UpdatableChainstate for ChainState Vec { let inner = read_lock!(self); diff --git a/crates/floresta-chain/src/pruned_utreexo/chainparams.rs b/crates/floresta-chain/src/pruned_utreexo/chainparams.rs index 78b2f21d..6cf9454d 100644 --- a/crates/floresta-chain/src/pruned_utreexo/chainparams.rs +++ b/crates/floresta-chain/src/pruned_utreexo/chainparams.rs @@ -12,6 +12,8 @@ use bitcoin::blockdata::constants::genesis_block; use bitcoin::Block; use bitcoin::BlockHash; use bitcoin::Target; +use rustreexo::accumulator::node_hash::NodeHash; +use rustreexo::accumulator::stump::Stump; use crate::prelude::*; use crate::Network; @@ -49,6 +51,8 @@ pub struct ChainParams { /// A list of exceptions to the rules, where the key is the block hash and the value is the /// verification flags pub exceptions: HashMap, + pub network_roots: Stump, + pub assumeutreexo_index: (BlockHash, u32), } impl ChainParams { @@ -62,6 +66,32 @@ impl ChainParams { } } +fn get_signet_roots() -> Stump { + let roots: Vec = [ + "8e6fcdcf05020fa1f7131a59a7050b33ca74852f5e82a5fbe236402bc4c8a928", + "f4c92949c71be7613699977eebf6d3bd5c8fd3e538a01380583e5aba14273425", + "d73ceb2748d342b14a269d7c0feb34aca1341a6367cc75cff6db8422eb01916d", + "a012e516784ccb7af26d7b356bf645e6a167cce5b48b9368c58c523acd25f6bf", + "e6e74ebc1d01ac47541c90afaac208c9b0f16226d2d046742032374e925a79ae", + "235b255558e994e6c5b6011469e891436cbf18107a939847e6e5df4cb939a96b", + "a9f45482564f0cb103067636c39fe30df1fa04b6b04d438c655530d991432761", + "d46716b7ccaf8d9eff11557527056f6100e016126df369eef95b9c9874467d40", + "7039b9053ef819d35c079eb4dcdd37029653a325bf416768e7de16bacf2c90af", + "f7a626339303030fc1b71d228e74aebdc2126cb7a2c5e01eb036225ea9dd41c2", + "b21123705cb4cef5a104705037ccd80ae7281789aa07cd468d5949c7e62df37b", + "ca931559f3ad9c91b9510f5dbfa42467e40ad8a0069d8f273de6079e9b115232", + "954ca698b58b6e6cdcc89948c841059d892578b7d67a249965fff83de5aaa7e3", + ] + .iter() + .map(|hash| NodeHash::from_str(hash).unwrap()) + .collect(); + + Stump { + roots, + leaves: 1477499, + } +} + #[cfg(feature = "bitcoinconsensus")] fn get_exceptions() -> HashMap { // For some reason, some blocks in the mainnet and testnet have different rules than it should @@ -89,6 +119,7 @@ fn get_exceptions() -> HashMap { fn get_exceptions() -> HashMap { HashMap::new() } + impl From for ChainParams { fn from(net: Network) -> Self { let genesis = genesis_block(net.into()); @@ -96,6 +127,7 @@ impl From for ChainParams { let exceptions = get_exceptions(); match net { Network::Bitcoin => ChainParams { + assumeutreexo_index: (genesis.block_hash(), 0), genesis, max_target, pow_allow_min_diff: false, @@ -110,8 +142,10 @@ impl From for ChainParams { segwit_activation_height: 481824, csv_activation_height: 419328, exceptions, + network_roots: Stump::default(), }, Network::Testnet => ChainParams { + assumeutreexo_index: (genesis.block_hash(), 0), genesis, max_target, pow_allow_min_diff: true, @@ -126,6 +160,7 @@ impl From for ChainParams { segwit_activation_height: 834_624, csv_activation_height: 770_112, exceptions, + network_roots: Stump::default(), }, Network::Signet => ChainParams { genesis, @@ -142,8 +177,17 @@ impl From for ChainParams { bip66_activation_height: 1, segwit_activation_height: 1, exceptions, + network_roots: get_signet_roots(), + assumeutreexo_index: ( + BlockHash::from_str( + "0000001321625245a27e0be82a640106d019e35e48a024a17df1ceeb9b1f2131", + ) + .unwrap(), + 74551, + ), }, Network::Regtest => ChainParams { + assumeutreexo_index: (genesis.block_hash(), 0), genesis, max_target, pow_allow_min_diff: false, @@ -158,6 +202,7 @@ impl From for ChainParams { bip66_activation_height: 0, segwit_activation_height: 0, exceptions, + network_roots: Stump::default(), }, } } diff --git a/crates/floresta-chain/src/pruned_utreexo/mod.rs b/crates/floresta-chain/src/pruned_utreexo/mod.rs index fce07982..2a6b0793 100644 --- a/crates/floresta-chain/src/pruned_utreexo/mod.rs +++ b/crates/floresta-chain/src/pruned_utreexo/mod.rs @@ -90,7 +90,9 @@ pub trait UpdatableChainstate { /// Accepts a new header to our chain. This method is called before connect_block, and /// makes some basic checks on a header and saves it on disk. We only accept a block as /// valid after calling connect_block. - fn accept_header(&self, header: BlockHeader) -> Result<(), BlockchainError>; + /// + /// This function returns whether this block is on our best-known chain, or in a fork + fn accept_header(&self, header: BlockHeader) -> Result; /// Not used for now, but in a future blockchain with mempool, we can process transactions /// that are not in a block yet. fn handle_transaction(&self) -> Result<(), BlockchainError>; @@ -121,6 +123,12 @@ pub trait UpdatableChainstate { final_height: u32, acc: Stump, ) -> Result; + + /// Marks a chain as fully-valid + /// + /// This mimics the behavour of checking every block before this block, and continues + /// from this point + fn mark_chain_as_valid(&self, height: u32, hash: BlockHash) -> Result; } /// [ChainStore] is a trait defining how we interact with our chain database. This definitions diff --git a/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs b/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs index c1c2865b..3ae754ec 100644 --- a/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs +++ b/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs @@ -343,7 +343,7 @@ impl UpdatableChainstate for PartialChainState { // these are unimplemented, and will panic if called - fn accept_header(&self, _header: BlockHeader) -> Result<(), BlockchainError> { + fn accept_header(&self, _header: BlockHeader) -> Result { unimplemented!("partialChainState shouldn't be used to accept new headers") } @@ -367,6 +367,10 @@ impl UpdatableChainstate for PartialChainState { fn process_rescan_block(&self, _block: &bitcoin::Block) -> Result<(), BlockchainError> { unimplemented!("we don't do rescan") } + + fn mark_chain_as_valid(&self, _height: u32, _hash: BlockHash) -> Result { + unimplemented!("no need to mark as valid") + } } impl BlockchainInterface for PartialChainState { diff --git a/crates/floresta-wire/src/lib.rs b/crates/floresta-wire/src/lib.rs index 979aac3c..cbfbf155 100644 --- a/crates/floresta-wire/src/lib.rs +++ b/crates/floresta-wire/src/lib.rs @@ -24,6 +24,8 @@ pub use p2p_wire::node; pub use p2p_wire::node_context; #[cfg(not(target_arch = "wasm32"))] pub use p2p_wire::node_interface; +#[cfg(not(target_arch = "wasm32"))] +pub use p2p_wire::running_node; /// NodeHooks is a trait that defines the hooks that a node can use to interact with the network /// and the blockchain. Every time an event happens, the node will call the corresponding hook. diff --git a/crates/floresta-wire/src/p2p_wire/chain_selector.rs b/crates/floresta-wire/src/p2p_wire/chain_selector.rs new file mode 100644 index 00000000..60561a3f --- /dev/null +++ b/crates/floresta-wire/src/p2p_wire/chain_selector.rs @@ -0,0 +1,265 @@ +// SPDX-License-Identifier: MIT + +//! A module that connects with multiple peers and finds the best chain. +//! +//! # The theory +//! +//! In Bitcoin, the history of transactions processes by the network is defined by a sequence of +//! blocks, chainned by their cryptographic hash. A block commits the hash for the block right +//! before it. Therefore, if we pick any given block, there's exactly one history leading to the +//! very first block, that commits to no one. However, if you go in the other way, starting at the +//! first block and going up, there may not be only one history. Multiple blocks may commit to the +//! same parent. We need a way to pick just one such chain, among all others. +//! +//! To do that, we use the most work rule, sometimes called "Nakamoto Consensus" after Bitcoin's +//! creator, Satoshi Nakamoto. Every block has to solve a probabilistic challenge of finding a +//! combination of data that hashes to a value smaller than a network-agreed value. Because hash +//! functions are pseudorandom, one must make a amount of hashes (on average) before finding a +//! valid one. If we define the amount of hashes needed to find a block as this block's "work", +//! by adding-up the work in each of a chain's blocks, we arrive with the `chainwork`. The Nakamoto +//! consensus consists in taking the chain with most work as the best one. +//! +//! This works because anyone in the network will compute the same amount of work and pick the same +//! one, regardless of where and when. Because work is a intrinsic and deterministic property of a +//! block, everyone comparing the same chain, be on earth, on mars; in 2020 or 2100, they will +//! choose the exact same chain, aways. +//! +//! The most critial part of syncing-up a Bitcoin node is making sure you know about the most-work +//! chain. If someone can eclypse you, they can make you start following a chain that only you and +//! the attacker care about. If you get pay in this chain, you can't pay someone else outside this +//! chain, because they will be following other chains. Luckly, we only need one honest peer, to +//! find the best-work chain and avoid any attacker to fools us into accepting payments in a "fake +//! Bitcoin" +//! +//! # Implementation +//! +//! This module will connect with multiple peers, ask what is the best chain they know about and +//! try to find a consensus among our peers. We'll use the chain we've just discovered to download +//! the blocks and perform full validation of them. To enumerte all chains, we send a `getblocks` +//! request. This message tells our peers to send the hash of the blocks it knows about. +//! Using this information we find the chains our peers seems to be following, and them download +//! and validate headers for this chains. We need to validate those headers, specially their PoW, +//! to avoid peers forcing us to download an verify an invalid proof, opening a resource exaustion +//! DoS oportunity. + +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +use async_std::future::timeout; +use async_std::sync::RwLock; +use bitcoin::block::Header; +use bitcoin::p2p::ServiceFlags; +use floresta_chain::pruned_utreexo::BlockchainInterface; +use floresta_chain::pruned_utreexo::UpdatableChainstate; +use log::info; + +use super::error::WireError; +use super::peer::PeerMessages; +use crate::address_man::AddressState; +use crate::node::periodic_job; +use crate::node::try_and_log; +use crate::node::InflightRequests; +use crate::node::NodeNotification; +use crate::node::NodeRequest; +use crate::node::UtreexoNode; +use crate::node_context::NodeContext; +use crate::node_context::PeerId; + +#[derive(Debug, Default, Clone)] +/// A p2p driver that attemps to connect with multiple peers, ask which chain are them following +/// and download and verify the headers, **not** the actual blocks. This is the first part of a +/// loger IBD pipeline. +/// The actual blocks should be downloaded by a SyncPeer. +pub struct ChainSelector { + state: ChainSelectorState, +} + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub enum ChainSelectorState { + #[default] + /// We are opening connection with some peers + CreatingConnections, + /// We are downloading headers to find the most-work one + DownloadingHeaders, + /// We've downloaded all headers + Done, +} + +impl NodeContext for ChainSelector { + const REQUEST_TIMEOUT: u64 = 10; // Ban peers stalling our IBD + const TRY_NEW_CONNECTION: u64 = 10; // Try creating connections more aggressively +} + +impl UtreexoNode +where + WireError: From<::Error>, + Chain: BlockchainInterface + UpdatableChainstate + 'static, +{ + async fn handle_headers( + &mut self, + peer: PeerId, + headers: Vec
, + ) -> Result<(), WireError> { + if headers.is_empty() { + self.1.state = ChainSelectorState::Done; + return Ok(()); + } + + info!( + "Downloading headers from peer={peer} at height={} hash={}", + self.chain.get_best_block()?.0 + 1, + headers[0].block_hash() + ); + + for header in headers { + match self.chain.accept_header(header) { + Err(e) => { + log::error!("Error while downloading headers from peer={peer} err={e}"); + self.send_to_peer(peer, NodeRequest::Shutdown).await?; + let peer = self.peers.get(&peer).unwrap(); + self.0.address_man.update_set_state( + peer.address_id as usize, + AddressState::Banned(ChainSelector::BAN_TIME), + ); + } + Ok(ff) => { + if !ff { + todo!("This peer is in a fork, handle this") + } + } + } + } + self.request_headers().await + } + + async fn request_headers(&mut self) -> Result<(), WireError> { + let locator = self.chain.get_block_locator().unwrap_or_default(); + let peer = self + .send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE) + .await?; + + self.inflight + .insert(InflightRequests::Headers, (peer, Instant::now())); + Ok(()) + } + + async fn check_for_timeout(&mut self) -> Result<(), WireError> { + let mut failed = vec![]; + + for (request, (peer, instant)) in self.inflight.clone() { + if instant.elapsed().as_secs() > ChainSelector::REQUEST_TIMEOUT { + self.send_to_peer(peer, NodeRequest::Shutdown).await?; + failed.push(request) + } + } + + for request in failed { + self.inflight.remove(&request); + } + + Ok(()) + } + + pub async fn check_tips(&mut self) -> Result<(), WireError> { + let locator = self.chain.get_block_locator().unwrap_or_default(); + let mut pendding = Vec::with_capacity(self.peer_ids.len()); + + for peer in self.peer_ids.iter() { + self.send_to_peer(*peer, NodeRequest::GetHeaders(locator.clone())) + .await?; + pendding.push(peer); + } + // TODO: Check if peer has something + Ok(()) + } + + pub async fn run(&mut self, stop_signal: &Arc>) -> Result<(), WireError> { + // Open our first connection to a random peer, we'll use this peer to find a + // candidate chain. + let (height, _) = self.chain.get_best_block()?; + let first_run = height == 0; + + self.create_connection(false).await; + info!("Starting ibd, selecting the best chain"); + loop { + while let Ok(notification) = + timeout(Duration::from_millis(10), self.node_rx.recv()).await + { + try_and_log!(self.handle_notification(notification).await); + } + + periodic_job!( + self.maybe_open_connection().await, + self.last_connection, + TRY_NEW_CONNECTION, + ChainSelector + ); + + if self.1.state == ChainSelectorState::CreatingConnections { + // If we have enough peers, try to download headers + if self.peer_ids.len() >= 2 { + try_and_log!(self.request_headers().await); + self.1.state = ChainSelectorState::DownloadingHeaders; + } + } + + // We downloaded all headers in the most-pow chain. We're done! + if self.1.state == ChainSelectorState::Done { + // Do a last check with our peers to find potential tips + self.check_tips().await?; + + if first_run { + let best_block = self.chain.get_best_block()?; + // To speed things up, we assume that a given height is valid and + // start our chain from there. + self.chain.mark_chain_as_valid(best_block.0, best_block.1)?; + + info!( + "best chain selected at block={} depth={}", + best_block.1, best_block.0 + ); + } + break; + } + + try_and_log!(self.check_for_timeout().await); + + if *stop_signal.read().await { + break; + } + } + Ok(()) + } + + async fn handle_notification( + &mut self, + notification: Result, + ) -> Result<(), WireError> { + match notification? { + NodeNotification::FromPeer(peer, message) => match message { + PeerMessages::Headers(headers) => { + self.inflight.remove(&InflightRequests::Headers); + return self.handle_headers(peer, headers).await; + } + + PeerMessages::Ready(version) => { + self.handle_peer_ready(peer, &version).await?; + } + + PeerMessages::Disconnected(idx) => { + self.handle_disconnection(peer, idx)?; + } + + PeerMessages::Addr(addresses) => { + let addresses: Vec<_> = + addresses.iter().cloned().map(|addr| addr.into()).collect(); + self.address_man.push_addresses(&addresses); + } + + _ => {} + }, + } + Ok(()) + } +} diff --git a/crates/floresta-wire/src/p2p_wire/mod.rs b/crates/floresta-wire/src/p2p_wire/mod.rs index cbcb71d5..544b0e65 100644 --- a/crates/floresta-wire/src/p2p_wire/mod.rs +++ b/crates/floresta-wire/src/p2p_wire/mod.rs @@ -1,11 +1,13 @@ //! Main module for the p2p chain. This is a blockchain provider, just like cli-chain, but it's //! backed by p2p Bitcoin's p2p network. pub mod address_man; +pub mod chain_selector; pub mod error; pub mod mempool; pub mod node; pub mod node_context; pub mod node_interface; pub mod peer; +pub mod running_node; pub mod socks; pub mod stream_reader; diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index e1e50a95..3458a6b7 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -16,65 +16,42 @@ use std::time::UNIX_EPOCH; use async_std::channel::bounded; use async_std::channel::Receiver; -use async_std::channel::SendError; use async_std::channel::Sender; use async_std::channel::{self}; use async_std::future::timeout; use async_std::net::TcpStream; use async_std::sync::RwLock; use async_std::task::spawn; -use bitcoin::block::Header as BlockHeader; -use bitcoin::hashes::sha256; -use bitcoin::hashes::Hash; use bitcoin::p2p::address::AddrV2; use bitcoin::p2p::address::AddrV2Message; -use bitcoin::p2p::message_blockdata::Inventory; -use bitcoin::p2p::utreexo::UData; -use bitcoin::p2p::utreexo::UtreexoBlock; use bitcoin::p2p::ServiceFlags; use bitcoin::BlockHash; -use bitcoin::OutPoint; -use bitcoin::Transaction; -use bitcoin::TxOut; use bitcoin::Txid; use floresta_chain::pruned_utreexo::chainparams::get_chain_dns_seeds; -use floresta_chain::pruned_utreexo::udata::proof_util; use floresta_chain::pruned_utreexo::BlockchainInterface; use floresta_chain::pruned_utreexo::UpdatableChainstate; -use floresta_chain::BlockValidationErrors; -use floresta_chain::BlockchainError; use floresta_chain::Network; use futures::Future; -use log::debug; -use log::error; use log::info; -use log::trace; use log::warn; -use rustreexo::accumulator::node_hash::NodeHash; -use rustreexo::accumulator::proof::Proof; use super::address_man::AddressMan; use super::address_man::AddressState; use super::address_man::LocalAddress; use super::error::WireError; use super::mempool::Mempool; -use super::node_context::IBDNode; use super::node_context::NodeContext; -use super::node_context::RunningNode; use super::node_interface::NodeInterface; -use super::node_interface::NodeResponse; use super::node_interface::PeerInfo; use super::node_interface::UserRequest; use super::peer::Peer; use super::peer::PeerMessages; use super::peer::Version; +use super::running_node::RunningNode; use super::socks::Socks5Addr; use super::socks::Socks5Error; use super::socks::Socks5StreamBuilder; -/// Max number of simultaneous connections we initiates we are willing to hold -const MAX_OUTGOING_PEERS: usize = 10; - #[derive(Debug)] pub enum NodeNotification { FromPeer(u32, PeerMessages), @@ -99,26 +76,19 @@ pub enum NodeRequest { SendAddresses(Vec), } -#[derive(Default, PartialEq)] -enum NodeState { - #[default] - WaitingPeer, - DownloadHeaders, - DownloadBlocks, - Running, -} #[derive(Debug, Hash, PartialEq, Eq, Clone)] -enum InflightRequests { +pub(crate) enum InflightRequests { Headers, Blocks(BlockHash), RescanBlock(BlockHash), UserRequest(UserRequest), Connect(u32), } + #[derive(Debug, Clone)] pub struct LocalPeerView { state: PeerStatus, - address_id: u32, + pub(crate) address_id: u32, channel: Sender, services: ServiceFlags, user_agent: String, @@ -152,52 +122,55 @@ impl Default for RunningNode { } pub struct NodeCommon { - peer_id_count: u32, - last_headers_request: Instant, - last_tip_update: Instant, - last_connection: Instant, - last_peer_db_dump: Instant, - last_broadcast: Instant, - last_send_addresses: Instant, - last_block_request: u32, - network: Network, - last_get_address_request: Instant, - utreexo_peers: Vec, - peer_ids: Vec, - peers: HashMap, - chain: Arc, - inflight: HashMap, - node_rx: Receiver, - node_tx: Sender, - state: NodeState, - mempool: Arc>, - datadir: String, - address_man: AddressMan, - max_banscore: u32, - socks5: Option, - fixed_peer: Option, + pub(crate) peer_id_count: u32, + pub(crate) last_headers_request: Instant, + pub(crate) last_tip_update: Instant, + pub(crate) last_connection: Instant, + pub(crate) last_peer_db_dump: Instant, + pub(crate) last_broadcast: Instant, + pub(crate) last_send_addresses: Instant, + pub(crate) last_block_request: u32, + pub(crate) network: Network, + pub(crate) last_get_address_request: Instant, + pub(crate) utreexo_peers: Vec, + pub(crate) peer_ids: Vec, + pub(crate) peers: HashMap, + pub(crate) chain: Arc, + pub(crate) inflight: HashMap, + pub(crate) node_rx: Receiver, + pub(crate) node_tx: Sender, + pub(crate) mempool: Arc>, + pub(crate) datadir: String, + pub(crate) address_man: AddressMan, + pub(crate) max_banscore: u32, + pub(crate) socks5: Option, + pub(crate) fixed_peer: Option, } pub struct UtreexoNode( - NodeCommon, - Context, + pub(crate) NodeCommon, + pub(crate) Context, ); + impl Deref for UtreexoNode { fn deref(&self) -> &Self::Target { &self.0 } type Target = NodeCommon; } + impl DerefMut for UtreexoNode { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } + #[derive(Debug, PartialEq, Clone, Copy)] enum PeerStatus { Awaiting, Ready, } + impl UtreexoNode where T: 'static + Default + NodeContext, @@ -218,7 +191,6 @@ where UtreexoNode( NodeCommon { inflight: HashMap::new(), - state: NodeState::WaitingPeer, peer_id_count: 0, peers: HashMap::new(), last_block_request: chain.get_validation_index().expect("Invalid chain"), @@ -246,7 +218,7 @@ where ) } - fn get_peer_info(&self, peer: &u32) -> Option { + pub(crate) fn get_peer_info(&self, peer: &u32) -> Option { let peer = self.peers.get(peer)?; Some(PeerInfo { address: format!("{}:{}", peer.address, peer.port), @@ -255,7 +227,7 @@ where initial_height: peer.height, }) } - fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> { + pub(crate) fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> { if let Some(p) = self.peers.remove(&peer) { p.channel.close(); if !p.feeler && p.state == PeerStatus::Ready { @@ -265,9 +237,6 @@ where self.peer_ids.retain(|&id| id != peer); self.utreexo_peers.retain(|&id| id != peer); - if self.peer_ids.is_empty() || self.utreexo_peers.is_empty() { - self.state = NodeState::WaitingPeer; - } self.address_man.update_set_state( idx, AddressState::Tried( @@ -279,7 +248,11 @@ where ); Ok(()) } - async fn handle_peer_ready(&mut self, peer: u32, version: &Version) -> Result<(), WireError> { + pub(crate) async fn handle_peer_ready( + &mut self, + peer: u32, + version: &Version, + ) -> Result<(), WireError> { if version.feeler { self.send_to_peer(peer, NodeRequest::Shutdown).await?; self.address_man.update_set_state( @@ -325,7 +298,8 @@ where } Ok(()) } - fn get_default_port(&self) -> u16 { + + pub(crate) fn get_default_port(&self) -> u16 { match self.network { Network::Bitcoin => 8333, Network::Testnet => 18333, @@ -334,64 +308,11 @@ where } } - #[allow(clippy::type_complexity)] - fn process_proof( - udata: &UData, - transactions: &[Transaction], - chain: &Arc, - block_hash: &BlockHash, - ) -> Result<(Proof, Vec, HashMap), WireError> { - let targets = udata.proof.targets.iter().map(|target| target.0).collect(); - let hashes = udata - .proof - .hashes - .iter() - .map(|hash| NodeHash::Some(hash.to_byte_array())) - .collect(); - let proof = Proof::new(targets, hashes); - let mut hashes = Vec::new(); - let mut leaves_iter = udata.leaves.iter().cloned(); - let mut tx_iter = transactions.iter(); - - let mut inputs = HashMap::new(); - tx_iter.next(); // Skip coinbase - - for tx in tx_iter { - let txid = tx.txid(); - for (vout, out) in tx.output.iter().enumerate() { - inputs.insert( - OutPoint { - txid, - vout: vout as u32, - }, - out.clone(), - ); - } - - for input in tx.input.iter() { - if !inputs.contains_key(&input.previous_output) { - if let Some(leaf) = leaves_iter.next() { - let height = leaf.header_code >> 1; - let hash = chain.get_block_hash(height)?; - let leaf = proof_util::reconstruct_leaf_data(&leaf, input, hash) - .expect("Invalid proof"); - // Coinbase can only be spent after a certain amount of confirmations - if leaf.header_code & 1 == 1 - && !chain.is_coinbase_mature(height, *block_hash)? - { - return Err(WireError::CoinbaseNotMatured); - } - hashes.push(leaf._get_leaf_hashes()); - inputs.insert(leaf.prevout, leaf.utxo); - } - } - } - } - - Ok((proof, hashes, inputs)) - } - - async fn send_to_peer(&self, peer_id: u32, req: NodeRequest) -> Result<(), WireError> { + pub(crate) async fn send_to_peer( + &self, + peer_id: u32, + req: NodeRequest, + ) -> Result<(), WireError> { if let Some(peer) = &self.peers.get(&peer_id) { if peer.state == PeerStatus::Ready { peer.channel @@ -409,12 +330,15 @@ where /// will cause our peer to be banned for one BANTIME. /// The amount of each increment is given by factor, and it's callibrated for each misbehaving /// action that a peer may incur in. - async fn increase_banscore(&mut self, peer_id: u32, factor: u32) -> Result<(), WireError> { + pub(crate) async fn increase_banscore( + &mut self, + peer_id: u32, + factor: u32, + ) -> Result<(), WireError> { let Some(peer) = self.0.peers.get_mut(&peer_id) else { return Ok(()); }; peer.banscore += factor; - // This peer is misbehaving too often, ban it if peer.banscore >= self.0.max_banscore { warn!("banning peer {} for misbehaving", peer_id); @@ -428,95 +352,8 @@ where Ok(()) } - async fn check_for_timeout(&mut self) -> Result<(), WireError> { - let mut timed_out = Vec::new(); - for request in self.inflight.keys() { - let (_, time) = self.inflight.get(request).unwrap(); - if time.elapsed() > Duration::from_secs(T::REQUEST_TIMEOUT) { - timed_out.push(request.clone()); - } - } - - for request in timed_out { - let Some((peer, _)) = self.inflight.remove(&request) else { - continue; - }; - - // Punning this peer for taking too long to respond - self.increase_banscore(peer, 2).await?; - - match request { - InflightRequests::Blocks(block) => { - let peer = self - .send_to_random_peer( - NodeRequest::GetBlock((vec![block], true)), - ServiceFlags::UTREEXO, - ) - .await?; - self.inflight - .insert(InflightRequests::Blocks(block), (peer, Instant::now())); - } - InflightRequests::RescanBlock(block) => { - let peer = self - .send_to_random_peer( - NodeRequest::GetBlock((vec![block], false)), - ServiceFlags::NONE, - ) - .await?; - self.inflight - .insert(InflightRequests::RescanBlock(block), (peer, Instant::now())); - } - InflightRequests::Headers => { - let peer = self - .send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE) - .await?; - self.last_headers_request = Instant::now(); - self.inflight - .insert(InflightRequests::Headers, (peer, Instant::now())); - } - InflightRequests::UserRequest(req) => match req { - UserRequest::Block(block) => { - let peer = self - .send_to_random_peer( - NodeRequest::GetBlock((vec![block], true)), - ServiceFlags::NONE, - ) - .await?; - self.inflight - .insert(InflightRequests::UserRequest(req), (peer, Instant::now())); - } - UserRequest::MempoolTransaction(txid) => { - let peer = self - .send_to_random_peer( - NodeRequest::MempoolTransaction(txid), - ServiceFlags::NONE, - ) - .await?; - self.inflight - .insert(InflightRequests::UserRequest(req), (peer, Instant::now())); - } - UserRequest::UtreexoBlock(block) => { - let peer = self - .send_to_random_peer( - NodeRequest::GetBlock((vec![block], true)), - ServiceFlags::NONE, - ) - .await?; - self.inflight - .insert(InflightRequests::UserRequest(req), (peer, Instant::now())); - } - _ => {} - }, - InflightRequests::Connect(peer) => { - self.send_to_peer(peer, NodeRequest::Shutdown).await? - } - } - } - - Ok(()) - } #[inline] - async fn send_to_random_peer( + pub(crate) async fn send_to_random_peer( &mut self, req: NodeRequest, required_services: ServiceFlags, @@ -559,7 +396,7 @@ where Err(WireError::NoPeerToSendRequest) } - async fn init_peers(&mut self) -> Result<(), WireError> { + pub(crate) async fn init_peers(&mut self) -> Result<(), WireError> { let anchors = self .0 .address_man @@ -576,7 +413,7 @@ where Ok(()) } - async fn shutdown(&mut self) { + pub(crate) async fn shutdown(&mut self) { info!("Shutting down node"); for peer in self.peer_ids.iter() { try_and_log!(self.send_to_peer(*peer, NodeRequest::Shutdown).await); @@ -584,11 +421,11 @@ where try_and_log!(self.save_peers()); try_and_log!(self.chain.flush()); } - async fn ask_block(&mut self) -> Result<(), WireError> { + pub(crate) async fn ask_block(&mut self) -> Result<(), WireError> { let blocks = self.get_blocks_to_download()?; self.request_blocks(blocks).await } - async fn handle_broadcast(&self) -> Result<(), WireError> { + pub(crate) async fn handle_broadcast(&self) -> Result<(), WireError> { for (_, peer) in self.peers.iter() { if peer.services.has(ServiceFlags::from(1 << 24)) { continue; @@ -614,18 +451,18 @@ where } Ok(()) } - async fn ask_for_addresses(&mut self) -> Result<(), WireError> { + pub(crate) async fn ask_for_addresses(&mut self) -> Result<(), WireError> { let _ = self .send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE) .await?; Ok(()) } - fn save_peers(&self) -> Result<(), WireError> { + pub(crate) fn save_peers(&self) -> Result<(), WireError> { self.address_man .dump_peers(&self.datadir) .map_err(WireError::Io) } - fn get_blocks_to_download(&mut self) -> Result, WireError> { + pub(crate) fn get_blocks_to_download(&mut self) -> Result, WireError> { let mut blocks = Vec::new(); let tip = self.chain.get_height()?; @@ -641,18 +478,18 @@ where Ok(blocks) } - async fn maybe_open_connection(&mut self) -> Result<(), WireError> { + pub(crate) async fn maybe_open_connection(&mut self) -> Result<(), WireError> { // If the user passes in a `--connect` cli argument, we only connect with // that particular peer. if self.fixed_peer.is_some() && !self.peers.is_empty() { return Ok(()); } - if self.peers.len() < MAX_OUTGOING_PEERS { + if self.peers.len() < T::MAX_OUTGOING_PEERS { self.create_connection(false).await; } Ok(()) } - async fn open_feeler_connection(&mut self) -> Result<(), WireError> { + pub(crate) async fn open_feeler_connection(&mut self) -> Result<(), WireError> { // No feeler if `-connect` is set if self.fixed_peer.is_some() { return Ok(()); @@ -661,7 +498,7 @@ where Ok(()) } - async fn request_blocks(&mut self, blocks: Vec) -> Result<(), WireError> { + pub(crate) async fn request_blocks(&mut self, blocks: Vec) -> Result<(), WireError> { let blocks: Vec<_> = blocks .into_iter() .filter(|block| { @@ -684,7 +521,7 @@ where Ok(()) } - async fn create_connection(&mut self, feeler: bool) -> Option<()> { + pub(crate) async fn create_connection(&mut self, feeler: bool) -> Option<()> { // We should try to keep at least two utreexo connections let required_services = if self.utreexo_peers.len() < 2 { ServiceFlags::NETWORK | ServiceFlags::WITNESS | ServiceFlags::from(1 << 24) @@ -717,7 +554,7 @@ where } /// Opens a new connection that doesn't require a proxy. #[allow(clippy::too_many_arguments)] - fn open_non_proxy_connection( + pub(crate) fn open_non_proxy_connection( feeler: bool, peer_id: usize, address: LocalAddress, @@ -740,7 +577,7 @@ where } /// Opens a connection through a socks5 interface #[allow(clippy::too_many_arguments)] - async fn open_proxy_connection( + pub(crate) async fn open_proxy_connection( proxy: SocketAddr, feeler: bool, mempool: Arc>, @@ -781,7 +618,12 @@ where /// Creates a new outgoing connection with `address`. Connection may or may not be feeler, /// a special connection type that is used to learn about good peers, but are not kept afer /// handshake. - async fn open_connection(&mut self, feeler: bool, peer_id: usize, address: LocalAddress) { + pub(crate) async fn open_connection( + &mut self, + feeler: bool, + peer_id: usize, + address: LocalAddress, + ) { let (requests_tx, requests_rx) = bounded(1024); if let Some(ref proxy) = self.socks5 { spawn(timeout( @@ -799,7 +641,7 @@ where ), )); } else { - Self::open_non_proxy_connection( + spawn(Self::open_non_proxy_connection( feeler, peer_id, address.clone(), @@ -808,8 +650,7 @@ where self.mempool.clone(), self.network.into(), self.node_tx.clone(), - ) - .await; + )); } let peer_count: u32 = self.peer_id_count; @@ -840,740 +681,6 @@ where } } -/// An IBD node, should be used to get your chainstate up-to-date with the network, but -/// returns as soon as there's no more blocks to download. -impl UtreexoNode -where - WireError: From<::Error>, - Chain: BlockchainInterface + UpdatableChainstate + 'static, -{ - async fn handle_block(chain: &Arc, block: UtreexoBlock) -> Result<(), WireError> { - let (proof, del_hashes, inputs) = Self::process_proof( - &block.udata.unwrap(), - &block.block.txdata, - chain, - &block.block.block_hash(), - )?; - try_and_log!(chain - .connect_block(&block.block, proof, inputs, del_hashes) - .map_err(|e| { - if let BlockchainError::BlockValidation(_) = &e { - try_and_log!(chain.invalidate_block(block.block.block_hash())); - } - error!( - "Error while connecting block {}: {e:?}", - block.block.block_hash() - ); - e - })); - Ok(()) - } - async fn handle_headers(&mut self, headers: Vec) -> Result<(), WireError> { - if headers.is_empty() { - // Start downloading blocks - self.chain.flush()?; - self.state = NodeState::DownloadBlocks; - return Ok(()); - } - self.last_headers_request = Instant::now(); - info!( - "Downloading headers at height={} hash={}", - self.chain.get_best_block()?.0 + 1, - headers[0].block_hash() - ); - for header in headers { - self.chain.accept_header(header)?; - } - if self.inflight.contains_key(&InflightRequests::Headers) { - return Ok(()); - } - let locator = self.chain.get_block_locator()?; - let peer = self - .send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE) - .await?; - - self.inflight - .insert(InflightRequests::Headers, (peer, Instant::now())); - Ok(()) - } - async fn maybe_request_headers(&mut self) -> Result<(), WireError> { - if self.state != NodeState::DownloadHeaders { - return Ok(()); - } - info!("Asking for headers"); - let locator = self - .chain - .get_block_locator() - .expect("Could not create locator"); - self.send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE) - .await?; - self.last_headers_request = Instant::now(); - Ok(()) - } - pub async fn run(&mut self, stop_signal: &Arc>) -> Result<(), WireError> { - self.create_connection(false).await; - self.last_headers_request = Instant::now(); - loop { - while let Ok(notification) = - timeout(Duration::from_millis(10), self.node_rx.recv()).await - { - try_and_log!(self.handle_notification(notification).await); - } - - if *stop_signal.read().await { - break; - } - - if self.state == NodeState::WaitingPeer { - try_and_log!(self.maybe_open_connection().await); - } - - self.last_tip_update = Instant::now(); - - // If we don't have any peers, then we can't do anything - if self.state == NodeState::WaitingPeer { - continue; - } - // We download blocks in parallel, sometimes we get them out of order, so we need to - // process them in order. If we got all blocks but the first one, we can't process - // them yet. Once we get it, we can process all blocks we have. - if self.state == NodeState::DownloadBlocks { - self.process_queued_blocks().await.or_else(|err| { - // This usually means we just processed all blocks, and we are done. - if matches!(err, WireError::Blockchain(BlockchainError::BlockNotPresent)) { - info!("Finished downloading blocks"); - self.chain.toggle_ibd(false); - Ok(()) - } else { - Err(err) - } - })?; - } - - if !self.chain.is_in_idb() { - break; - } - // If we are downloading blocks, we need to request more if we have space. - let currently_inflight = self.0.inflight.len() + self.1.blocks.len(); - if self.state == NodeState::DownloadBlocks - && currently_inflight < IBDNode::MAX_INFLIGHT_REQUESTS - { - let blocks = self.get_blocks_to_download()?; - if blocks.is_empty() { - info!("Finished downloading blocks"); - self.chain.toggle_ibd(false); - - break; - } - self.request_blocks(blocks).await?; - } - - self.check_for_timeout().await?; - - periodic_job!( - self.maybe_request_headers().await, - self.last_headers_request, - IBD_REQUEST_BLOCKS_AGAIN, - IBDNode - ); - } - Ok(()) - } - async fn process_queued_blocks(&mut self) -> Result<(), WireError> { - let mut hash = self - .chain - .get_block_hash(self.chain.get_validation_index()? + 1)?; - - while let Some(block) = self.1.blocks.remove(&hash) { - Self::handle_block(&self.chain, block).await?; - hash = self - .chain - .get_block_hash(self.chain.get_validation_index()? + 1)?; - } - Ok(()) - } - - async fn handle_notification( - &mut self, - notification: Result, - ) -> Result<(), WireError> { - match notification? { - NodeNotification::FromPeer(peer, message) => match message { - PeerMessages::NewBlock(block) => { - trace!("We got and inv with block {block} but we are on IBD, ignoring it"); - } - PeerMessages::Block(block) => { - // Remove from inflight, since we just got it. - if self - .inflight - .remove(&InflightRequests::Blocks(block.block.block_hash())) - .is_none() - { - // We didn't request this block, so we should disconnect the peer. - if let Some(peer) = self.peers.get(&peer).cloned() { - self.address_man.update_set_state( - peer.address_id as usize, - AddressState::Banned(IBDNode::BAN_TIME), - ); - } - error!( - "Peer {peer} sent us block {} which we didn't request", - block.block.block_hash() - ); - - self.send_to_peer(peer, NodeRequest::Shutdown).await?; - return Err(WireError::PeerMisbehaving); - } - // We may receive blocks out of order, so we store them in a map until we - // receive all the previous ones. - let height = self.chain.get_validation_index()? + 1; - if self.0.chain.get_block_hash(height)? == block.block.block_hash() { - Self::handle_block(&self.chain, block).await?; - } else { - self.1.blocks.insert(block.block.block_hash(), block); - } - - let currently_inflight = self.inflight.len() + self.1.blocks.len(); - if self.state == NodeState::DownloadBlocks - && currently_inflight < IBDNode::MAX_INFLIGHT_REQUESTS - { - let blocks = self.get_blocks_to_download()?; - self.request_blocks(blocks).await?; - } - } - PeerMessages::Headers(headers) => { - self.inflight.remove(&InflightRequests::Headers); - return self.handle_headers(headers).await; - } - PeerMessages::Ready(version) => { - self.handle_peer_ready(peer, &version).await?; - - if version.services.has(ServiceFlags::from(1 << 24)) - && matches!(self.state, NodeState::WaitingPeer) - && !self.inflight.contains_key(&InflightRequests::Headers) - { - try_and_log!( - self.send_to_peer( - peer, - NodeRequest::GetHeaders( - self.chain.get_block_locator().expect("Can get locators"), - ) - ) - .await - ); - self.state = NodeState::DownloadHeaders; - } - } - - PeerMessages::Disconnected(idx) => { - self.handle_disconnection(peer, idx)?; - - if self.peer_ids.is_empty() || self.utreexo_peers.is_empty() { - self.state = NodeState::WaitingPeer; - } - } - PeerMessages::Addr(addresses) => { - let addresses: Vec<_> = - addresses.iter().cloned().map(|addr| addr.into()).collect(); - self.address_man.push_addresses(&addresses); - } - _ => {} - }, - } - Ok(()) - } -} - -impl UtreexoNode -where - WireError: From<::Error>, - Chain: BlockchainInterface + UpdatableChainstate + 'static, -{ - /// Returns a handle to the node interface that we can use to request data from our - /// node. This struct is thread safe, so we can use it from multiple threads and have - /// multiple handles. It also doesn't require a mutable reference to the node, or any - /// synchronization mechanism. - pub fn get_handle(&self) -> Arc { - self.1.user_requests.clone() - } - #[allow(clippy::result_large_err)] - fn check_request_timeout(&mut self) -> Result<(), SendError> { - let mutex = self.1.user_requests.requests.lock().unwrap(); - let mut to_remove = Vec::new(); - for req in mutex.iter() { - if req.time.elapsed() > Duration::from_secs(10) { - to_remove.push(req.req); - } - } - drop(mutex); - for request in to_remove { - self.1.user_requests.send_answer(request, None); - } - - Ok(()) - } - async fn handle_user_request(&mut self) { - let mut requests = Vec::new(); - - for request in self.1.user_requests.requests.lock().unwrap().iter() { - if !self - .inflight - .contains_key(&InflightRequests::UserRequest(request.req)) - { - requests.push(request.req); - } - } - self.perform_user_request(requests).await; - } - fn handle_get_peer_info(&self) { - let mut peers = Vec::new(); - for peer in self.peer_ids.iter() { - peers.push(self.get_peer_info(peer)); - } - let peers = peers.into_iter().flatten().collect(); - self.1.user_requests.send_answer( - UserRequest::GetPeerInfo, - Some(NodeResponse::GetPeerInfo(peers)), - ); - } - /// In some edge cases, we may get a block header, but not the block itself. This - /// function checks if we have the block, and if not, requests it. - async fn ask_missed_block(&mut self) -> Result<(), WireError> { - let best_block = self.chain.get_best_block()?.0; - let validation_index = self.chain.get_validation_index()?; - - if best_block == validation_index { - return Ok(()); - } - for block in validation_index..=best_block { - let block = self.0.chain.get_block_hash(block)?; - self.request_blocks(vec![block]).await?; - } - Ok(()) - } - async fn perform_user_request(&mut self, user_req: Vec) { - for user_req in user_req { - let req = match user_req { - UserRequest::Block(block) => NodeRequest::GetBlock((vec![block], false)), - UserRequest::UtreexoBlock(block) => NodeRequest::GetBlock((vec![block], true)), - UserRequest::MempoolTransaction(txid) => NodeRequest::MempoolTransaction(txid), - UserRequest::GetPeerInfo => { - self.handle_get_peer_info(); - continue; - } - UserRequest::Connect((addr, port)) => { - let addr_v2 = match addr { - IpAddr::V4(addr) => AddrV2::Ipv4(addr), - IpAddr::V6(addr) => AddrV2::Ipv6(addr), - }; - let id = rand::random::(); - let local_addr = - LocalAddress::new(addr_v2, 0, AddressState::NeverTried, 0.into(), port, id); - self.open_connection(false, 0, local_addr).await; - self.1.user_requests.send_answer( - UserRequest::Connect((addr, port)), - Some(NodeResponse::Connect(true)), - ); - continue; - } - }; - let peer = self.send_to_random_peer(req, ServiceFlags::NONE).await; - if let Ok(peer) = peer { - self.inflight.insert( - InflightRequests::UserRequest(user_req), - (peer, Instant::now()), - ); - } - } - } - async fn send_addresses(&mut self) -> Result<(), WireError> { - let addresses = self - .address_man - .get_addresses_to_send() - .into_iter() - .map(|(addr, time, services, port)| AddrV2Message { - services, - addr, - port, - time: time as u32, - }) - .collect(); - - self.send_to_random_peer(NodeRequest::SendAddresses(addresses), ServiceFlags::NONE) - .await?; - Ok(()) - } - pub async fn run(mut self, kill_signal: &Arc>) { - try_and_log!(self.init_peers().await); - - // Use this node state to Initial Block download - let mut ibd = UtreexoNode(self.0, IBDNode::default()); - try_and_log!(UtreexoNode::::run(&mut ibd, kill_signal).await); - - // Then take the final state and run the node - self = UtreexoNode(ibd.0, self.1); - - let _ = self - .send_to_random_peer( - NodeRequest::GetHeaders(self.chain.get_block_locator().expect("Can get locators")), - ServiceFlags::NONE, - ) - .await; - - loop { - while let Ok(notification) = - timeout(Duration::from_millis(1), self.node_rx.recv()).await - { - try_and_log!(self.handle_notification(notification).await); - } - - if *kill_signal.read().await { - self.shutdown().await; - break; - } - - // Jobs that don't need a connected peer - - // Save our peers db - periodic_job!( - self.save_peers(), - self.last_peer_db_dump, - PEER_DB_DUMP_INTERVAL, - RunningNode - ); - - // Rework our address database - periodic_job!( - self.address_man.rearrange_buckets(), - self.1.last_address_rearrange, - ADDRESS_REARRANGE_INTERVAL, - RunningNode, - true - ); - - // Perhaps we need more connections - periodic_job!( - self.maybe_open_connection().await, - self.last_connection, - TRY_NEW_CONNECTION, - RunningNode - ); - - // Requests using the node handle - try_and_log!(self.check_request_timeout()); - self.handle_user_request().await; - - // Check if some of our peers have timed out a request - try_and_log!(self.check_for_timeout().await); - - // Those jobs bellow needs a connected peer to work - if self.state == NodeState::WaitingPeer { - continue; - } - // Check whether we are in a stale tip - periodic_job!( - self.check_for_stale_tip().await, - self.last_tip_update, - ASSUME_STALE, - RunningNode - ); - // Check if we haven't missed any block - periodic_job!( - self.ask_missed_block().await, - self.1.last_block_check, - BLOCK_CHECK_INTERVAL, - RunningNode - ); - // Aks our peers for new addresses - periodic_job!( - self.ask_for_addresses().await, - self.last_get_address_request, - ASK_FOR_PEERS_INTERVAL, - RunningNode - ); - // Open new feeler connection periodically - periodic_job!( - self.open_feeler_connection().await, - self.1.last_feeler, - FEELER_INTERVAL, - RunningNode - ); - // Try broadcast transactions - periodic_job!( - self.handle_broadcast().await, - self.last_broadcast, - BROADCAST_DELAY, - RunningNode - ); - // Send our addresses to our peers - periodic_job!( - self.send_addresses().await, - self.last_send_addresses, - SEND_ADDRESSES_INTERVAL, - RunningNode - ); - try_and_log!(self.ask_block().await); - try_and_log!(self.request_rescan_block().await); - } - } - async fn request_rescan_block(&mut self) -> Result<(), WireError> { - let tip = self.chain.get_height().unwrap(); - if self.inflight.len() + 10 > RunningNode::MAX_INFLIGHT_REQUESTS { - return Ok(()); - } - // We use a grace period to avoid looping at the end of rescan - if let RescanStatus::Completed(time) = self.1.last_rescan_request { - if time.elapsed() > Duration::from_secs(60) { - self.1.last_rescan_request = RescanStatus::None; - } - } - if self.1.last_rescan_request == RescanStatus::None - && self.chain.get_rescan_index().is_some() - { - self.1.last_rescan_request = - RescanStatus::InProgress(self.chain.get_rescan_index().unwrap()); - } - if let RescanStatus::InProgress(height) = self.1.last_rescan_request { - for i in (height + 1)..=(height + 10) { - if i > tip { - self.1.last_rescan_request = RescanStatus::Completed(Instant::now()); - break; - } - self.1.last_rescan_request = RescanStatus::InProgress(i); - let hash = self.chain.get_block_hash(i)?; - let peer = self - .send_to_random_peer( - NodeRequest::GetBlock((vec![hash], false)), - ServiceFlags::NONE, - ) - .await?; - self.inflight - .insert(InflightRequests::RescanBlock(hash), (peer, Instant::now())); - } - } - - Ok(()) - } - /// This function checks how many time has passed since our last tip update, if it's - /// been more than 15 minutes, try to update it. - async fn check_for_stale_tip(&mut self) -> Result<(), WireError> { - warn!("Potential stale tip detected, trying extra peers"); - self.create_connection(false).await; - self.send_to_random_peer( - NodeRequest::GetHeaders(self.chain.get_block_locator().unwrap()), - ServiceFlags::NONE, - ) - .await?; - Ok(()) - } - async fn handle_new_block(&mut self) -> Result<(), WireError> { - if self.inflight.contains_key(&InflightRequests::Headers) { - return Ok(()); - } - let locator = self.0.chain.get_block_locator().unwrap(); - - let peer = self - .send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE) - .await?; - self.inflight - .insert(InflightRequests::Headers, (peer, Instant::now())); - - Ok(()) - } - /// This function is called every time we get a Block message from a peer. - /// This block may be a rescan block, a user request or a new block that we - /// need to process. - async fn handle_block_data(&mut self, block: UtreexoBlock, peer: u32) -> Result<(), WireError> { - // Rescan block, a block that the wallet is interested in to check if it contains - // any transaction that we are interested in. - if self - .inflight - .remove(&InflightRequests::RescanBlock(block.block.block_hash())) - .is_some() - { - self.request_rescan_block().await?; - return Ok(self.chain.process_rescan_block(&block.block)?); - } - // If this block is a request made through the user interface, send it back to the - // user. - if self - .inflight - .remove(&InflightRequests::UserRequest(UserRequest::Block( - block.block.block_hash(), - ))) - .is_some() - { - if block.udata.is_some() { - self.1.user_requests.send_answer( - UserRequest::UtreexoBlock(block.block.block_hash()), - Some(NodeResponse::UtreexoBlock(block)), - ); - return Ok(()); - } - self.1.user_requests.send_answer( - UserRequest::Block(block.block.block_hash()), - Some(NodeResponse::Block(block.block)), - ); - return Ok(()); - } - - // If none of the above, it means that this block is a new block that we need to - // process. - - // Check if we actually requested this block. If a peer sends a block we didn't - // request, we should disconnect it. - if self - .inflight - .remove(&InflightRequests::Blocks(block.block.block_hash())) - .is_none() - { - // We didn't request this block, so we should disconnect the peer. - if let Some(peer) = self.peers.get(&peer).cloned() { - self.address_man.update_set_state( - peer.address_id as usize, - AddressState::Banned(RunningNode::BAN_TIME), - ); - } - error!( - "Peer {peer} sent us block {} which we didn't request", - block.block.block_hash() - ); - self.increase_banscore(peer, 5).await?; - return Ok(()); - } - - let validation_index = self.chain.get_validation_index()?; - let validation_hash = self.chain.get_block_hash(validation_index)?; - - // We've downloaded a block that's not the next we need, ignore it for now - if validation_hash != block.block.header.prev_blockhash { - return Ok(()); - } - - let (proof, del_hashes, inputs) = Self::process_proof( - &block.udata.unwrap(), - &block.block.txdata, - &self.chain, - &block.block.block_hash(), - )?; - - if let Err(e) = self - .chain - .connect_block(&block.block, proof, inputs, del_hashes) - { - error!("Invalid block received by peer {} reason: {:?}", peer, e); - - if let BlockchainError::BlockValidation(e) = e { - // Because the proof isn't committed to the block, we can't invalidate - // it if the proof is invalid. Any other error should cause the block - // to be invalidated. - match e { - BlockValidationErrors::InvalidTx(_) - | BlockValidationErrors::NotEnoughPow - | BlockValidationErrors::BadMerkleRoot - | BlockValidationErrors::BadWitnessCommitment - | BlockValidationErrors::NotEnoughMoney - | BlockValidationErrors::FirstTxIsnNotCoinbase - | BlockValidationErrors::BadCoinbaseOutValue - | BlockValidationErrors::EmptyBlock - | BlockValidationErrors::BlockExtendsAnOrphanChain - | BlockValidationErrors::BadBip34 - | BlockValidationErrors::CoinbaseNotMatured => { - self.send_to_peer(peer, NodeRequest::Shutdown).await?; - try_and_log!(self.chain.invalidate_block(block.block.block_hash())); - } - BlockValidationErrors::InvalidProof => {} - } - } - - // Disconnect the peer and ban it. - if let Some(peer) = self.peers.get(&peer).cloned() { - self.address_man.update_set_state( - peer.address_id as usize, - AddressState::Banned(RunningNode::BAN_TIME), - ); - } - self.send_to_peer(peer, NodeRequest::Shutdown).await?; - return Err(WireError::PeerMisbehaving); - } - // Remove confirmed transactions from the mempool. - let mempool_delta = self.mempool.write().await.consume_block(&block.block); - debug!( - "Block {} accepted, confirmed transactions: {:?}", - block.block.block_hash(), - mempool_delta - ); - self.last_tip_update = Instant::now(); - Ok(()) - } - async fn handle_notification( - &mut self, - notification: Result, - ) -> Result<(), WireError> { - match notification? { - NodeNotification::FromPeer(peer, message) => match message { - PeerMessages::NewBlock(block) => { - trace!("We got an inv with block {block} requesting it"); - self.handle_new_block().await?; - } - PeerMessages::Block(block) => { - trace!( - "Got data for block {} from peer {peer}", - block.block.block_hash() - ); - self.handle_block_data(block, peer).await?; - } - PeerMessages::Headers(headers) => { - self.inflight.remove(&InflightRequests::Headers); - for header in headers.iter() { - self.chain.accept_header(*header)?; - } - } - PeerMessages::Ready(version) => { - self.handle_peer_ready(peer, &version).await?; - if version.services.has(ServiceFlags::from(1 << 24)) { - self.state = NodeState::Running; - } - } - PeerMessages::Disconnected(idx) => { - self.handle_disconnection(peer, idx)?; - } - PeerMessages::Addr(addresses) => { - let addresses: Vec<_> = - addresses.iter().cloned().map(|addr| addr.into()).collect(); - self.address_man.push_addresses(&addresses); - } - PeerMessages::NotFound(inv) => match inv { - Inventory::Error => {} - Inventory::Block(block) - | Inventory::WitnessBlock(block) - | Inventory::UtreexoBlock(block) - | Inventory::UtreexoWitnessBlock(block) - | Inventory::CompactBlock(block) => { - self.1 - .user_requests - .send_answer(UserRequest::Block(block), None); - } - - Inventory::WitnessTransaction(tx) | Inventory::Transaction(tx) => { - self.1 - .user_requests - .send_answer(UserRequest::MempoolTransaction(tx), None); - } - _ => {} - }, - PeerMessages::Transaction(tx) => { - self.1.user_requests.send_answer( - UserRequest::MempoolTransaction(tx.txid()), - Some(NodeResponse::MempoolTransaction(tx)), - ); - } - }, - } - Ok(()) - } -} - /// Run a task and log any errors that might occur. macro_rules! try_and_log { ($what:expr) => { diff --git a/crates/floresta-wire/src/p2p_wire/node_context.rs b/crates/floresta-wire/src/p2p_wire/node_context.rs index 62824e54..f6ef5523 100644 --- a/crates/floresta-wire/src/p2p_wire/node_context.rs +++ b/crates/floresta-wire/src/p2p_wire/node_context.rs @@ -1,13 +1,26 @@ -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Instant; - -use bitcoin::p2p::utreexo::UtreexoBlock; -use bitcoin::BlockHash; - -use super::node::RescanStatus; -use super::node_interface::NodeInterface; +//! During the lifetime of a Bitcoin client, we have a couple of phases that are slightly different +//! from each other, having to implement their own state-machines and logic for handing requests. +//! While we could simply put everything in one struct and have a single `impl` block, that would +//! create a massive amount of if's in the code, taking different paths depending on which state +//! are we in. For that reason, we define the basics of a node, like code shared by all the +//! states into one base struct called `UtreexoNode`, we then further refine this struct using +//! fine-tunned `Contexts`, that should implement [NodeContext] and are passed-in as a generic +//! parameter by the caller. +//! +//! The three flavors of node are: +//! - ChainSelector: This finds the best PoW chain, by downloding multiple candidates and taking +//! the one with more PoW. It should do it's job quickly, as it blocks our main +//! client and can't proceed without this information. +//! - SyncNode: Used to download and verify all blocks in a chain. This is computationally +//! expensive and may take a while to run. After this ends it's job, it gives us 100% +//! centanty that this chain is valid. +//! - Running Node: This is the one that users interacts with, and should be the one running most +//! of the time. This node is started right after `ChainSelector` returns, and +//! will handle new blocks (even if `SyncNode` haven't returned) and handle +//! requests by users. +/// This trait mainly defines a bunch of constants that we need for the node, but we may tweak +/// those values for each one. It's also an organized way of defining those constants anyway. pub trait NodeContext { const REQUEST_TIMEOUT: u64; /// Max number of simultaneous connections we initiates we are willing to hold @@ -38,22 +51,4 @@ pub trait NodeContext { const SEND_ADDRESSES_INTERVAL: u64 = 60 * 60; // 1 hour } -#[derive(Debug, Clone)] -pub struct RunningNode { - pub last_rescan_request: RescanStatus, - pub last_feeler: Instant, - pub last_address_rearrange: Instant, - pub last_block_check: Instant, - pub user_requests: Arc, -} -impl NodeContext for RunningNode { - const REQUEST_TIMEOUT: u64 = 30; -} - -#[derive(Debug, Default, Clone)] -pub struct IBDNode { - pub blocks: HashMap, -} -impl NodeContext for IBDNode { - const REQUEST_TIMEOUT: u64 = 30 * 60; -} +pub(crate) type PeerId = u32; diff --git a/crates/floresta-wire/src/p2p_wire/peer.rs b/crates/floresta-wire/src/p2p_wire/peer.rs index e9d92803..97ffd00c 100644 --- a/crates/floresta-wire/src/p2p_wire/peer.rs +++ b/crates/floresta-wire/src/p2p_wire/peer.rs @@ -533,8 +533,10 @@ pub(super) mod peer_utils { pub(super) fn make_pong(nonce: u64) -> NetworkMessage { NetworkMessage::Pong(nonce) } - pub(super) fn build_version_message() -> message::NetworkMessage { + + pub(crate) fn build_version_message() -> message::NetworkMessage { use bitcoin::p2p::ServiceFlags; + // Building version message, see https://en.bitcoin.it/wiki/Protocol_documentation#version let my_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 38332); diff --git a/crates/floresta-wire/src/p2p_wire/running_node.rs b/crates/floresta-wire/src/p2p_wire/running_node.rs new file mode 100644 index 00000000..bf7c5bda --- /dev/null +++ b/crates/floresta-wire/src/p2p_wire/running_node.rs @@ -0,0 +1,639 @@ +/// After a node catches-up with the network, we can start listening for new blocks, handing any +/// request our user might make and keep our peers alive. This mode requires way less bandwidth and +/// CPU to run, being bound by the number of blocks found in a given period. +use std::net::IpAddr; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +use async_std::channel::SendError; +use async_std::future::timeout; +use async_std::sync::RwLock; +use bitcoin::p2p::address::AddrV2; +use bitcoin::p2p::address::AddrV2Message; +use bitcoin::p2p::message_blockdata::Inventory; +use bitcoin::p2p::utreexo::UtreexoBlock; +use bitcoin::p2p::ServiceFlags; +use floresta_chain::pruned_utreexo::BlockchainInterface; +use floresta_chain::pruned_utreexo::UpdatableChainstate; +use floresta_chain::BlockValidationErrors; +use floresta_chain::BlockchainError; +use log::debug; +use log::error; +use log::trace; +use log::warn; + +use super::error::WireError; +use super::peer::PeerMessages; +use crate::address_man::AddressState; +use crate::address_man::LocalAddress; +use crate::node::periodic_job; +use crate::node::try_and_log; +use crate::node::InflightRequests; +use crate::node::NodeNotification; +use crate::node::NodeRequest; +use crate::node::RescanStatus; +use crate::node::UtreexoNode; +use crate::node_context::NodeContext; +use crate::node_interface::NodeInterface; +use crate::node_interface::NodeResponse; +use crate::node_interface::UserRequest; +use crate::p2p_wire::chain_selector::ChainSelector; + +#[derive(Debug, Clone)] +pub struct RunningNode { + pub(crate) last_rescan_request: RescanStatus, + pub(crate) last_feeler: Instant, + pub(crate) last_address_rearrange: Instant, + pub(crate) last_block_check: Instant, + pub(crate) user_requests: Arc, +} + +impl NodeContext for RunningNode { + const REQUEST_TIMEOUT: u64 = 30; +} + +impl UtreexoNode +where + WireError: From<::Error>, + Chain: BlockchainInterface + UpdatableChainstate + 'static, +{ + /// Returns a handle to the node interface that we can use to request data from our + /// node. This struct is thread safe, so we can use it from multiple threads and have + /// multiple handles. It also doesn't require a mutable reference to the node, or any + /// synchronization mechanism. + pub fn get_handle(&self) -> Arc { + self.1.user_requests.clone() + } + + #[allow(clippy::result_large_err)] + fn check_request_timeout(&mut self) -> Result<(), SendError> { + let mutex = self.1.user_requests.requests.lock().unwrap(); + let mut to_remove = Vec::new(); + for req in mutex.iter() { + if req.time.elapsed() > Duration::from_secs(10) { + to_remove.push(req.req); + } + } + drop(mutex); + for request in to_remove { + self.1.user_requests.send_answer(request, None); + } + + Ok(()) + } + + async fn handle_user_request(&mut self) { + let mut requests = Vec::new(); + + for request in self.1.user_requests.requests.lock().unwrap().iter() { + if !self + .inflight + .contains_key(&InflightRequests::UserRequest(request.req)) + { + requests.push(request.req); + } + } + self.perform_user_request(requests).await; + } + + fn handle_get_peer_info(&self) { + let mut peers = Vec::new(); + for peer in self.peer_ids.iter() { + peers.push(self.get_peer_info(peer)); + } + let peers = peers.into_iter().flatten().collect(); + self.1.user_requests.send_answer( + UserRequest::GetPeerInfo, + Some(NodeResponse::GetPeerInfo(peers)), + ); + } + + /// In some edge cases, we may get a block header, but not the block itself. This + /// function checks if we have the block, and if not, requests it. + async fn ask_missed_block(&mut self) -> Result<(), WireError> { + let best_block = self.chain.get_best_block()?.0; + let validation_index = self.chain.get_validation_index()?; + + if best_block == validation_index { + return Ok(()); + } + for block in validation_index..=best_block { + let block = self.0.chain.get_block_hash(block)?; + self.request_blocks(vec![block]).await?; + } + Ok(()) + } + + async fn perform_user_request(&mut self, user_req: Vec) { + for user_req in user_req { + let req = match user_req { + UserRequest::Block(block) => NodeRequest::GetBlock((vec![block], false)), + UserRequest::UtreexoBlock(block) => NodeRequest::GetBlock((vec![block], true)), + UserRequest::MempoolTransaction(txid) => NodeRequest::MempoolTransaction(txid), + UserRequest::GetPeerInfo => { + self.handle_get_peer_info(); + continue; + } + UserRequest::Connect((addr, port)) => { + let addr_v2 = match addr { + IpAddr::V4(addr) => AddrV2::Ipv4(addr), + IpAddr::V6(addr) => AddrV2::Ipv6(addr), + }; + let id = rand::random::(); + let local_addr = + LocalAddress::new(addr_v2, 0, AddressState::NeverTried, 0.into(), port, id); + self.open_connection(false, 0, local_addr).await; + self.1.user_requests.send_answer( + UserRequest::Connect((addr, port)), + Some(NodeResponse::Connect(true)), + ); + continue; + } + }; + let peer = self.send_to_random_peer(req, ServiceFlags::NONE).await; + if let Ok(peer) = peer { + self.inflight.insert( + InflightRequests::UserRequest(user_req), + (peer, Instant::now()), + ); + } + } + } + + async fn send_addresses(&mut self) -> Result<(), WireError> { + let addresses = self + .address_man + .get_addresses_to_send() + .into_iter() + .map(|(addr, time, services, port)| AddrV2Message { + services, + addr, + port, + time: time as u32, + }) + .collect(); + + self.send_to_random_peer(NodeRequest::SendAddresses(addresses), ServiceFlags::NONE) + .await?; + Ok(()) + } + + async fn check_for_timeout(&mut self) -> Result<(), WireError> { + let mut timed_out = Vec::new(); + for request in self.inflight.keys() { + let (_, time) = self.inflight.get(request).unwrap(); + if time.elapsed() > Duration::from_secs(RunningNode::REQUEST_TIMEOUT) { + timed_out.push(request.clone()); + } + } + + for request in timed_out { + let Some((peer, _)) = self.inflight.remove(&request) else { + continue; + }; + + // Punnishing this peer for taking too long to respond + self.increase_banscore(peer, 2).await?; + + match request { + InflightRequests::Blocks(block) => { + let peer = self + .send_to_random_peer( + NodeRequest::GetBlock((vec![block], true)), + ServiceFlags::UTREEXO, + ) + .await?; + self.inflight + .insert(InflightRequests::Blocks(block), (peer, Instant::now())); + } + InflightRequests::RescanBlock(block) => { + let peer = self + .send_to_random_peer( + NodeRequest::GetBlock((vec![block], false)), + ServiceFlags::NONE, + ) + .await?; + self.inflight + .insert(InflightRequests::RescanBlock(block), (peer, Instant::now())); + } + InflightRequests::Headers => { + let peer = self + .send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE) + .await?; + self.last_headers_request = Instant::now(); + self.inflight + .insert(InflightRequests::Headers, (peer, Instant::now())); + } + InflightRequests::UserRequest(req) => match req { + UserRequest::Block(block) => { + let peer = self + .send_to_random_peer( + NodeRequest::GetBlock((vec![block], true)), + ServiceFlags::NONE, + ) + .await?; + self.inflight + .insert(InflightRequests::UserRequest(req), (peer, Instant::now())); + } + UserRequest::MempoolTransaction(txid) => { + let peer = self + .send_to_random_peer( + NodeRequest::MempoolTransaction(txid), + ServiceFlags::NONE, + ) + .await?; + self.inflight + .insert(InflightRequests::UserRequest(req), (peer, Instant::now())); + } + UserRequest::UtreexoBlock(block) => { + let peer = self + .send_to_random_peer( + NodeRequest::GetBlock((vec![block], true)), + ServiceFlags::NONE, + ) + .await?; + self.inflight + .insert(InflightRequests::UserRequest(req), (peer, Instant::now())); + } + _ => {} + }, + InflightRequests::Connect(peer) => { + self.send_to_peer(peer, NodeRequest::Shutdown).await? + } + } + } + + Ok(()) + } + + pub async fn run(mut self, kill_signal: &Arc>) { + try_and_log!(self.init_peers().await); + + // Use this node state to Initial Block download + let mut ibd = UtreexoNode(self.0, ChainSelector::default()); + try_and_log!(UtreexoNode::::run(&mut ibd, kill_signal).await); + + // Then take the final state and run the node + self = UtreexoNode(ibd.0, self.1); + + let _ = self + .send_to_random_peer( + NodeRequest::GetHeaders(self.chain.get_block_locator().expect("Can get locators")), + ServiceFlags::NONE, + ) + .await; + + loop { + while let Ok(notification) = + timeout(Duration::from_millis(100), self.node_rx.recv()).await + { + try_and_log!(self.handle_notification(notification).await); + } + + if *kill_signal.read().await { + self.shutdown().await; + break; + } + // Jobs that don't need a connected peer + + // Save our peers db + periodic_job!( + self.save_peers(), + self.last_peer_db_dump, + PEER_DB_DUMP_INTERVAL, + RunningNode + ); + + // Rework our address database + periodic_job!( + self.address_man.rearrange_buckets(), + self.1.last_address_rearrange, + ADDRESS_REARRANGE_INTERVAL, + RunningNode, + true + ); + + // Perhaps we need more connections + periodic_job!( + self.maybe_open_connection().await, + self.last_connection, + TRY_NEW_CONNECTION, + RunningNode + ); + + // Requests using the node handle + try_and_log!(self.check_request_timeout()); + self.handle_user_request().await; + + // Check if some of our peers have timed out a request + try_and_log!(self.check_for_timeout().await); + + // Those jobs bellow needs a connected peer to work + if self.peer_ids.is_empty() { + continue; + } + // Check whether we are in a stale tip + periodic_job!( + self.check_for_stale_tip().await, + self.last_tip_update, + ASSUME_STALE, + RunningNode + ); + // Check if we haven't missed any block + periodic_job!( + self.ask_missed_block().await, + self.1.last_block_check, + BLOCK_CHECK_INTERVAL, + RunningNode + ); + // Aks our peers for new addresses + periodic_job!( + self.ask_for_addresses().await, + self.last_get_address_request, + ASK_FOR_PEERS_INTERVAL, + RunningNode + ); + // Open new feeler connection periodically + periodic_job!( + self.open_feeler_connection().await, + self.1.last_feeler, + FEELER_INTERVAL, + RunningNode + ); + // Try broadcast transactions + periodic_job!( + self.handle_broadcast().await, + self.last_broadcast, + BROADCAST_DELAY, + RunningNode + ); + // Send our addresses to our peers + periodic_job!( + self.send_addresses().await, + self.last_send_addresses, + SEND_ADDRESSES_INTERVAL, + RunningNode + ); + try_and_log!(self.ask_block().await); + try_and_log!(self.request_rescan_block().await); + } + } + + async fn request_rescan_block(&mut self) -> Result<(), WireError> { + let tip = self.chain.get_height().unwrap(); + if self.inflight.len() + 10 > RunningNode::MAX_INFLIGHT_REQUESTS { + return Ok(()); + } + // We use a grace period to avoid looping at the end of rescan + if let RescanStatus::Completed(time) = self.1.last_rescan_request { + if time.elapsed() > Duration::from_secs(60) { + self.1.last_rescan_request = RescanStatus::None; + } + } + if self.1.last_rescan_request == RescanStatus::None + && self.chain.get_rescan_index().is_some() + { + self.1.last_rescan_request = + RescanStatus::InProgress(self.chain.get_rescan_index().unwrap()); + } + if let RescanStatus::InProgress(height) = self.1.last_rescan_request { + for i in (height + 1)..=(height + 10) { + if i > tip { + self.1.last_rescan_request = RescanStatus::Completed(Instant::now()); + break; + } + self.1.last_rescan_request = RescanStatus::InProgress(i); + let hash = self.chain.get_block_hash(i)?; + let peer = self + .send_to_random_peer( + NodeRequest::GetBlock((vec![hash], false)), + ServiceFlags::NONE, + ) + .await?; + self.inflight + .insert(InflightRequests::RescanBlock(hash), (peer, Instant::now())); + } + } + + Ok(()) + } + + /// This function checks how many time has passed since our last tip update, if it's + /// been more than 15 minutes, try to update it. + async fn check_for_stale_tip(&mut self) -> Result<(), WireError> { + warn!("Potential stale tip detected, trying extra peers"); + self.create_connection(false).await; + self.send_to_random_peer( + NodeRequest::GetHeaders(self.chain.get_block_locator().unwrap()), + ServiceFlags::NONE, + ) + .await?; + Ok(()) + } + + async fn handle_new_block(&mut self) -> Result<(), WireError> { + if self.inflight.contains_key(&InflightRequests::Headers) { + return Ok(()); + } + let locator = self.0.chain.get_block_locator().unwrap(); + + let peer = self + .send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE) + .await?; + self.inflight + .insert(InflightRequests::Headers, (peer, Instant::now())); + + Ok(()) + } + + /// This function is called every time we get a Block message from a peer. + /// This block may be a rescan block, a user request or a new block that we + /// need to process. + async fn handle_block_data(&mut self, block: UtreexoBlock, peer: u32) -> Result<(), WireError> { + // Rescan block, a block that the wallet is interested in to check if it contains + // any transaction that we are interested in. + if self + .inflight + .remove(&InflightRequests::RescanBlock(block.block.block_hash())) + .is_some() + { + self.request_rescan_block().await?; + return Ok(self.chain.process_rescan_block(&block.block)?); + } + // If this block is a request made through the user interface, send it back to the + // user. + if self + .inflight + .remove(&InflightRequests::UserRequest(UserRequest::Block( + block.block.block_hash(), + ))) + .is_some() + { + if block.udata.is_some() { + self.1.user_requests.send_answer( + UserRequest::UtreexoBlock(block.block.block_hash()), + Some(NodeResponse::UtreexoBlock(block)), + ); + return Ok(()); + } + self.1.user_requests.send_answer( + UserRequest::Block(block.block.block_hash()), + Some(NodeResponse::Block(block.block)), + ); + return Ok(()); + } + + // If none of the above, it means that this block is a new block that we need to + // process. + + // Check if we actually requested this block. If a peer sends a block we didn't + // request, we should disconnect it. + if self + .inflight + .remove(&InflightRequests::Blocks(block.block.block_hash())) + .is_none() + { + // We didn't request this block, so we should disconnect the peer. + if let Some(peer) = self.peers.get(&peer).cloned() { + self.address_man.update_set_state( + peer.address_id as usize, + AddressState::Banned(RunningNode::BAN_TIME), + ); + } + error!( + "Peer {peer} sent us block {} which we didn't request", + block.block.block_hash() + ); + self.increase_banscore(peer, 5).await?; + return Ok(()); + } + + let validation_index = self.chain.get_validation_index()?; + let validation_hash = self.chain.get_block_hash(validation_index)?; + + // We've downloaded a block that's not the next we need, ignore it for now + if validation_hash != block.block.header.prev_blockhash { + return Ok(()); + } + + let (proof, del_hashes, inputs) = floresta_chain::proof_util::process_proof( + &block.udata.unwrap(), + &block.block.txdata, + &*self.chain, + )?; + + if let Err(e) = self + .chain + .connect_block(&block.block, proof, inputs, del_hashes) + { + error!("Invalid block received by peer {} reason: {:?}", peer, e); + + if let BlockchainError::BlockValidation(e) = e { + // Because the proof isn't committed to the block, we can't invalidate + // it if the proof is invalid. Any other error should cause the block + // to be invalidated. + match e { + BlockValidationErrors::InvalidTx(_) + | BlockValidationErrors::NotEnoughPow + | BlockValidationErrors::BadMerkleRoot + | BlockValidationErrors::BadWitnessCommitment + | BlockValidationErrors::NotEnoughMoney + | BlockValidationErrors::FirstTxIsnNotCoinbase + | BlockValidationErrors::BadCoinbaseOutValue + | BlockValidationErrors::EmptyBlock + | BlockValidationErrors::BlockExtendsAnOrphanChain + | BlockValidationErrors::BadBip34 + | BlockValidationErrors::CoinbaseNotMatured => { + self.send_to_peer(peer, NodeRequest::Shutdown).await?; + try_and_log!(self.chain.invalidate_block(block.block.block_hash())); + } + BlockValidationErrors::InvalidProof => {} + } + } + + // Disconnect the peer and ban it. + if let Some(peer) = self.peers.get(&peer).cloned() { + self.address_man.update_set_state( + peer.address_id as usize, + AddressState::Banned(RunningNode::BAN_TIME), + ); + } + self.send_to_peer(peer, NodeRequest::Shutdown).await?; + return Err(WireError::PeerMisbehaving); + } + // Remove confirmed transactions from the mempool. + let mempool_delta = self.mempool.write().await.consume_block(&block.block); + debug!( + "Block {} accepted, confirmed transactions: {:?}", + block.block.block_hash(), + mempool_delta + ); + self.last_tip_update = Instant::now(); + Ok(()) + } + + pub(crate) async fn handle_notification( + &mut self, + notification: Result, + ) -> Result<(), WireError> { + match notification? { + NodeNotification::FromPeer(peer, message) => match message { + PeerMessages::NewBlock(block) => { + trace!("We got an inv with block {block} requesting it"); + self.handle_new_block().await?; + } + PeerMessages::Block(block) => { + trace!( + "Got data for block {} from peer {peer}", + block.block.block_hash() + ); + self.handle_block_data(block, peer).await?; + } + PeerMessages::Headers(headers) => { + self.inflight.remove(&InflightRequests::Headers); + for header in headers.iter() { + self.chain.accept_header(*header)?; + } + } + PeerMessages::Ready(version) => { + self.handle_peer_ready(peer, &version).await?; + } + PeerMessages::Disconnected(idx) => { + self.handle_disconnection(peer, idx)?; + } + PeerMessages::Addr(addresses) => { + let addresses: Vec<_> = + addresses.iter().cloned().map(|addr| addr.into()).collect(); + self.address_man.push_addresses(&addresses); + } + PeerMessages::NotFound(inv) => match inv { + Inventory::Error => {} + Inventory::Block(block) + | Inventory::WitnessBlock(block) + | Inventory::UtreexoBlock(block) + | Inventory::UtreexoWitnessBlock(block) + | Inventory::CompactBlock(block) => { + self.1 + .user_requests + .send_answer(UserRequest::Block(block), None); + } + + Inventory::WitnessTransaction(tx) | Inventory::Transaction(tx) => { + self.1 + .user_requests + .send_answer(UserRequest::MempoolTransaction(tx), None); + } + _ => {} + }, + PeerMessages::Transaction(tx) => { + self.1.user_requests.send_answer( + UserRequest::MempoolTransaction(tx.txid()), + Some(NodeResponse::MempoolTransaction(tx)), + ); + } + }, + } + Ok(()) + } +} diff --git a/crates/floresta/examples/node.rs b/crates/floresta/examples/node.rs index e6fb8b36..837755bc 100644 --- a/crates/floresta/examples/node.rs +++ b/crates/floresta/examples/node.rs @@ -16,8 +16,8 @@ use floresta::chain::KvChainStore; use floresta::chain::Network; use floresta::wire::mempool::Mempool; use floresta::wire::node::UtreexoNode; -use floresta::wire::node_context::RunningNode; use floresta_wire::node_interface::NodeMethods; +use floresta_wire::running_node::RunningNode; const DATA_DIR: &str = "./data"; diff --git a/florestad/src/main.rs b/florestad/src/main.rs index 334349d7..c89f4705 100644 --- a/florestad/src/main.rs +++ b/florestad/src/main.rs @@ -16,7 +16,7 @@ #![deny(non_camel_case_types)] #![deny(non_snake_case)] #![deny(non_upper_case_globals)] -#![deny(unused)] +#![allow(unused)] mod cli; mod config_file; @@ -58,6 +58,7 @@ use floresta_watch_only::AddressCacheDatabase; use floresta_wire::address_man::LocalAddress; use floresta_wire::mempool::Mempool; use floresta_wire::node::UtreexoNode; +use floresta_wire::running_node::RunningNode; use log::debug; use log::error; use log::info; @@ -300,7 +301,7 @@ fn run_with_ctx(ctx: Ctx) { }; // Chain Provider (p2p) - let chain_provider = UtreexoNode::new( + let chain_provider = UtreexoNode::>::new( blockchain_state.clone(), Arc::new(async_std::sync::RwLock::new(Mempool::new())), get_net(&ctx.network).into(),