Skip to content

Commit

Permalink
Periodically evict the slowest peer for potential better new peers
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed Jul 26, 2024
1 parent ab2d43d commit dbd807a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 4 deletions.
4 changes: 4 additions & 0 deletions crates/subcoin-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ pub enum Error {
ConnectionTimeout,
#[error("Ping timeout")]
PingTimeout,
#[error("Ping latency exceeds the threshold")]
PingLatencyTooHigh,
#[error("Peer's latency ({0} ms) is too high")]
SlowPeer(Latency),
#[error("Unexpected pong message")]
UnexpectedPong,
#[error("Too many block entries in inv message")]
Expand Down
50 changes: 49 additions & 1 deletion crates/subcoin-network/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ pub const PROTOCOL_VERSION: u32 = 70016;
/// This version includes support for the `sendheaders` feature.
pub const MIN_PROTOCOL_VERSION: u32 = 70012;

/// Peer is considered as a slow one if the average ping latency is higher than 5 seconds.
const SLOW_PEER_LATENCY: Latency = 5_000;

/// Interval for evicting the slowest peer, 10 minutes.
///
/// Periodically evict the slowest peer whose latency is above the threshold [`SLOW_PEER_LATENCY`]
/// when the peer set is full. This creates opportunities to connect with potentially better peers.
const EVICTION_INTERVAL: Duration = Duration::from_secs(600);

/// Peer configuration.
#[derive(Debug, Clone)]
pub struct Config {
Expand Down Expand Up @@ -230,6 +239,12 @@ impl PeerInfo {
}
}

#[derive(Debug)]
pub(crate) struct SlowPeer {
pub(crate) peer_id: PeerId,
pub(crate) peer_latency: Latency,
}

/// Manages the peers in the network.
pub struct PeerManager<Block, Client> {
config: Config,
Expand All @@ -240,6 +255,8 @@ pub struct PeerManager<Block, Client> {
connected_peers: HashMap<PeerId, PeerInfo>,
max_outbound_peers: usize,
connection_initiator: ConnectionInitiator,
/// Time at which the slowest peer was evicted.
last_eviction: Instant,
rng: fastrand::Rng,
_phantom: PhantomData<Block>,
}
Expand All @@ -265,12 +282,13 @@ where
connected_peers: HashMap::new(),
max_outbound_peers,
connection_initiator,
last_eviction: Instant::now(),
rng: fastrand::Rng::new(),
_phantom: Default::default(),
}
}

pub(crate) fn on_tick(&mut self) {
pub(crate) fn on_tick(&mut self) -> Option<SlowPeer> {
let mut timeout_peers = vec![];
let mut should_ping_peers = vec![];

Expand Down Expand Up @@ -305,9 +323,35 @@ where
self.connection_initiator.initiate_outbound_connection(addr);
}
}
None
} else if self.last_eviction.elapsed() > EVICTION_INTERVAL {
// The set of outbound peers is full and the eviction interval elapsed,
// try to evict the slowest peer for discovering potential better peers.
self.find_slowest_peer()
} else {
None
}
}

fn find_slowest_peer(&self) -> Option<SlowPeer> {
self.connected_peers
.iter()
.filter_map(|(peer_id, peer_info)| {
let average_latency = peer_info.ping_latency.average();

if average_latency > SLOW_PEER_LATENCY {
Some((peer_id, average_latency))
} else {
None
}
})
.max_by_key(|(_peer_id, average_latency)| *average_latency)
.map(|(peer_id, peer_latency)| SlowPeer {
peer_id: *peer_id,
peer_latency,
})
}

fn send_pings(&mut self, should_pings: Vec<PeerId>) {
for peer_id in should_pings {
if let Some(peer_info) = self.connected_peers.get_mut(&peer_id) {
Expand Down Expand Up @@ -372,6 +416,10 @@ where
});
}

pub(crate) fn update_last_eviction(&mut self) {
self.last_eviction = Instant::now();
}

/// Checks if a peer is connected.
pub(crate) fn is_connected(&self, peer_id: PeerId) -> bool {
self.connected_peers.contains_key(&peer_id)
Expand Down
16 changes: 13 additions & 3 deletions crates/subcoin-network/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::connection::{ConnectionInitiator, Direction, NewConnection};
use crate::peer_manager::{Config, PeerManager};
use crate::peer_manager::{Config, PeerManager, SlowPeer};
use crate::sync::{ChainSync, LocatorRequest, SyncAction, SyncRequest};
use crate::{Bandwidth, Error, Latency, NetworkStatus, NetworkWorkerMessage, PeerId, SyncStrategy};
use bitcoin::p2p::message::NetworkMessage;
Expand Down Expand Up @@ -118,7 +118,16 @@ where
fn perform_periodic_actions(&mut self) {
let sync_action = self.chain_sync.on_tick();
self.do_sync_action(sync_action);
self.peer_manager.on_tick();
if let Some(SlowPeer {
peer_id,
peer_latency,
}) = self.peer_manager.on_tick()
{
self.peer_manager
.disconnect(peer_id, Error::SlowPeer(peer_latency));
self.peer_manager.update_last_eviction();
self.chain_sync.remove_peer(peer_id);
}
}

fn process_worker_message(&self, worker_msg: NetworkWorkerMessage, bandwidth: &Bandwidth) {
Expand Down Expand Up @@ -240,7 +249,8 @@ where
Ok(latency) => {
// Disconnect the peer directly if the latency is higher than the threshold.
if latency > LATENCY_THRESHOLD {
self.peer_manager.disconnect(from, err);
self.peer_manager
.disconnect(from, Error::PingLatencyTooHigh);
self.chain_sync.remove_peer(from);
} else {
self.chain_sync.set_peer_latency(from, latency);
Expand Down

0 comments on commit dbd807a

Please sign in to comment.