From cf25472da8111f8da9517f94dcabc5b3a35ecdcf Mon Sep 17 00:00:00 2001 From: Davidson Souza <40968167+Davidson-Souza@users.noreply.github.com> Date: Thu, 15 Jun 2023 13:04:20 -0300 Subject: [PATCH] Move the node context to another file (#56) The node.rs file is over 1k lines, it would be better if we can reduce this a little. This commit moves all the node context data out of node.rs and puts the constants inside NodeContext. This is also useful, because we can fine-tune those constants by Context --- src/blockchain/p2p_blockchain/mod.rs | 1 + src/blockchain/p2p_blockchain/node.rs | 130 ++++++++---------- src/blockchain/p2p_blockchain/node_context.rs | 50 +++++++ 3 files changed, 105 insertions(+), 76 deletions(-) create mode 100644 src/blockchain/p2p_blockchain/node_context.rs diff --git a/src/blockchain/p2p_blockchain/mod.rs b/src/blockchain/p2p_blockchain/mod.rs index 1168aa0b..c652cc55 100644 --- a/src/blockchain/p2p_blockchain/mod.rs +++ b/src/blockchain/p2p_blockchain/mod.rs @@ -3,6 +3,7 @@ pub mod address_man; pub mod mempool; pub mod node; +pub mod node_context; pub mod node_interface; pub mod peer; pub mod stream_reader; diff --git a/src/blockchain/p2p_blockchain/node.rs b/src/blockchain/p2p_blockchain/node.rs index 65e8bc85..791bca09 100644 --- a/src/blockchain/p2p_blockchain/node.rs +++ b/src/blockchain/p2p_blockchain/node.rs @@ -4,6 +4,7 @@ use super::{ address_man::{AddressMan, LocalAddress}, mempool::Mempool, + node_context::{IBDNode, NodeContext, RunningNode}, node_interface::{NodeInterface, NodeResponse, PeerInfo, UserRequest}, peer::{Peer, PeerMessages, Version}, }; @@ -38,34 +39,11 @@ use std::{ time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; -/// Max number of simultaneous connections we initiates we are willing to hold -const MAX_OUTGOING_PEERS: usize = 10; -/// We ask for peers every ASK_FOR_PEERS_INTERVAL seconds -const ASK_FOR_PEERS_INTERVAL: u64 = 60; // One minute -/// Save our database of peers every PEER_DB_DUMP_INTERVAL seconds -const PEER_DB_DUMP_INTERVAL: u64 = 60 * 5; // 5 minutes -/// Attempt to open a new connection (if needed) every TRY_NEW_CONNECTION seconds -const TRY_NEW_CONNECTION: u64 = 30; // 30 seconds -/// If ASSUME_STALE seconds passed since our last tip update, treat it as stale -const ASSUME_STALE: u64 = 30 * 60; // 30 minutes -/// While on IBD, if we've been without blocks for this long, ask for headers again -const IBD_REQUEST_BLOCKS_AGAIN: u64 = 10; // 10 seconds -/// How often we broadcast transactions -const BROADCAST_DELAY: u64 = 30; // 30 seconds -/// Wait up to this many seconds for a peer to respond to a request -const PEER_REQUEST_TIMEOUT: u64 = 30 * 60; // 30 minutes FIXME: This is too long -/// Max number of simultaneous inflight requests we allow -const MAX_INFLIGHT_REQUESTS: usize = 1_000; -/// Interval at which we open new feeler connections -const FEELER_INTERVAL: u64 = 60 * 5; // 5 minutes -/// Interval at which we rearrange our addresses -const ADDRESS_REARRANGE_INTERVAL: u64 = 60 * 60; // 1 hour -/// How long we ban a peer for -const BAN_TIME: u64 = 60 * 60 * 24; #[derive(Debug)] pub enum NodeNotification { FromPeer(u32, PeerMessages), } + #[derive(Debug, Clone, PartialEq, Hash)] /// Sent from node to peers, usually to request something pub enum NodeRequest { @@ -96,7 +74,6 @@ enum InflightRequests { Headers, Blocks(BlockHash), RescanBlock(BlockHash), - Addresses, UserRequest(UserRequest), Connect(u32), } @@ -115,7 +92,7 @@ pub struct LocalPeerView { } #[derive(Debug, PartialEq, Clone, Copy)] -enum RescanStatus { +pub enum RescanStatus { InProgress(u32), Completed(Instant), None, @@ -147,14 +124,18 @@ pub struct NodeCommon { /// It also holds the channels to communicate with peers and the block downloader. /// The node is the central task that runs and handles important events, such as new blocks, /// peer connection/disconnection, new addresses, etc. -pub struct UtreexoNode(NodeCommon, Context); -impl Deref for UtreexoNode { +pub struct UtreexoNode(NodeCommon, Context); + +// To reduce keystrokes while using node, assuming that we'll predominantly use the inner +// NodeCommon, we implement deref and deref mut as a passthrough of self.0, so self.something +// is an alias to self.0.something +impl Deref for UtreexoNode { fn deref(&self) -> &Self::Target { &self.0 } type Target = NodeCommon; } -impl DerefMut for UtreexoNode { +impl DerefMut for UtreexoNode { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } @@ -165,18 +146,6 @@ enum PeerStatus { Ready, ShutingDown, } -#[derive(Debug, Default, Clone)] -pub struct IBDNode { - blocks: HashMap, -} - -#[derive(Debug, Clone)] -pub struct RunningNode { - last_rescan_request: RescanStatus, - last_feeler: Instant, - last_address_rearrange: Instant, - user_requests: Arc, -} impl Default for RunningNode { fn default() -> Self { @@ -191,7 +160,7 @@ impl Default for RunningNode { } } -impl UtreexoNode { +impl UtreexoNode { pub fn new( chain: Arc>, mempool: Arc>, @@ -249,7 +218,8 @@ impl UtreexoNode { ); Ok(()) } - fn get_peer_info(&self, peer: &LocalPeerView) -> Option { + fn get_peer_info(&self, peer: &u32) -> Option { + let peer = self.peers.get(peer)?; Some(PeerInfo { address: format!("{}:{}", peer.address, peer.port), services: peer.services.to_string(), @@ -290,6 +260,7 @@ impl UtreexoNode { .has(ServiceFlags::NETWORK | ServiceFlags::WITNESS) { self.send_to_peer(peer, NodeRequest::Shutdown).await?; + self.peers.remove(&peer); return Ok(()); } peer_data.state = PeerStatus::Ready; @@ -383,7 +354,7 @@ impl UtreexoNode { let mut timed_out = vec![]; for request in self.inflight.keys() { let (_, time) = self.inflight.get(request).unwrap(); - if time.elapsed() > Duration::from_secs(PEER_REQUEST_TIMEOUT) { + if time.elapsed() > Duration::from_secs(T::REQUEST_TIMEOUT) { timed_out.push(request.clone()); } } @@ -397,19 +368,13 @@ impl UtreexoNode { InflightRequests::Blocks(block) | InflightRequests::RescanBlock(block) => { to_request.push(block) } - InflightRequests::Addresses => { - let locator = self.chain.get_block_locator()?; - self.send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE) - .await?; - self.last_get_address_request = Instant::now(); - } InflightRequests::Headers => { self.send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE) .await?; self.last_headers_request = Instant::now(); } InflightRequests::UserRequest(_) => {} - InflightRequests::Connect(_) => { + InflightRequests::Connect(peer) => { self.send_to_peer(peer, NodeRequest::Shutdown).await? } } @@ -438,6 +403,7 @@ impl UtreexoNode { } let idx = if required_services.has(ServiceFlags::NODE_UTREEXO) { if self.utreexo_peers.is_empty() { + self.state = NodeState::WaitingPeer; return Err(BlockchainError::NoPeersAvailable); } let idx = rand::random::() % self.utreexo_peers.len(); @@ -447,6 +413,7 @@ impl UtreexoNode { .expect("node is in the interval 0..utreexo_peers.len(), but is not here?") } else { if self.peer_ids.is_empty() { + self.state = NodeState::WaitingPeer; return Err(BlockchainError::NoPeersAvailable); } let idx = rand::random::() % self.peer_ids.len(); @@ -517,12 +484,9 @@ impl UtreexoNode { Ok(()) } pub async fn ask_for_addresses(&mut self) -> Result<(), BlockchainError> { - let peer = self + let _ = self .send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE) .await?; - self.inflight - .insert(InflightRequests::Addresses, (peer, Instant::now())); - Ok(()) } fn save_peers(&self) -> Result<(), BlockchainError> { @@ -545,7 +509,7 @@ impl UtreexoNode { } async fn maybe_open_connection(&mut self) -> Result<(), BlockchainError> { - if self.peers.len() < MAX_OUTGOING_PEERS { + if self.peers.len() < T::MAX_OUTGOING_PEERS { self.create_connection(false).await; } Ok(()) @@ -723,7 +687,8 @@ impl UtreexoNode { periodic_job!( self.maybe_open_connection().await, self.last_connection, - TRY_NEW_CONNECTION + TRY_NEW_CONNECTION, + IBDNode ); self.last_tip_update = Instant::now(); @@ -752,7 +717,8 @@ impl UtreexoNode { } // If we are downloading blocks, we need to request more if we have space. let currently_inflight = self.0.inflight.len() + self.1.blocks.len(); - if self.state == NodeState::DownloadBlocks && currently_inflight < MAX_INFLIGHT_REQUESTS + if self.state == NodeState::DownloadBlocks + && currently_inflight < IBDNode::MAX_INFLIGHT_REQUESTS { let blocks = self.get_blocks_to_download()?; if blocks.is_empty() { @@ -769,7 +735,8 @@ impl UtreexoNode { periodic_job!( self.maybe_request_headers().await, self.last_headers_request, - IBD_REQUEST_BLOCKS_AGAIN + IBD_REQUEST_BLOCKS_AGAIN, + IBDNode ); } Ok(()) @@ -808,7 +775,7 @@ impl UtreexoNode { if let Some(peer) = self.peers.get(&peer).cloned() { self.address_man.update_set_state( peer.address_id as usize, - AddressState::Banned(BAN_TIME), + AddressState::Banned(IBDNode::BAN_TIME), ); } error!( @@ -830,7 +797,7 @@ impl UtreexoNode { let currently_inflight = self.inflight.len() + self.1.blocks.len(); if self.state == NodeState::DownloadBlocks - && currently_inflight < MAX_INFLIGHT_REQUESTS + && currently_inflight < IBDNode::MAX_INFLIGHT_REQUESTS { let blocks = self.get_blocks_to_download()?; self.request_blocks(blocks).await?; @@ -918,7 +885,7 @@ impl UtreexoNode { } fn handle_get_peer_info(&self) { let mut peers = vec![]; - for peer in self.peers.values() { + for peer in self.peer_ids.iter() { peers.push(self.get_peer_info(peer)); } let peers = peers.into_iter().flatten().collect(); @@ -979,19 +946,22 @@ impl UtreexoNode { periodic_job!( self.save_peers(), self.last_peer_db_dump, - PEER_DB_DUMP_INTERVAL + PEER_DB_DUMP_INTERVAL, + RunningNode ); // Rework our address database periodic_job!( self.address_man.rearrange_buckets(), self.1.last_address_rearrange, - ADDRESS_REARRANGE_INTERVAL + ADDRESS_REARRANGE_INTERVAL, + RunningNode ); // Perhaps we need more connections periodic_job!( self.maybe_open_connection().await, self.last_connection, - TRY_NEW_CONNECTION + TRY_NEW_CONNECTION, + RunningNode ); try_and_log!(self.check_request_timeout()); self.handle_user_request().await; @@ -1004,25 +974,29 @@ impl UtreexoNode { periodic_job!( self.ask_for_addresses().await, self.last_get_address_request, - ASK_FOR_PEERS_INTERVAL + ASK_FOR_PEERS_INTERVAL, + RunningNode ); // Check whether we are in a stale tip periodic_job!( self.check_for_stale_tip().await, self.last_tip_update, - ASSUME_STALE + ASSUME_STALE, + RunningNode ); // Open new feeler connection periodically periodic_job!( self.open_feeler_connection().await, self.1.last_feeler, - FEELER_INTERVAL + FEELER_INTERVAL, + RunningNode ); // Try broadcast transactions periodic_job!( self.handle_broadcast().await, self.last_broadcast, - BROADCAST_DELAY + BROADCAST_DELAY, + RunningNode ); try_and_log!(self.ask_block().await); try_and_log!(self.check_for_timeout().await); @@ -1031,7 +1005,7 @@ impl UtreexoNode { } async fn request_rescan_block(&mut self) -> Result<(), BlockchainError> { let tip = self.chain.get_height().unwrap(); - if self.inflight.len() + 10 > MAX_INFLIGHT_REQUESTS { + if self.inflight.len() + 10 > RunningNode::MAX_INFLIGHT_REQUESTS { return Ok(()); } // We use a grace period to avoid looping at the end of rescan @@ -1134,8 +1108,10 @@ impl UtreexoNode { { // We didn't request this block, so we should disconnect the peer. if let Some(peer) = self.peers.get(&peer).cloned() { - self.address_man - .update_set_state(peer.address_id as usize, AddressState::Banned(BAN_TIME)); + self.address_man.update_set_state( + peer.address_id as usize, + AddressState::Banned(RunningNode::BAN_TIME), + ); } error!( "Peer {peer} sent us block {} which we didn't request", @@ -1166,8 +1142,10 @@ impl UtreexoNode { ); // Disconnect the peer and ban it. if let Some(peer) = self.peers.get(&peer).cloned() { - self.address_man - .update_set_state(peer.address_id as usize, AddressState::Banned(BAN_TIME)); + self.address_man.update_set_state( + peer.address_id as usize, + AddressState::Banned(RunningNode::BAN_TIME), + ); } self.send_to_peer(peer, NodeRequest::Shutdown).await?; @@ -1261,8 +1239,8 @@ macro_rules! try_and_log { }; } macro_rules! periodic_job { - ($what: expr, $timer: expr, $interval: ident) => { - if $timer.elapsed() > Duration::from_secs($interval) { + ($what: expr, $timer: expr, $interval: ident, $context: ty) => { + if $timer.elapsed() > Duration::from_secs(<$context>::$interval) { try_and_log!($what); $timer = Instant::now(); } diff --git a/src/blockchain/p2p_blockchain/node_context.rs b/src/blockchain/p2p_blockchain/node_context.rs new file mode 100644 index 00000000..7db13669 --- /dev/null +++ b/src/blockchain/p2p_blockchain/node_context.rs @@ -0,0 +1,50 @@ +use std::{collections::HashMap, sync::Arc, time::Instant}; + +use bitcoin::{network::utreexo::UtreexoBlock, BlockHash}; + +use super::{node::RescanStatus, node_interface::NodeInterface}; + +pub trait NodeContext { + const REQUEST_TIMEOUT: u64; + /// Max number of simultaneous connections we initiates we are willing to hold + const MAX_OUTGOING_PEERS: usize = 10; + /// We ask for peers every ASK_FOR_PEERS_INTERVAL seconds + const ASK_FOR_PEERS_INTERVAL: u64 = 60 * 60; // One hour + /// Save our database of peers every PEER_DB_DUMP_INTERVAL seconds + const PEER_DB_DUMP_INTERVAL: u64 = 60 * 5; // 5 minutes + /// Attempt to open a new connection (if needed) every TRY_NEW_CONNECTION seconds + const TRY_NEW_CONNECTION: u64 = 30; // 30 seconds + /// If ASSUME_STALE seconds passed since our last tip update, treat it as stale + const ASSUME_STALE: u64 = 30 * 60; // 30 minutes + /// While on IBD, if we've been without blocks for this long, ask for headers again + const IBD_REQUEST_BLOCKS_AGAIN: u64 = 10; // 10 seconds + /// How often we broadcast transactions + const BROADCAST_DELAY: u64 = 30; // 30 seconds + /// Max number of simultaneous inflight requests we allow + const MAX_INFLIGHT_REQUESTS: usize = 1_000; + /// Interval at which we open new feeler connections + const FEELER_INTERVAL: u64 = 60 * 5; // 5 minutes + /// Interval at which we rearrange our addresses + const ADDRESS_REARRANGE_INTERVAL: u64 = 60 * 60; // 1 hour + /// How long we ban a peer for + const BAN_TIME: u64 = 60 * 60 * 24; +} + +#[derive(Debug, Clone)] +pub struct RunningNode { + pub last_rescan_request: RescanStatus, + pub last_feeler: Instant, + pub last_address_rearrange: Instant, + pub user_requests: Arc, +} +impl NodeContext for RunningNode { + const REQUEST_TIMEOUT: u64 = 30; +} + +#[derive(Debug, Default, Clone)] +pub struct IBDNode { + pub blocks: HashMap, +} +impl NodeContext for IBDNode { + const REQUEST_TIMEOUT: u64 = 30 * 60; +}