diff --git a/crates/subcoin-network/src/lib.rs b/crates/subcoin-network/src/lib.rs index e6511af..4db9a43 100644 --- a/crates/subcoin-network/src/lib.rs +++ b/crates/subcoin-network/src/lib.rs @@ -116,6 +116,8 @@ pub enum Error { PingTimeout, #[error("Ping latency exceeds the threshold")] PingLatencyTooHigh, + #[error("Peer is deprioritized and has encountered multiple syncing failures")] + UnreliableSyncPeer, #[error("Peer's latency ({0} ms) is too high")] SlowPeer(Latency), #[error("Unexpected pong message")] diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index 1b2604b..8b62bf1 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -26,14 +26,20 @@ const LATENCY_IMPROVEMENT_THRESHOLD: u128 = 4; // Define a constant for the low ping latency cutoff, in milliseconds. const LOW_LATENCY_CUTOFF: Latency = 20; +/// Maximum number of syncing retries for a deprioritized peer. +const MAX_STALLS: usize = 5; + /// The state of syncing between a Peer and ourselves. #[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)] #[serde(rename_all = "camelCase")] pub enum PeerSyncState { /// Available for sync requests. Available, - /// The peer has been discouraged due to syncing from this peer was stalled before. - Discouraged, + /// The peer has been deprioritized due to past syncing issues (e.g., stalling). + Deprioritized { + /// Number of times the peer has stalled. + stalled_count: usize, + }, /// Actively downloading new blocks, starting from the given Number. DownloadingNew { start: u32 }, } @@ -43,6 +49,21 @@ impl PeerSyncState { pub fn is_available(&self) -> bool { matches!(self, Self::Available) } + + fn stalled_count(&self) -> usize { + match self { + Self::Deprioritized { stalled_count } => *stalled_count, + _ => 0, + } + } + + /// Determines if the peer is permanently deprioritized based on the stall count. + fn is_permanently_deprioritized(&self) -> bool { + match self { + PeerSyncState::Deprioritized { stalled_count } => *stalled_count > MAX_STALLS, + _ => false, + } + } } /// Contains all the data about a Peer that we are trying to sync with. @@ -89,7 +110,7 @@ pub(crate) enum SyncAction { SwitchToBlocksFirstSync, /// Disconnect from the peer for the given reason. Disconnect(PeerId, Error), - /// Make this peer as discouraged and restart the current syncing + /// Make this peer as deprioritized and restart the current syncing /// process using other sync candidates if there are any. RestartSyncWithStalledPeer(PeerId), /// Blocks-First sync finished, switch syncing state to idle. @@ -196,6 +217,18 @@ where self.import_queue.block_import_results().await } + pub(super) fn unreliable_peers(&self) -> Vec { + self.peers + .iter() + .filter_map(|(peer_id, peer)| { + peer.state + .is_permanently_deprioritized() + .then_some(peer_id) + .copied() + }) + .collect() + } + /// Attempts to restart the sync due to the stalled peer. /// /// Returns `true` if the sync is restarted with a new peer. @@ -249,7 +282,7 @@ where return false; } - // Pick a random peer, even if it's marked as discouraged. + // Pick a random peer, even if it's marked as deprioritized. self.rng .choice(sync_candidates) .expect("Sync candidates must be non-empty as checked; qed") @@ -290,10 +323,13 @@ where } } - pub(super) fn mark_peer_as_discouraged(&mut self, stalled_peer: PeerId) { - self.peers - .entry(stalled_peer) - .and_modify(|p| p.state = PeerSyncState::Discouraged); + pub(super) fn note_peer_stalled(&mut self, stalled_peer: PeerId) { + self.peers.entry(stalled_peer).and_modify(|p| { + let current_stalled_count = p.state.stalled_count(); + p.state = PeerSyncState::Deprioritized { + stalled_count: current_stalled_count + 1, + }; + }); } pub(super) fn remove_peer(&mut self, peer_id: PeerId) { @@ -407,70 +443,99 @@ where let our_best = self.client.best_number(); - let Some(best_peer) = self - .peers - .values_mut() - .filter(|peer| peer.best_number > our_best && peer.state.is_available()) - .min_by_key(|peer| peer.latency) - else { - return SyncAction::None; + let find_best_available_peer = || { + self.peers + .iter() + .filter(|(_peer_id, peer)| peer.best_number > our_best && peer.state.is_available()) + .min_by_key(|(_peer_id, peer)| peer.latency) + .map(|(peer_id, _peer)| peer_id) }; - let peer_best = best_peer.best_number; + let find_best_deprioritized_peer = || { + self.peers + .iter() + .filter(|(_peer_id, peer)| peer.best_number > our_best) + .filter_map(|(peer_id, peer)| { + if let PeerSyncState::Deprioritized { stalled_count } = peer.state { + if stalled_count > MAX_STALLS { + None + } else { + Some((peer_id, stalled_count, peer.latency)) + } + } else { + None + } + }) + .min_by( + |(_, stalled_count_a, latency_a), (_, stalled_count_b, latency_b)| { + // First, compare stalled_count, then latency if stalled_count is equal + match stalled_count_a.cmp(stalled_count_b) { + std::cmp::Ordering::Equal => latency_a.cmp(latency_b), // compare latency if stalled_count is the same + other => other, // otherwise, return the comparison of stalled_count + } + }, + ) + .map(|(peer_id, _stalled_count, _latency)| peer_id) + }; - if peer_best > our_best { - let sync_peer = best_peer.peer_id; + let Some(next_peer_id) = find_best_available_peer() + .or_else(find_best_deprioritized_peer) + .copied() + else { + return SyncAction::None; + }; - best_peer.state = PeerSyncState::DownloadingNew { start: our_best }; + let Some(next_peer) = self.peers.get_mut(&next_peer_id) else { + return SyncAction::None; + }; - let require_major_sync = peer_best - our_best > MAJOR_SYNC_GAP; + let client = self.client.clone(); + let peer_store = self.peer_store.clone(); - let client = self.client.clone(); - let peer_store = self.peer_store.clone(); + let peer_best = next_peer.best_number; + let require_major_sync = peer_best - our_best > MAJOR_SYNC_GAP; - // Start major syncing if the gap is significant. - let (new_syncing, sync_action) = if require_major_sync { - let blocks_first = our_best >= crate::checkpoint::last_checkpoint_height() - || matches!(self.sync_strategy, SyncStrategy::BlocksFirst); + // Start major syncing if the gap is significant. + let (new_syncing, sync_action) = if require_major_sync { + let blocks_first = our_best >= crate::checkpoint::last_checkpoint_height() + || matches!(self.sync_strategy, SyncStrategy::BlocksFirst); - tracing::debug!( - latency = ?best_peer.latency, - "⏩ Starting major sync ({}) from {sync_peer:?} at #{our_best}", - if blocks_first { "blocks-first" } else { "headers-first" } - ); + tracing::debug!( + latency = ?next_peer.latency, + "⏩ Starting major sync ({}) from {next_peer_id:?} at #{our_best}", + if blocks_first { "blocks-first" } else { "headers-first" } + ); - if blocks_first { - let (downloader, blocks_request) = - BlocksFirstDownloader::new(client, sync_peer, peer_best, peer_store); - ( - Syncing::BlocksFirst(Box::new(downloader)), - SyncAction::Request(blocks_request), - ) - } else { - let (downloader, sync_action) = HeadersFirstDownloader::new( - client, - self.header_verifier.clone(), - sync_peer, - peer_best, - peer_store, - ); - (Syncing::HeadersFirst(Box::new(downloader)), sync_action) - } - } else { + if blocks_first { let (downloader, blocks_request) = - BlocksFirstDownloader::new(client, sync_peer, peer_best, peer_store); + BlocksFirstDownloader::new(client, next_peer_id, peer_best, peer_store); ( Syncing::BlocksFirst(Box::new(downloader)), SyncAction::Request(blocks_request), ) - }; - - self.update_syncing_state(new_syncing); + } else { + let (downloader, sync_action) = HeadersFirstDownloader::new( + client, + self.header_verifier.clone(), + next_peer_id, + peer_best, + peer_store, + ); + (Syncing::HeadersFirst(Box::new(downloader)), sync_action) + } + } else { + let (downloader, blocks_request) = + BlocksFirstDownloader::new(client, next_peer_id, peer_best, peer_store); + ( + Syncing::BlocksFirst(Box::new(downloader)), + SyncAction::Request(blocks_request), + ) + }; - return sync_action; - } + next_peer.state = PeerSyncState::DownloadingNew { start: our_best }; + self.update_syncing_state(new_syncing); - SyncAction::None + sync_action } fn update_syncing_state(&mut self, new: Syncing) { diff --git a/crates/subcoin-network/src/worker.rs b/crates/subcoin-network/src/worker.rs index d96ee89..c6bbe5c 100644 --- a/crates/subcoin-network/src/worker.rs +++ b/crates/subcoin-network/src/worker.rs @@ -215,6 +215,14 @@ where let sync_action = self.chain_sync.on_tick(); self.do_sync_action(sync_action); + let unreliable_peers = self.chain_sync.unreliable_peers(); + for peer in unreliable_peers { + self.peer_manager + .disconnect(peer, Error::UnreliableSyncPeer); + self.chain_sync.remove_peer(peer); + self.peer_store.remove_peer(peer); + } + if let Some(SlowPeer { peer_id, peer_latency, @@ -456,7 +464,7 @@ where } SyncAction::RestartSyncWithStalledPeer(stalled_peer_id) => { if self.chain_sync.restart_sync(stalled_peer_id) { - self.chain_sync.mark_peer_as_discouraged(stalled_peer_id); + self.chain_sync.note_peer_stalled(stalled_peer_id); } } SyncAction::Disconnect(peer_id, reason) => { diff --git a/crates/subcoin-rpc/src/network.rs b/crates/subcoin-rpc/src/network.rs index 765f2f5..ab759c7 100644 --- a/crates/subcoin-rpc/src/network.rs +++ b/crates/subcoin-rpc/src/network.rs @@ -13,7 +13,7 @@ use subcoin_network::{NetworkHandle, NetworkStatus, PeerSync, PeerSyncState}; #[serde(rename_all = "camelCase")] pub enum SyncState { Available, - Discouraged, + Deprioritized, DownloadingNew, } @@ -72,7 +72,7 @@ where let mut sync_peers = self.network_handle.sync_peers().await; let mut available = 0; - let mut discouraged = 0; + let mut deprioritized = 0; let mut downloading_new = 0; let mut best_known_block = 0; @@ -80,7 +80,7 @@ where for peer in &sync_peers { match peer.state { PeerSyncState::Available => available += 1, - PeerSyncState::Discouraged => discouraged += 1, + PeerSyncState::Deprioritized { .. } => deprioritized += 1, PeerSyncState::DownloadingNew { .. } => downloading_new += 1, } @@ -94,7 +94,7 @@ where Ok(NetworkPeers { sync_state_counts: HashMap::from([ (SyncState::Available, available), - (SyncState::Discouraged, discouraged), + (SyncState::Deprioritized, deprioritized), (SyncState::DownloadingNew, downloading_new), ]), best_known_block: if best_known_block > 0 {