From 4b01117d3eb1f173484112194c48f2b93a749128 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Fri, 25 Oct 2024 12:47:55 +0800 Subject: [PATCH 01/10] Fix peer set inconsistency in different components --- crates/subcoin-network/src/peer_manager.rs | 46 ++++++++++++++++------ crates/subcoin-network/src/sync.rs | 3 ++ crates/subcoin-network/src/worker.rs | 7 +++- 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/crates/subcoin-network/src/peer_manager.rs b/crates/subcoin-network/src/peer_manager.rs index c9f272a..4e81ffe 100644 --- a/crates/subcoin-network/src/peer_manager.rs +++ b/crates/subcoin-network/src/peer_manager.rs @@ -315,7 +315,7 @@ where } } - pub(crate) fn on_tick(&mut self) -> Option { + pub(crate) fn on_tick(&mut self) -> (Vec, Option) { let mut timeout_peers = vec![]; let mut should_ping_peers = vec![]; @@ -334,10 +334,6 @@ where self.send_pings(should_ping_peers); } - for peer_id in timeout_peers { - self.disconnect(peer_id, Error::PingTimeout); - } - let outbound_peers_count = self .connected_peers .values() @@ -360,6 +356,13 @@ where .set(self.address_book.available_addresses_count() as u64); } + let maybe_slow_peer = self.manage_outbound_connections(outbound_peers_count); + + (timeout_peers, maybe_slow_peer) + } + + /// Manages outbound connections by initiating new connections or evicting slow peers. + fn manage_outbound_connections(&mut self, outbound_peers_count: usize) -> Option { if outbound_peers_count < self.max_outbound_peers { if let Some(addr) = self.address_book.pop() { if !self.connections.contains_key(&addr) { @@ -442,7 +445,19 @@ where } } - /// Disconnect from a peer with given reason, do nothing if the peer is persistent. + /// Disconnects from a specified peer, unless it is designated as persistent, with a given reason. + /// + /// # Important Notes + /// + /// - **Syncing Components:** This function, as well as [`Self::evict`], should not be invoked + /// directly within the peer manager module without triggering a notification. For example, + /// chain sync might depend on receiving a disconnect notification to correctly update their + /// internal state, which helps maintain a consistent peer set between the peer manager and + /// other modules. + /// + /// - **Potential for Inconsistent State:** Bypassing notifications may lead to inconsistency + /// between the peer manager and modules that rely on peer status, resulting in unexpected + /// issues in the peer set or other connected components. pub(crate) fn disconnect(&mut self, peer_id: PeerId, reason: Error) { if self.config.persistent.contains(&peer_id) { return; @@ -465,6 +480,20 @@ where self.connected_peers.remove(&peer_id); } + /// Evicts a peer, disconnecting it with a specified reason and updating the eviction timestamp. + /// + /// This function internally calls [`Self::disconnect`] to carry out the disconnection + /// process and subsequently records the current time as the `last_eviction` timestamp. + /// + /// # Important Note + /// + /// Just like with `disconnect`, any call to `evict` should be accompanied by necessary + /// notifications to avoid state inconsistencies. + pub(crate) fn evict(&mut self, peer_id: PeerId, reason: Error) { + self.disconnect(peer_id, reason); + self.last_eviction = Instant::now(); + } + /// Sets the prefer addrv2 flag for a peer. pub(crate) fn set_want_addrv2(&mut self, peer_id: PeerId) { self.connected_peers.entry(peer_id).and_modify(|info| { @@ -479,11 +508,6 @@ where }); } - pub(crate) fn evict(&mut self, peer_id: PeerId, reason: Error) { - self.disconnect(peer_id, reason); - self.last_eviction = Instant::now(); - } - /// Checks if a peer is connected. pub(crate) fn is_connected(&self, peer_id: PeerId) -> bool { self.connected_peers.contains_key(&peer_id) diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index 3e04466..53d7f09 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -414,6 +414,9 @@ where self.peers.entry(current_sync_peer_id).and_modify(|peer| { peer.state = PeerSyncState::Available; }); + self.peers.entry(peer_id).and_modify(|peer| { + peer.state = PeerSyncState::DownloadingNew { start: our_best }; + }); } } } diff --git a/crates/subcoin-network/src/worker.rs b/crates/subcoin-network/src/worker.rs index 20c43d0..949dddf 100644 --- a/crates/subcoin-network/src/worker.rs +++ b/crates/subcoin-network/src/worker.rs @@ -221,7 +221,12 @@ where self.peer_store.remove_peer(peer); } - if let Some(SlowPeer { peer_id, latency }) = self.peer_manager.on_tick() { + let (timeout_peers, maybe_slow_peer) = self.peer_manager.on_tick(); + timeout_peers.into_iter().for_each(|peer_id| { + self.peer_manager.disconnect(peer_id, Error::PingTimeout); + self.chain_sync.remove_peer(peer_id); + }); + if let Some(SlowPeer { peer_id, latency }) = maybe_slow_peer { self.peer_manager.evict(peer_id, Error::SlowPeer(latency)); self.chain_sync.remove_peer(peer_id); } From 382a148957c7fe717d422a04dbae236bc41456e4 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Fri, 25 Oct 2024 17:35:38 +0800 Subject: [PATCH 02/10] Refactor `restart_sync()` --- crates/subcoin-network/src/sync.rs | 102 ++++++++++++++++------------- 1 file changed, 56 insertions(+), 46 deletions(-) diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index 53d7f09..7e6174a 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -229,67 +229,77 @@ where .collect() } - /// Attempts to restart the sync due to the stalled peer. - /// - /// Returns `true` if the sync is restarted with a new peer. - pub(super) fn restart_sync(&mut self, stalled_peer: PeerId) -> bool { - let our_best = self.client.best_number(); - - // First, try to find the best available peer for syncing. - let new_available_peer = self - .peers - .values_mut() + /// Attempt to find the best available peer, falling back to a random choice if needed + fn select_next_peer_for_sync( + &mut self, + our_best: u32, + excluded_peer: PeerId, + ) -> Option { + self.peers + .values() .filter(|peer| { - peer.peer_id != stalled_peer + peer.peer_id != excluded_peer && peer.best_number > our_best && peer.state.is_available() }) - .max_by_key(|peer| peer.best_number); - - let new_peer = match new_available_peer { - Some(peer) => peer, - None => { + .max_by_key(|peer| peer.best_number) + .map(|peer| peer.peer_id) + .or_else(|| { let sync_candidates = self .peers - .values_mut() - .filter(|peer| peer.peer_id != stalled_peer && peer.best_number > our_best) + .values() + .filter(|peer| peer.peer_id != excluded_peer && peer.best_number > our_best) .collect::>(); - if sync_candidates.is_empty() { - if let Some(median_seen_block) = self.median_seen() { - let best_seen_block = self.peers.values().map(|p| p.best_number).max(); - - if median_seen_block <= our_best { - // We are synced to the median block seen by our peers, but this may - // not be the network's tip. - // - // Transition to idle unless more blocks are announced. - tracing::debug!( - best_seen_block, - median_seen_block, - our_best, - "Synced to the majority of peers, no new blocks to sync" - ); - self.syncing = Syncing::Idle; - return false; - } - } + // Pick a random peer, even if it's marked as deprioritized. + self.rng.choice(sync_candidates).map(|peer| peer.peer_id) + }) + } - // No new sync candidate, keep it as is. - // TODO: handle this properly. - tracing::debug!(?stalled_peer, "⚠️ Sync stalled, but no new sync candidates"); + /// Attempts to restart the sync due to a dropped peer. + /// + /// Returns `true` if the sync is restarted with a new peer. + pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId) -> bool { + let our_best = self.client.best_number(); + let maybe_next_sync_peer = self.select_next_peer_for_sync(our_best, prior_peer_id); + + let Some(new_peer_id) = maybe_next_sync_peer else { + if let Some(median_seen_block) = self.median_seen() { + if median_seen_block <= our_best { + let best_seen_block = self.peers.values().map(|p| p.best_number).max(); + + // We are synced to the median block seen by our peers, but this may + // not be the network's tip. + // + // Transition to idle unless more blocks are announced. + tracing::debug!( + best_seen_block, + median_seen_block, + our_best, + "Synced to the majority of peers, switching to Idle" + ); + self.syncing = Syncing::Idle; return false; } - - // Pick a random peer, even if it's marked as deprioritized. - self.rng - .choice(sync_candidates) - .expect("Sync candidates must be non-empty as checked; qed") } + + // No new sync candidate, keep it as is. + // TODO: handle this properly. + tracing::debug!( + ?prior_peer_id, + "⚠️ Attempting to restart sync, but no new sync candidate available" + ); + + return false; + }; + + let Some(new_peer) = self.peers.get_mut(&new_peer_id) else { + tracing::error!("Next peer {new_peer_id} missing from peer list"); + return false; }; - tracing::debug!(?stalled_peer, ?new_peer, "🔄 Sync stalled, restarting"); + tracing::debug!(?prior_peer_id, ?new_peer, "🔄 Sync restarted"); new_peer.state = PeerSyncState::DownloadingNew { start: our_best }; From 7b87681acb780b993bab3aad18d489e2075c4645 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Fri, 25 Oct 2024 17:49:32 +0800 Subject: [PATCH 03/10] Inline `note_stalled_peer()` --- crates/subcoin-network/src/sync.rs | 53 +++++++++++++--------------- crates/subcoin-network/src/worker.rs | 4 +-- 2 files changed, 26 insertions(+), 31 deletions(-) diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index 7e6174a..aca0bb7 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -259,7 +259,7 @@ where /// Attempts to restart the sync due to a dropped peer. /// /// Returns `true` if the sync is restarted with a new peer. - pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId) -> bool { + pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId) { let our_best = self.client.best_number(); let maybe_next_sync_peer = self.select_next_peer_for_sync(our_best, prior_peer_id); @@ -280,7 +280,7 @@ where "Synced to the majority of peers, switching to Idle" ); self.syncing = Syncing::Idle; - return false; + return; } } @@ -291,29 +291,35 @@ where "⚠️ Attempting to restart sync, but no new sync candidate available" ); - return false; - }; - - let Some(new_peer) = self.peers.get_mut(&new_peer_id) else { - tracing::error!("Next peer {new_peer_id} missing from peer list"); - return false; + return; }; - tracing::debug!(?prior_peer_id, ?new_peer, "🔄 Sync restarted"); + { + let Some(new_peer) = self.peers.get_mut(&new_peer_id) else { + tracing::error!("Next peer {new_peer_id} missing from peer list"); + return; + }; - new_peer.state = PeerSyncState::DownloadingNew { start: our_best }; + tracing::debug!(?prior_peer_id, ?new_peer, "🔄 Sync restarted"); + new_peer.state = PeerSyncState::DownloadingNew { start: our_best }; - match &mut self.syncing { - Syncing::BlocksFirst(downloader) => { - downloader.restart(new_peer.peer_id, new_peer.best_number); - true - } - Syncing::HeadersFirst(downloader) => { - downloader.restart(new_peer.peer_id, new_peer.best_number); - true + match &mut self.syncing { + Syncing::BlocksFirst(downloader) => { + downloader.restart(new_peer.peer_id, new_peer.best_number); + } + Syncing::HeadersFirst(downloader) => { + downloader.restart(new_peer.peer_id, new_peer.best_number); + } + Syncing::Idle => {} } - Syncing::Idle => false, } + + self.peers.entry(prior_peer_id).and_modify(|p| { + let current_stalled_count = p.state.stalled_count(); + p.state = PeerSyncState::Deprioritized { + stalled_count: current_stalled_count + 1, + }; + }); } /// Returns the median block number advertised by our peers. @@ -333,15 +339,6 @@ where } } - pub(super) fn note_peer_stalled(&mut self, stalled_peer: PeerId) { - self.peers.entry(stalled_peer).and_modify(|p| { - let current_stalled_count = p.state.stalled_count(); - p.state = PeerSyncState::Deprioritized { - stalled_count: current_stalled_count + 1, - }; - }); - } - pub(super) fn remove_peer(&mut self, peer_id: PeerId) { // TODO: handle the situation that the peer is being involved in the downloader. self.peers.remove(&peer_id); diff --git a/crates/subcoin-network/src/worker.rs b/crates/subcoin-network/src/worker.rs index 949dddf..e24c4a9 100644 --- a/crates/subcoin-network/src/worker.rs +++ b/crates/subcoin-network/src/worker.rs @@ -459,9 +459,7 @@ where self.chain_sync.switch_to_idle(); } SyncAction::RestartSyncWithStalledPeer(stalled_peer_id) => { - if self.chain_sync.restart_sync(stalled_peer_id) { - self.chain_sync.note_peer_stalled(stalled_peer_id); - } + self.chain_sync.restart_sync(stalled_peer_id); } SyncAction::Disconnect(peer_id, reason) => { self.peer_manager.disconnect(peer_id, reason); From 672c721de99c4338597a9706be01959226ee8994 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Fri, 25 Oct 2024 18:05:43 +0800 Subject: [PATCH 04/10] Restart sync on removing current syncing peer in chain sync --- crates/subcoin-network/src/sync.rs | 47 ++++++++++++++++++---------- crates/subcoin-network/src/worker.rs | 5 +-- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index aca0bb7..00c3bc4 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -119,6 +119,11 @@ pub(crate) enum SyncAction { None, } +pub(crate) enum RestartReason { + Stalled, + Disconnected, +} + // This enum encapsulates the various strategies and states a node // might be in during the sync process. enum Syncing { @@ -229,6 +234,14 @@ where .collect() } + pub(super) fn remove_peer(&mut self, peer_id: PeerId) { + if let Some(removed_peer) = self.peers.remove(&peer_id) { + if matches!(removed_peer.state, PeerSyncState::DownloadingNew { .. }) { + self.restart_sync(removed_peer.peer_id, RestartReason::Disconnected); + } + } + } + /// Attempt to find the best available peer, falling back to a random choice if needed fn select_next_peer_for_sync( &mut self, @@ -256,15 +269,13 @@ where }) } - /// Attempts to restart the sync due to a dropped peer. + /// Attempts to restart the sync based on the reason provided. /// /// Returns `true` if the sync is restarted with a new peer. - pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId) { + pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId, reason: RestartReason) { let our_best = self.client.best_number(); - let maybe_next_sync_peer = self.select_next_peer_for_sync(our_best, prior_peer_id); - - let Some(new_peer_id) = maybe_next_sync_peer else { + let Some(new_peer_id) = self.select_next_peer_for_sync(our_best, prior_peer_id) else { if let Some(median_seen_block) = self.median_seen() { if median_seen_block <= our_best { let best_seen_block = self.peers.values().map(|p| p.best_number).max(); @@ -296,7 +307,7 @@ where { let Some(new_peer) = self.peers.get_mut(&new_peer_id) else { - tracing::error!("Next peer {new_peer_id} missing from peer list"); + tracing::error!("Corrupted state, next peer {new_peer_id} missing from peer list"); return; }; @@ -314,12 +325,19 @@ where } } - self.peers.entry(prior_peer_id).and_modify(|p| { - let current_stalled_count = p.state.stalled_count(); - p.state = PeerSyncState::Deprioritized { - stalled_count: current_stalled_count + 1, - }; - }); + match reason { + RestartReason::Stalled => { + self.peers.entry(prior_peer_id).and_modify(|p| { + let current_stalled_count = p.state.stalled_count(); + p.state = PeerSyncState::Deprioritized { + stalled_count: current_stalled_count + 1, + }; + }); + } + RestartReason::Disconnected => { + // Nothing to be done, peer is already removed from the peer list. + } + } } /// Returns the median block number advertised by our peers. @@ -339,11 +357,6 @@ where } } - pub(super) fn remove_peer(&mut self, peer_id: PeerId) { - // TODO: handle the situation that the peer is being involved in the downloader. - self.peers.remove(&peer_id); - } - pub(super) fn update_peer_latency(&mut self, peer_id: PeerId, avg_latency: Latency) { self.peers.entry(peer_id).and_modify(|peer| { peer.latency = avg_latency; diff --git a/crates/subcoin-network/src/worker.rs b/crates/subcoin-network/src/worker.rs index e24c4a9..d2e2231 100644 --- a/crates/subcoin-network/src/worker.rs +++ b/crates/subcoin-network/src/worker.rs @@ -5,7 +5,7 @@ use crate::network::{ }; use crate::peer_manager::{Config, NewPeer, PeerManager, SlowPeer, PEER_LATENCY_THRESHOLD}; use crate::peer_store::PeerStore; -use crate::sync::{ChainSync, LocatorRequest, SyncAction, SyncRequest}; +use crate::sync::{ChainSync, LocatorRequest, RestartReason, SyncAction, SyncRequest}; use crate::transaction_manager::TransactionManager; use crate::{Bandwidth, Error, PeerId, SyncStrategy}; use bitcoin::p2p::message::{NetworkMessage, MAX_INV_SIZE}; @@ -459,7 +459,8 @@ where self.chain_sync.switch_to_idle(); } SyncAction::RestartSyncWithStalledPeer(stalled_peer_id) => { - self.chain_sync.restart_sync(stalled_peer_id); + self.chain_sync + .restart_sync(stalled_peer_id, RestartReason::Stalled); } SyncAction::Disconnect(peer_id, reason) => { self.peer_manager.disconnect(peer_id, reason); From 0fe7dc7c60c375a35be6bd8b0617e808ea3c3fb7 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Fri, 25 Oct 2024 19:25:11 +0800 Subject: [PATCH 05/10] Fix overflow --- crates/subcoin-network/src/block_downloader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/subcoin-network/src/block_downloader.rs b/crates/subcoin-network/src/block_downloader.rs index be19b56..4087075 100644 --- a/crates/subcoin-network/src/block_downloader.rs +++ b/crates/subcoin-network/src/block_downloader.rs @@ -184,7 +184,7 @@ impl BlockDownloadManager { _ => 512, }; - let queued_blocks = self.best_queued_number - best_number; + let queued_blocks = self.best_queued_number.saturating_sub(best_number); if queued_blocks > max_queued_blocks { self.queue_status = ImportQueueStatus::Overloaded; From 05f778ae42102bb97ad5b43bd4e401ecd3d84f3f Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Fri, 25 Oct 2024 19:45:35 +0800 Subject: [PATCH 06/10] Rename remove_peer() to disconnect() --- crates/subcoin-network/src/sync.rs | 8 ++++++-- crates/subcoin-network/src/worker.rs | 14 +++++++------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index 00c3bc4..f9bd47f 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -119,6 +119,7 @@ pub(crate) enum SyncAction { None, } +#[derive(Debug)] pub(crate) enum RestartReason { Stalled, Disconnected, @@ -234,8 +235,11 @@ where .collect() } - pub(super) fn remove_peer(&mut self, peer_id: PeerId) { + /// Removes the given peer from peers of chain sync. + pub(super) fn disconnect(&mut self, peer_id: PeerId) { if let Some(removed_peer) = self.peers.remove(&peer_id) { + // We currently support only one syncing peer, this logic needs to be + // refactored once multiple syncing peers are supported. if matches!(removed_peer.state, PeerSyncState::DownloadingNew { .. }) { self.restart_sync(removed_peer.peer_id, RestartReason::Disconnected); } @@ -311,7 +315,7 @@ where return; }; - tracing::debug!(?prior_peer_id, ?new_peer, "🔄 Sync restarted"); + tracing::debug!(?reason, ?prior_peer_id, ?new_peer, "🔄 Sync restarted"); new_peer.state = PeerSyncState::DownloadingNew { start: our_best }; match &mut self.syncing { diff --git a/crates/subcoin-network/src/worker.rs b/crates/subcoin-network/src/worker.rs index d2e2231..dbcb4b4 100644 --- a/crates/subcoin-network/src/worker.rs +++ b/crates/subcoin-network/src/worker.rs @@ -182,7 +182,7 @@ where } Event::Disconnect { peer_addr, reason } => { self.peer_manager.disconnect(peer_addr, reason); - self.chain_sync.remove_peer(peer_addr); + self.chain_sync.disconnect(peer_addr); self.peer_store.remove_peer(peer_addr); } Event::PeerMessage { @@ -217,18 +217,18 @@ where for peer in self.chain_sync.unreliable_peers() { self.peer_manager.disconnect(peer, Error::UnreliablePeer); - self.chain_sync.remove_peer(peer); + self.chain_sync.disconnect(peer); self.peer_store.remove_peer(peer); } let (timeout_peers, maybe_slow_peer) = self.peer_manager.on_tick(); timeout_peers.into_iter().for_each(|peer_id| { self.peer_manager.disconnect(peer_id, Error::PingTimeout); - self.chain_sync.remove_peer(peer_id); + self.chain_sync.disconnect(peer_id); }); if let Some(SlowPeer { peer_id, latency }) = maybe_slow_peer { self.peer_manager.evict(peer_id, Error::SlowPeer(latency)); - self.chain_sync.remove_peer(peer_id); + self.chain_sync.disconnect(peer_id); } let connected_peers = self.peer_manager.connected_peers(); @@ -352,7 +352,7 @@ where if avg_latency > PEER_LATENCY_THRESHOLD { self.peer_manager .disconnect(from, Error::PingLatencyTooHigh(avg_latency)); - self.chain_sync.remove_peer(from); + self.chain_sync.disconnect(from); self.peer_store.remove_peer(from); } else { if self.chain_sync.peers.contains_key(&from) { @@ -372,7 +372,7 @@ where } Err(err) => { self.peer_manager.disconnect(from, err); - self.chain_sync.remove_peer(from); + self.chain_sync.disconnect(from); self.peer_store.remove_peer(from); } } @@ -464,7 +464,7 @@ where } SyncAction::Disconnect(peer_id, reason) => { self.peer_manager.disconnect(peer_id, reason); - self.chain_sync.remove_peer(peer_id); + self.chain_sync.disconnect(peer_id); } SyncAction::None => {} } From 9e17b5204680d7841d158bf72220f197e9468df0 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Sat, 26 Oct 2024 13:30:52 +0800 Subject: [PATCH 07/10] Rename to SyncPeers --- crates/subcoin-rpc/src/network.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/crates/subcoin-rpc/src/network.rs b/crates/subcoin-rpc/src/network.rs index 3f80a63..a16d2f2 100644 --- a/crates/subcoin-rpc/src/network.rs +++ b/crates/subcoin-rpc/src/network.rs @@ -17,11 +17,12 @@ pub enum SyncState { DownloadingNew, } +/// Overview of peers in chain sync. #[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct NetworkPeers { +pub struct SyncPeers { /// A map containing the count of peers in each sync state (e.g., syncing, idle). - sync_state_counts: BTreeMap, + peer_counts: BTreeMap, /// The highest block height known across all peers in the network. best_known_block: Option, /// Detailed synchronization information for each peer. @@ -31,8 +32,8 @@ pub struct NetworkPeers { #[rpc(client, server)] pub trait NetworkApi { /// Get the sync peers. - #[method(name = "network_peers")] - async fn network_peers(&self) -> Result; + #[method(name = "network_syncPeers")] + async fn network_sync_peers(&self) -> Result; /// Get overall network status. #[method(name = "network_status")] @@ -68,7 +69,7 @@ where Block: BlockT + 'static, Client: HeaderBackend + BlockBackend + AuxStore + 'static, { - async fn network_peers(&self) -> Result { + async fn network_sync_peers(&self) -> Result { let mut sync_peers = self.network_handle.sync_peers().await; let mut available = 0; @@ -91,17 +92,13 @@ where sync_peers.sort_by_key(|x| x.latency); - Ok(NetworkPeers { - sync_state_counts: BTreeMap::from([ + Ok(SyncPeers { + peer_counts: BTreeMap::from([ (SyncState::Available, available), (SyncState::Deprioritized, deprioritized), (SyncState::DownloadingNew, downloading_new), ]), - best_known_block: if best_known_block > 0 { - Some(best_known_block) - } else { - None - }, + best_known_block: (best_known_block > 0).then_some(best_known_block), peer_sync_details: sync_peers, }) } From fea2669fd82c3090c9d93f462be12fb9452b97cf Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Sat, 26 Oct 2024 16:44:52 +0800 Subject: [PATCH 08/10] Rename SetIdle --- .../src/block_downloader/blocks_first.rs | 4 ++-- crates/subcoin-network/src/sync.rs | 14 +++++++------- crates/subcoin-network/src/worker.rs | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/subcoin-network/src/block_downloader/blocks_first.rs b/crates/subcoin-network/src/block_downloader/blocks_first.rs index 6e5d9dd..eea70f7 100644 --- a/crates/subcoin-network/src/block_downloader/blocks_first.rs +++ b/crates/subcoin-network/src/block_downloader/blocks_first.rs @@ -144,7 +144,7 @@ where if best_number == self.target_block_number { self.state = State::Completed; - return SyncAction::SwitchToIdle; + return SyncAction::SetIdle; } if self.download_manager.is_stalled(self.peer_id) { @@ -299,7 +299,7 @@ where "Received block #{block_number},{block_hash} higher than the target block" ); self.state = State::Completed; - SyncAction::SwitchToIdle + SyncAction::SetIdle } else { self.state = State::Disconnecting; SyncAction::Disconnect( diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index f9bd47f..76735f6 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -105,16 +105,16 @@ pub(crate) enum SyncRequest { pub(crate) enum SyncAction { /// Fetch headers, blocks and data. Request(SyncRequest), - /// Headers-First sync completed, use the Blocks-First sync - /// to download the recent blocks. + /// Transitions to a Blocks-First sync after Headers-First sync + /// compltes, to fetch the most recent blocks. SwitchToBlocksFirstSync, /// Disconnect from the peer for the given reason. Disconnect(PeerId, Error), - /// Make this peer as deprioritized and restart the current syncing - /// process using other sync candidates if there are any. + /// Deprioritize the specified peer, restarting the current sync + /// with other candidates if available. RestartSyncWithStalledPeer(PeerId), - /// Blocks-First sync finished, switch syncing state to idle. - SwitchToIdle, + /// Blocks-First sync finished and sets the syncing state to idle. + SetIdle, /// No action needed. None, } @@ -583,7 +583,7 @@ where .store(is_major_syncing, Ordering::Relaxed); } - pub(super) fn switch_to_idle(&mut self) { + pub(super) fn set_idle(&mut self) { tracing::debug!( best_number = self.client.best_number(), "Blocks-First sync completed, switching to Syncing::Idle" diff --git a/crates/subcoin-network/src/worker.rs b/crates/subcoin-network/src/worker.rs index dbcb4b4..723f3e2 100644 --- a/crates/subcoin-network/src/worker.rs +++ b/crates/subcoin-network/src/worker.rs @@ -455,8 +455,8 @@ where self.send_get_blocks_request(request); } } - SyncAction::SwitchToIdle => { - self.chain_sync.switch_to_idle(); + SyncAction::SetIdle => { + self.chain_sync.set_idle(); } SyncAction::RestartSyncWithStalledPeer(stalled_peer_id) => { self.chain_sync From f11e6e50526844fb1173e0109b0995fc19a1db43 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Sat, 26 Oct 2024 20:50:52 +0800 Subject: [PATCH 09/10] Nits --- .../src/block_downloader/headers_first.rs | 2 - crates/subcoin-network/src/sync.rs | 47 +++++++++---------- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/crates/subcoin-network/src/block_downloader/headers_first.rs b/crates/subcoin-network/src/block_downloader/headers_first.rs index 0bf287d..35c3833 100644 --- a/crates/subcoin-network/src/block_downloader/headers_first.rs +++ b/crates/subcoin-network/src/block_downloader/headers_first.rs @@ -128,8 +128,6 @@ pub struct HeadersFirstDownloader { downloaded_headers: DownloadedHeaders, downloaded_blocks_count: usize, last_locator_start: u32, - // TODO: Now it's solely used for the purpose of displaying the sync state. - // refactor it later. target_block_number: u32, _phantom: PhantomData, } diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index 76735f6..79f1163 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -294,7 +294,7 @@ where our_best, "Synced to the majority of peers, switching to Idle" ); - self.syncing = Syncing::Idle; + self.update_syncing_state(Syncing::Idle); return; } } @@ -370,9 +370,9 @@ where pub(super) fn update_sync_peer_on_lower_latency(&mut self) { let maybe_sync_peer_id = match &self.syncing { + Syncing::Idle => return, Syncing::BlocksFirst(downloader) => downloader.replaceable_sync_peer(), Syncing::HeadersFirst(downloader) => downloader.replaceable_sync_peer(), - Syncing::Idle => return, }; let Some(current_sync_peer_id) = maybe_sync_peer_id else { @@ -577,10 +577,9 @@ where } fn update_syncing_state(&mut self, new: Syncing) { - let is_major_syncing = new.is_major_syncing(); self.syncing = new; self.is_major_syncing - .store(is_major_syncing, Ordering::Relaxed); + .store(self.syncing.is_major_syncing(), Ordering::Relaxed); } pub(super) fn set_idle(&mut self) { @@ -632,7 +631,7 @@ where if inventories.len() == 1 { if let Inventory::Block(block_hash) = inventories[0] { if !self.inflight_announced_blocks.contains(&block_hash) { - // A new block maybe broadcasted via `inv` message. + // A new block is broadcasted via `inv` message. tracing::trace!( "Requesting a new block {block_hash} announced from {from:?}" ); @@ -647,22 +646,6 @@ where } } - pub(super) fn on_block(&mut self, block: BitcoinBlock, from: PeerId) -> SyncAction { - if self.inflight_announced_blocks.remove(&block.block_hash()) { - self.import_queue.import_blocks(ImportBlocks { - origin: BlockOrigin::NetworkBroadcast, - blocks: vec![block], - }); - return SyncAction::None; - } - - match &mut self.syncing { - Syncing::Idle => SyncAction::None, - Syncing::BlocksFirst(downloader) => downloader.on_block(block, from), - Syncing::HeadersFirst(downloader) => downloader.on_block(block, from), - } - } - fn announced_blocks_request( &mut self, block_hashes: impl IntoIterator, @@ -682,6 +665,22 @@ where SyncAction::Request(data_request) } + pub(super) fn on_block(&mut self, block: BitcoinBlock, from: PeerId) -> SyncAction { + match &mut self.syncing { + Syncing::Idle => { + if self.inflight_announced_blocks.remove(&block.block_hash()) { + self.import_queue.import_blocks(ImportBlocks { + origin: BlockOrigin::NetworkBroadcast, + blocks: vec![block], + }); + } + SyncAction::None + } + Syncing::BlocksFirst(downloader) => downloader.on_block(block, from), + Syncing::HeadersFirst(downloader) => downloader.on_block(block, from), + } + } + pub(super) fn on_headers(&mut self, headers: Vec, from: PeerId) -> SyncAction { match &mut self.syncing { Syncing::HeadersFirst(downloader) => downloader.on_headers(headers, from), @@ -707,10 +706,8 @@ where } } - return self.announced_blocks_request( - headers.into_iter().map(|header| header.block_hash()), - from, - ); + let new_blocks = headers.into_iter().map(|header| header.block_hash()); + return self.announced_blocks_request(new_blocks, from); } tracing::debug!( From b7e0d07b374544d17f6a58f22763ba98605c77d1 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Sun, 27 Oct 2024 15:19:15 +0800 Subject: [PATCH 10/10] Nit --- crates/subcoin-network/src/block_downloader/blocks_first.rs | 6 +----- .../subcoin-network/src/block_downloader/headers_first.rs | 6 +----- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/crates/subcoin-network/src/block_downloader/blocks_first.rs b/crates/subcoin-network/src/block_downloader/blocks_first.rs index eea70f7..c3da986 100644 --- a/crates/subcoin-network/src/block_downloader/blocks_first.rs +++ b/crates/subcoin-network/src/block_downloader/blocks_first.rs @@ -101,11 +101,7 @@ where } pub(crate) fn replaceable_sync_peer(&self) -> Option { - if self.downloaded_blocks_count > 0 { - None - } else { - Some(self.peer_id) - } + (self.downloaded_blocks_count == 0).then_some(self.peer_id) } pub(crate) fn replace_sync_peer(&mut self, peer_id: PeerId, target_block_number: u32) { diff --git a/crates/subcoin-network/src/block_downloader/headers_first.rs b/crates/subcoin-network/src/block_downloader/headers_first.rs index 35c3833..034c2db 100644 --- a/crates/subcoin-network/src/block_downloader/headers_first.rs +++ b/crates/subcoin-network/src/block_downloader/headers_first.rs @@ -178,11 +178,7 @@ where } pub(crate) fn replaceable_sync_peer(&self) -> Option { - if self.downloaded_blocks_count > 0 { - None - } else { - Some(self.peer_id) - } + (self.downloaded_blocks_count == 0).then_some(self.peer_id) } pub(crate) fn replace_sync_peer(&mut self, peer_id: PeerId, target_block_number: u32) {