Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Commit

Permalink
node: Introduce connection penalty system
Browse files Browse the repository at this point in the history
Previously, we might ban a node if we're having trouble connecting to
it, but this could be due to internet connectivity issues.

We introduce a penalty score for nodes, that increases if there are
connection issues or the node misbehaves, and decreases on successful
connections.

When connecting to new nodes, we sort them so that low-penalty nodes are
attempted first.
  • Loading branch information
cloudhead committed Dec 15, 2023
1 parent bdcaa60 commit 6f7c2dc
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 83 deletions.
94 changes: 64 additions & 30 deletions radicle-node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use radicle::node::config::PeerConfig;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::ConnectOptions;
use radicle::node::{ConnectOptions, Penalty, Severity};
use radicle::storage::RepositoryError;

use crate::crypto;
Expand Down Expand Up @@ -118,6 +118,14 @@ impl SyncedRouting {
}
}

/// A peer we can connect to.
///
/// Groups the known addresses of a node together with that node's
/// connection penalty.
#[derive(Debug, Clone)]
struct Peer {
/// Node ID of the peer.
nid: NodeId,
/// Known addresses collected for this node from the address book.
addresses: Vec<KnownAddress>,
/// Connection penalty of the node; available peers are sorted by this
/// value, lowest first.
penalty: Penalty,
}

/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -1029,6 +1037,7 @@ where
}
};
let link = session.link;
let addr = session.addr.clone();

self.fetching.retain(|_, fetching| {
if fetching.from != remote {
Expand Down Expand Up @@ -1058,15 +1067,31 @@ where
self.outbox.wakeup(delay);
} else {
debug!(target: "service", "Dropping peer {remote}..");
self.sessions.remove(&remote);

let severity = match reason {
DisconnectReason::Dial(_)
| DisconnectReason::Fetch(_)
| DisconnectReason::Connection(_) => {
if self.is_online() {
// If we're "online", there's something wrong with this
// peer connection specifically.
Severity::Medium
} else {
Severity::Low
}
}
DisconnectReason::Session(e) => e.severity(),
DisconnectReason::Command => Severity::Low,
};

if let Err(e) =
self.db
.addresses_mut()
.disconnected(&remote, &session.addr, reason.is_transient())
if let Err(e) = self
.db
.addresses_mut()
.disconnected(&remote, &addr, severity)
{
error!(target: "service", "Error updating address store: {e}");
}
self.sessions.remove(&remote);
// Only re-attempt outbound connections, since we don't care if an inbound connection
// is dropped.
if link.is_outbound() {
Expand Down Expand Up @@ -1590,6 +1615,15 @@ where
]
}

/// Heuristically determine whether we currently appear to be online.
///
/// We consider ourselves online as soon as at least one connected session
/// has a routable address and was last active no earlier than
/// `IDLE_INTERVAL` before the current clock.
fn is_online(&self) -> bool {
    self.sessions.connected().any(|(_, session)| {
        // Nb. the cut-off is computed inside the closure, so it is only
        // evaluated for sessions that have a routable address.
        session.addr.is_routable() && session.last_active >= self.clock - IDLE_INTERVAL
    })
}

/// Update our routing table with our local node's inventory.
fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
let inventory = self.storage.inventory()?;
Expand Down Expand Up @@ -1892,24 +1926,35 @@ where
}
}

/// Get a list of peers available to connect to.
fn available_peers(&mut self) -> HashMap<NodeId, Vec<KnownAddress>> {
/// Get a list of peers available to connect to, sorted by lowest penalty.
fn available_peers(&mut self) -> Vec<Peer> {
match self.db.addresses().entries() {
Ok(entries) => {
// Nb. we don't want to connect to any peers that already have a session with us,
// even if it's in a disconnected state. Those sessions are re-attempted automatically.
entries
.filter(|(_, ka)| !ka.banned)
.filter(|(nid, _)| !self.sessions.contains_key(nid))
.filter(|(nid, _)| nid != &self.node_id())
.fold(HashMap::new(), |mut acc, (nid, addr)| {
acc.entry(nid).or_default().push(addr);
let mut peers = entries
.filter(|entry| !entry.address.banned)
.filter(|entry| !entry.penalty.is_threshold_reached())
.filter(|entry| !self.sessions.contains_key(&entry.node))
.filter(|entry| &entry.node != self.nid())
.fold(HashMap::new(), |mut acc, entry| {
acc.entry(entry.node)
.and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
.or_insert_with(|| Peer {
nid: entry.node,
addresses: vec![entry.address],
penalty: entry.penalty,
});
acc
})
.into_values()
.collect::<Vec<_>>();
peers.sort_by_key(|p| p.penalty);
peers
}
Err(e) => {
error!(target: "service", "Unable to lookup available peers in address book: {e}");
HashMap::new()
Vec::new()
}
}
}
Expand Down Expand Up @@ -1974,8 +2019,9 @@ where
for (id, ka) in self
.available_peers()
.into_iter()
.filter_map(|(nid, kas)| {
kas.into_iter()
.filter_map(|peer| {
peer.addresses
.into_iter()
.find(|ka| match (ka.last_success, ka.last_attempt) {
// If we succeeded the last time we tried, this is a good address.
// TODO: This will always be hit after a success, and never re-attempted after
Expand All @@ -1986,7 +2032,7 @@ where
// If we've never tried this address, it's worth a try.
(None, None) => true,
})
.map(|ka| (nid, ka))
.map(|ka| (peer.nid, ka))
})
.take(wanted)
{
Expand Down Expand Up @@ -2094,18 +2140,6 @@ impl DisconnectReason {
/// Check whether this disconnect was caused by a connection error,
/// ie. whether this is the `Connection` variant.
pub fn is_connection_err(&self) -> bool {
matches!(self, Self::Connection(_))
}

/// Check whether the disconnect reason is considered transient.
///
/// `Connection` and `Fetch` reasons are treated as transient, `Dial` and
/// `Command` are not, and `Session` defers to the wrapped session error.
// TODO: These aren't quite correct, since dial errors *can* be transient, eg.
// temporary DNS issue.
pub fn is_transient(&self) -> bool {
match self {
Self::Dial(_) => false,
Self::Connection(_) => true,
Self::Command => false,
Self::Fetch(_) => true,
Self::Session(err) => err.is_transient(),
}
}
}

impl fmt::Display for DisconnectReason {
Expand Down
15 changes: 8 additions & 7 deletions radicle-node/src/service/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::collections::HashSet;
use std::fmt;

use crate::node::config::Limits;
use crate::node::Severity;
use crate::service::message;
use crate::service::message::Message;
use crate::service::{Address, Id, LocalTime, NodeId, Outbox, Rng};
use crate::Link;

pub use crate::node::{PingState, State};

#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
/// The remote peer sent an invalid announcement timestamp,
/// for eg. a timestamp far in the future.
Expand All @@ -28,13 +29,13 @@ pub enum Error {
}

impl Error {
/// Check whether this error is transient.
pub fn is_transient(&self) -> bool {
/// Return the severity for this error.
pub fn severity(&self) -> Severity {
match self {
Self::InvalidTimestamp(_) => false,
Self::ProtocolMismatch => true,
Self::Misbehavior => false,
Self::Timeout => true,
Self::InvalidTimestamp(_) => Severity::High,
Self::ProtocolMismatch => Severity::High,
Self::Misbehavior => Severity::High,
Self::Timeout => Severity::Low,
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions radicle-node/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::time;
use crossbeam_channel as chan;
use netservices::Direction as Link;
use radicle::identity::Visibility;
use radicle::node::address::Store;
use radicle::node::routing::Store as _;
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
use radicle::storage::refs::RefsAt;
Expand Down Expand Up @@ -898,7 +899,12 @@ fn test_refs_announcement_offline() {

// Now we restart Alice's node. It should pick up that something's changed in storage.
alice.elapse(LocalDuration::from_secs(60));
alice.disconnected(bob.id, &DisconnectReason::Command);
alice
.database_mut()
.addresses_mut()
.remove(&bob.id)
.unwrap(); // Make sure we don't reconnect automatically.
alice.disconnected(bob.id, &DisconnectReason::Session(session::Error::Timeout));
alice.outbox().for_each(drop);
alice.restart();
alice.connect_to(&bob);
Expand Down Expand Up @@ -1157,11 +1163,9 @@ fn test_maintain_connections() {
alice.import_addresses(&unconnected);

// A non-transient error such as this will cause Alice to attempt a different peer.
let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
let error = session::Error::Misbehavior;
for peer in connected.iter() {
let reason = DisconnectReason::Dial(error.clone());
assert!(!reason.is_transient());
alice.disconnected(peer.id(), &reason);
alice.disconnected(peer.id(), &DisconnectReason::Session(error));

let id = alice
.outbox()
Expand Down Expand Up @@ -1210,8 +1214,6 @@ fn test_maintain_connections_failed_attempt() {
let reason =
DisconnectReason::Connection(Arc::new(io::Error::from(io::ErrorKind::ConnectionReset)));

assert!(reason.is_transient());

alice.connect_to(&eve);
// Make sure Alice knows about Eve.
alice.disconnected(eve.id(), &reason);
Expand Down
22 changes: 22 additions & 0 deletions radicle/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub const DEFAULT_PORT: u16 = 8776;
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(9);
/// Maximum length in bytes of a node alias.
pub const MAX_ALIAS_LENGTH: usize = 32;
/// Penalty threshold at which point we avoid connecting to this node.
pub const PENALTY_THRESHOLD: u8 = 32;
/// Filename of node database under the node directory.
pub const NODE_DB_FILE: &str = "node.db";
/// Filename of policies database under the node directory.
Expand Down Expand Up @@ -128,6 +130,26 @@ impl fmt::Display for State {
}
}

/// Severity of a peer misbehavior or a connection problem.
///
/// NOTE(review): the explicit discriminants look like the amount by which a
/// node's penalty is increased per occurrence -- confirm against the address
/// store implementation.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Severity {
/// Minor problem, eg. a session timeout or a disconnect requested by command.
Low = 0,
/// Problem that is likely specific to the peer, eg. a connection failure
/// while we otherwise appear to be online.
Medium = 1,
/// Serious problem, eg. peer misbehavior or a protocol mismatch.
High = 8,
}

/// Node connection penalty. Nodes with a high penalty are deprioritized as peers.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)]
pub struct Penalty(u8);

impl Penalty {
/// If the penalty threshold is reached, at which point we should just avoid
/// connecting to this node.
pub fn is_threshold_reached(&self) -> bool {
self.0 >= PENALTY_THRESHOLD
}
}

/// Repository sync status for our own refs.
#[derive(Debug, PartialEq, Eq, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "status")]
Expand Down
4 changes: 3 additions & 1 deletion radicle/src/node/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use localtime::LocalTime;
use nonempty::NonEmpty;

use crate::collections::RandomMap;
use crate::node::{Address, Alias};
use crate::node::{Address, Alias, Penalty};
use crate::prelude::Timestamp;
use crate::{node, profile};

Expand Down Expand Up @@ -130,6 +130,8 @@ pub struct Node {
pub pow: u32,
/// When this data was published.
pub timestamp: Timestamp,
/// Node connection penalty.
pub penalty: Penalty,
}

/// A known address.
Expand Down
Loading

0 comments on commit 6f7c2dc

Please sign in to comment.