Refactor Discouraged peer
liuchengxu committed Oct 24, 2024
1 parent 08e84d0 commit 6643752
Showing 4 changed files with 136 additions and 61 deletions.
2 changes: 2 additions & 0 deletions crates/subcoin-network/src/lib.rs
@@ -116,6 +116,8 @@ pub enum Error {
PingTimeout,
#[error("Ping latency exceeds the threshold")]
PingLatencyTooHigh,
#[error("Peer is deprioritized and has encountered multiple syncing failures")]
UnreliableSyncPeer,
#[error("Peer's latency ({0} ms) is too high")]
SlowPeer(Latency),
#[error("Unexpected pong message")]
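
For readers less familiar with this error style, the short sketch below is not part of the commit; it assumes the crate's Error enum uses the thiserror derive (suggested by the #[error(...)] attribute) and shows how the new UnreliableSyncPeer variant renders its message through Display.

// Minimal, self-contained sketch; requires the `thiserror` crate.
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
    #[error("Peer is deprioritized and has encountered multiple syncing failures")]
    UnreliableSyncPeer,
}

fn main() {
    // Prints the attribute string via the generated Display implementation.
    println!("{}", Error::UnreliableSyncPeer);
}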
177 changes: 121 additions & 56 deletions crates/subcoin-network/src/sync.rs
@@ -26,14 +26,20 @@ const LATENCY_IMPROVEMENT_THRESHOLD: u128 = 4;
// Define a constant for the low ping latency cutoff, in milliseconds.
const LOW_LATENCY_CUTOFF: Latency = 20;

/// Maximum number of syncing retries for a deprioritized peer.
const MAX_STALLS: usize = 5;

/// The state of syncing between a Peer and ourselves.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
#[serde(rename_all = "camelCase")]
pub enum PeerSyncState {
/// Available for sync requests.
Available,
/// The peer has been discouraged due to syncing from this peer was stalled before.
Discouraged,
/// The peer has been deprioritized due to past syncing issues (e.g., stalling).
Deprioritized {
/// Number of times the peer has stalled.
stalled_count: usize,
},
/// Actively downloading new blocks, starting from the given Number.
DownloadingNew { start: u32 },
}
@@ -43,6 +49,21 @@ impl PeerSyncState {
pub fn is_available(&self) -> bool {
matches!(self, Self::Available)
}

fn stalled_count(&self) -> usize {
match self {
Self::Deprioritized { stalled_count } => *stalled_count,
_ => 0,
}
}

/// Determines if the peer is permanently deprioritized based on the stall count.
fn is_permanently_deprioritized(&self) -> bool {
match self {
PeerSyncState::Deprioritized { stalled_count } => *stalled_count > MAX_STALLS,
_ => false,
}
}
}

/// Contains all the data about a Peer that we are trying to sync with.
@@ -89,7 +110,7 @@ pub(crate) enum SyncAction {
SwitchToBlocksFirstSync,
/// Disconnect from the peer for the given reason.
Disconnect(PeerId, Error),
/// Make this peer as discouraged and restart the current syncing
/// Make this peer as deprioritized and restart the current syncing
/// process using other sync candidates if there are any.
RestartSyncWithStalledPeer(PeerId),
/// Blocks-First sync finished, switch syncing state to idle.
@@ -196,6 +217,18 @@ where
self.import_queue.block_import_results().await
}

pub(super) fn unreliable_peers(&self) -> Vec<PeerId> {
self.peers
.iter()
.filter_map(|(peer_id, peer)| {
peer.state
.is_permanently_deprioritized()
.then_some(peer_id)
.copied()
})
.collect()
}

/// Attempts to restart the sync due to the stalled peer.
///
/// Returns `true` if the sync is restarted with a new peer.
@@ -249,7 +282,7 @@ where
return false;
}

// Pick a random peer, even if it's marked as discouraged.
// Pick a random peer, even if it's marked as deprioritized.
self.rng
.choice(sync_candidates)
.expect("Sync candidates must be non-empty as checked; qed")
@@ -290,10 +323,13 @@ where
}
}

pub(super) fn mark_peer_as_discouraged(&mut self, stalled_peer: PeerId) {
self.peers
.entry(stalled_peer)
.and_modify(|p| p.state = PeerSyncState::Discouraged);
pub(super) fn note_peer_stalled(&mut self, stalled_peer: PeerId) {
self.peers.entry(stalled_peer).and_modify(|p| {
let current_stalled_count = p.state.stalled_count();
p.state = PeerSyncState::Deprioritized {
stalled_count: current_stalled_count + 1,
};
});
}

pub(super) fn remove_peer(&mut self, peer_id: PeerId) {
@@ -407,70 +443,99 @@ where

let our_best = self.client.best_number();

let Some(best_peer) = self
.peers
.values_mut()
.filter(|peer| peer.best_number > our_best && peer.state.is_available())
.min_by_key(|peer| peer.latency)
else {
return SyncAction::None;
let find_best_available_peer = || {
self.peers
.iter()
.filter(|(_peer_id, peer)| peer.best_number > our_best && peer.state.is_available())
.min_by_key(|(_peer_id, peer)| peer.latency)
.map(|(peer_id, _peer)| peer_id)
};

let peer_best = best_peer.best_number;
let find_best_deprioritized_peer = || {
self.peers
.iter()
.filter(|(_peer_id, peer)| peer.best_number > our_best)
.filter_map(|(peer_id, peer)| {
if let PeerSyncState::Deprioritized { stalled_count } = peer.state {
if stalled_count > MAX_STALLS {
None
} else {
Some((peer_id, stalled_count, peer.latency))
}
} else {
None
}
})
.min_by(
|(_, stalled_count_a, latency_a), (_, stalled_count_b, latency_b)| {
// First, compare stalled_count, then latency if stalled_count is equal
match stalled_count_a.cmp(stalled_count_b) {
std::cmp::Ordering::Equal => latency_a.cmp(latency_b), // compare latency if stalled_count is the same
other => other, // otherwise, return the comparison of stalled_count
}
},
)
.map(|(peer_id, _stalled_count, _latency)| peer_id)
};

if peer_best > our_best {
let sync_peer = best_peer.peer_id;
let Some(next_peer_id) = find_best_available_peer()
.or_else(find_best_deprioritized_peer)
.copied()
else {
return SyncAction::None;
};

best_peer.state = PeerSyncState::DownloadingNew { start: our_best };
let Some(next_peer) = self.peers.get_mut(&next_peer_id) else {
return SyncAction::None;
};

let require_major_sync = peer_best - our_best > MAJOR_SYNC_GAP;
let client = self.client.clone();
let peer_store = self.peer_store.clone();

let client = self.client.clone();
let peer_store = self.peer_store.clone();
let peer_best = next_peer.best_number;
let require_major_sync = peer_best - our_best > MAJOR_SYNC_GAP;

// Start major syncing if the gap is significant.
let (new_syncing, sync_action) = if require_major_sync {
let blocks_first = our_best >= crate::checkpoint::last_checkpoint_height()
|| matches!(self.sync_strategy, SyncStrategy::BlocksFirst);
// Start major syncing if the gap is significant.
let (new_syncing, sync_action) = if require_major_sync {
let blocks_first = our_best >= crate::checkpoint::last_checkpoint_height()
|| matches!(self.sync_strategy, SyncStrategy::BlocksFirst);

tracing::debug!(
latency = ?best_peer.latency,
"⏩ Starting major sync ({}) from {sync_peer:?} at #{our_best}",
if blocks_first { "blocks-first" } else { "headers-first" }
);
tracing::debug!(
latency = ?next_peer.latency,
"⏩ Starting major sync ({}) from {next_peer_id:?} at #{our_best}",
if blocks_first { "blocks-first" } else { "headers-first" }
);

if blocks_first {
let (downloader, blocks_request) =
BlocksFirstDownloader::new(client, sync_peer, peer_best, peer_store);
(
Syncing::BlocksFirst(Box::new(downloader)),
SyncAction::Request(blocks_request),
)
} else {
let (downloader, sync_action) = HeadersFirstDownloader::new(
client,
self.header_verifier.clone(),
sync_peer,
peer_best,
peer_store,
);
(Syncing::HeadersFirst(Box::new(downloader)), sync_action)
}
} else {
if blocks_first {
let (downloader, blocks_request) =
BlocksFirstDownloader::new(client, sync_peer, peer_best, peer_store);
BlocksFirstDownloader::new(client, next_peer_id, peer_best, peer_store);
(
Syncing::BlocksFirst(Box::new(downloader)),
SyncAction::Request(blocks_request),
)
};

self.update_syncing_state(new_syncing);
} else {
let (downloader, sync_action) = HeadersFirstDownloader::new(
client,
self.header_verifier.clone(),
next_peer_id,
peer_best,
peer_store,
);
(Syncing::HeadersFirst(Box::new(downloader)), sync_action)
}
} else {
let (downloader, blocks_request) =
BlocksFirstDownloader::new(client, next_peer_id, peer_best, peer_store);
(
Syncing::BlocksFirst(Box::new(downloader)),
SyncAction::Request(blocks_request),
)
};

return sync_action;
}
next_peer.state = PeerSyncState::DownloadingNew { start: our_best };
self.update_syncing_state(new_syncing);

SyncAction::None
sync_action
}

fn update_syncing_state(&mut self, new: Syncing<Block, Client>) {
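
The most intricate piece of the reworked on_tick path above is the fallback selection among deprioritized peers. The standalone sketch below is not part of the commit and uses simplified, assumed data; it reproduces the same ordering with a tuple key, which is equivalent to the explicit min_by comparison in the hunk: fewest stalls first, latency as the tie-breaker, and peers beyond MAX_STALLS excluded entirely.

// Simplified stand-in for the candidate data tracked per peer.
const MAX_STALLS: usize = 5;

#[derive(Debug, Clone, Copy)]
struct Candidate {
    peer_id: u64,
    stalled_count: usize,
    latency: u128,
}

fn pick_deprioritized(candidates: &[Candidate]) -> Option<u64> {
    candidates
        .iter()
        // Permanently deprioritized peers (stalled more than MAX_STALLS times) are skipped.
        .filter(|c| c.stalled_count <= MAX_STALLS)
        // Lexicographic tuple comparison: stalled_count first, then latency.
        .min_by_key(|c| (c.stalled_count, c.latency))
        .map(|c| c.peer_id)
}

fn main() {
    let candidates = [
        Candidate { peer_id: 1, stalled_count: 2, latency: 30 },
        Candidate { peer_id: 2, stalled_count: 1, latency: 200 },
        Candidate { peer_id: 3, stalled_count: 1, latency: 50 },
        Candidate { peer_id: 4, stalled_count: 7, latency: 5 }, // over MAX_STALLS, skipped
    ];
    // Peer 3 wins: tied on fewest stalls with peer 2, but lower latency.
    assert_eq!(pick_deprioritized(&candidates), Some(3));
}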
10 changes: 9 additions & 1 deletion crates/subcoin-network/src/worker.rs
@@ -215,6 +215,14 @@ where
let sync_action = self.chain_sync.on_tick();
self.do_sync_action(sync_action);

let unreliable_peers = self.chain_sync.unreliable_peers();
for peer in unreliable_peers {
self.peer_manager
.disconnect(peer, Error::UnreliableSyncPeer);
self.chain_sync.remove_peer(peer);
self.peer_store.remove_peer(peer);
}

if let Some(SlowPeer {
peer_id,
peer_latency,
@@ -456,7 +464,7 @@ where
}
SyncAction::RestartSyncWithStalledPeer(stalled_peer_id) => {
if self.chain_sync.restart_sync(stalled_peer_id) {
self.chain_sync.mark_peer_as_discouraged(stalled_peer_id);
self.chain_sync.note_peer_stalled(stalled_peer_id);
}
}
SyncAction::Disconnect(peer_id, reason) => {
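
One detail in the new tick handling above: unreliable_peers() returns an owned Vec<PeerId>, so the immutable borrow of chain_sync ends before the loop disconnects peers and mutates chain_sync and the peer store. The toy example below (assumed, simplified types; not the real components) shows why collecting first makes the subsequent mutation straightforward.

use std::collections::HashMap;

// Stand-in for the sync component: peer_id -> "is unreliable" flag.
#[derive(Default)]
struct ChainSync {
    peers: HashMap<u64, bool>,
}

impl ChainSync {
    fn unreliable_peers(&self) -> Vec<u64> {
        self.peers
            .iter()
            .filter_map(|(id, &unreliable)| unreliable.then_some(*id))
            .collect()
    }

    fn remove_peer(&mut self, peer_id: u64) {
        self.peers.remove(&peer_id);
    }
}

fn main() {
    let mut chain_sync = ChainSync::default();
    chain_sync.peers.insert(1, false);
    chain_sync.peers.insert(2, true);

    // The immutable borrow ends here because owned IDs are returned.
    for peer in chain_sync.unreliable_peers() {
        chain_sync.remove_peer(peer); // mutable borrow is now unproblematic
    }
    assert_eq!(chain_sync.peers.len(), 1);
}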
8 changes: 4 additions & 4 deletions crates/subcoin-rpc/src/network.rs
@@ -13,7 +13,7 @@ use subcoin_network::{NetworkHandle, NetworkStatus, PeerSync, PeerSyncState};
#[serde(rename_all = "camelCase")]
pub enum SyncState {
Available,
Discouraged,
Deprioritized,
DownloadingNew,
}

@@ -72,15 +72,15 @@ where
let mut sync_peers = self.network_handle.sync_peers().await;

let mut available = 0;
let mut discouraged = 0;
let mut deprioritized = 0;
let mut downloading_new = 0;

let mut best_known_block = 0;

for peer in &sync_peers {
match peer.state {
PeerSyncState::Available => available += 1,
PeerSyncState::Discouraged => discouraged += 1,
PeerSyncState::Deprioritized { .. } => deprioritized += 1,
PeerSyncState::DownloadingNew { .. } => downloading_new += 1,
}

@@ -94,7 +94,7 @@
Ok(NetworkPeers {
sync_state_counts: HashMap::from([
(SyncState::Available, available),
(SyncState::Discouraged, discouraged),
(SyncState::Deprioritized, deprioritized),
(SyncState::DownloadingNew, downloading_new),
]),
best_known_block: if best_known_block > 0 {
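
On the RPC side, the per-variant counters above flatten PeerSyncState::Deprioritized { .. } into a single SyncState::Deprioritized bucket, discarding the stall count. A small self-contained sketch of the same tallying (simplified, assumed types; not the real RPC code):

use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum SyncState {
    Available,
    Deprioritized,
    DownloadingNew,
}

#[allow(dead_code)] // field values are irrelevant to the tally
#[derive(Debug, Clone, Copy)]
enum PeerSyncState {
    Available,
    Deprioritized { stalled_count: usize },
    DownloadingNew { start: u32 },
}

fn main() {
    let peers = [
        PeerSyncState::Available,
        PeerSyncState::Deprioritized { stalled_count: 2 },
        PeerSyncState::DownloadingNew { start: 100 },
        PeerSyncState::Available,
    ];

    let mut counts: HashMap<SyncState, usize> = HashMap::new();
    for state in &peers {
        let key = match state {
            PeerSyncState::Available => SyncState::Available,
            PeerSyncState::Deprioritized { .. } => SyncState::Deprioritized,
            PeerSyncState::DownloadingNew { .. } => SyncState::DownloadingNew,
        };
        *counts.entry(key).or_insert(0) += 1;
    }
    // e.g. {Available: 2, Deprioritized: 1, DownloadingNew: 1}
    println!("{counts:?}");
}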
