From 6f7c2dca91a0c721ef44562ef70b130c58a905fe Mon Sep 17 00:00:00 2001 From: cloudhead Date: Fri, 15 Dec 2023 15:55:42 +0100 Subject: [PATCH] node: Introduce connection penalty system Previously, we might ban a node if we're having trouble connecting to it, but this could be due to internet connectivity issues. We introduce a penalty score for nodes, that increases if there are connection issues or the node misbehaves, and decreases on successful connections. When connected to new nodes, we sort them so that low-penalty nodes are attempted first. --- radicle-node/src/service.rs | 94 ++++++++++++------ radicle-node/src/service/session.rs | 15 +-- radicle-node/src/tests.rs | 16 ++-- radicle/src/node.rs | 22 +++++ radicle/src/node/address.rs | 4 +- radicle/src/node/address/store.rs | 144 ++++++++++++++++++++-------- 6 files changed, 212 insertions(+), 83 deletions(-) diff --git a/radicle-node/src/service.rs b/radicle-node/src/service.rs index 66d7e9514..fd5b693fb 100644 --- a/radicle-node/src/service.rs +++ b/radicle-node/src/service.rs @@ -29,7 +29,7 @@ use radicle::node::config::PeerConfig; use radicle::node::routing::Store as _; use radicle::node::seed; use radicle::node::seed::Store as _; -use radicle::node::ConnectOptions; +use radicle::node::{ConnectOptions, Penalty, Severity}; use radicle::storage::RepositoryError; use crate::crypto; @@ -118,6 +118,14 @@ impl SyncedRouting { } } +/// A peer we can connect to. +#[derive(Debug, Clone)] +struct Peer { + nid: NodeId, + addresses: Vec, + penalty: Penalty, +} + /// General service error. #[derive(thiserror::Error, Debug)] pub enum Error { @@ -1029,6 +1037,7 @@ where } }; let link = session.link; + let addr = session.addr.clone(); self.fetching.retain(|_, fetching| { if fetching.from != remote { @@ -1058,15 +1067,31 @@ where self.outbox.wakeup(delay); } else { debug!(target: "service", "Dropping peer {remote}.."); + self.sessions.remove(&remote); + + let severity = match reason { + DisconnectReason::Dial(_) + | DisconnectReason::Fetch(_) + | DisconnectReason::Connection(_) => { + if self.is_online() { + // If we're "online", there's something wrong with this + // peer connection specifically. + Severity::Medium + } else { + Severity::Low + } + } + DisconnectReason::Session(e) => e.severity(), + DisconnectReason::Command => Severity::Low, + }; - if let Err(e) = - self.db - .addresses_mut() - .disconnected(&remote, &session.addr, reason.is_transient()) + if let Err(e) = self + .db + .addresses_mut() + .disconnected(&remote, &addr, severity) { error!(target: "service", "Error updating address store: {e}"); } - self.sessions.remove(&remote); // Only re-attempt outbound connections, since we don't care if an inbound connection // is dropped. if link.is_outbound() { @@ -1590,6 +1615,15 @@ where ] } + /// Try to guess whether we're online or not. + fn is_online(&self) -> bool { + self.sessions + .connected() + .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL) + .count() + > 0 + } + /// Update our routing table with our local node's inventory. fn sync_inventory(&mut self) -> Result { let inventory = self.storage.inventory()?; @@ -1892,24 +1926,35 @@ where } } - /// Get a list of peers available to connect to. - fn available_peers(&mut self) -> HashMap> { + /// Get a list of peers available to connect to, sorted by lowest penalty. + fn available_peers(&mut self) -> Vec { match self.db.addresses().entries() { Ok(entries) => { // Nb. we don't want to connect to any peers that already have a session with us, // even if it's in a disconnected state. Those sessions are re-attempted automatically. - entries - .filter(|(_, ka)| !ka.banned) - .filter(|(nid, _)| !self.sessions.contains_key(nid)) - .filter(|(nid, _)| nid != &self.node_id()) - .fold(HashMap::new(), |mut acc, (nid, addr)| { - acc.entry(nid).or_default().push(addr); + let mut peers = entries + .filter(|entry| !entry.address.banned) + .filter(|entry| !entry.penalty.is_threshold_reached()) + .filter(|entry| !self.sessions.contains_key(&entry.node)) + .filter(|entry| &entry.node != self.nid()) + .fold(HashMap::new(), |mut acc, entry| { + acc.entry(entry.node) + .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone())) + .or_insert_with(|| Peer { + nid: entry.node, + addresses: vec![entry.address], + penalty: entry.penalty, + }); acc }) + .into_values() + .collect::>(); + peers.sort_by_key(|p| p.penalty); + peers } Err(e) => { error!(target: "service", "Unable to lookup available peers in address book: {e}"); - HashMap::new() + Vec::new() } } } @@ -1974,8 +2019,9 @@ where for (id, ka) in self .available_peers() .into_iter() - .filter_map(|(nid, kas)| { - kas.into_iter() + .filter_map(|peer| { + peer.addresses + .into_iter() .find(|ka| match (ka.last_success, ka.last_attempt) { // If we succeeded the last time we tried, this is a good address. // TODO: This will always be hit after a success, and never re-attempted after @@ -1986,7 +2032,7 @@ where // If we've never tried this address, it's worth a try. (None, None) => true, }) - .map(|ka| (nid, ka)) + .map(|ka| (peer.nid, ka)) }) .take(wanted) { @@ -2094,18 +2140,6 @@ impl DisconnectReason { pub fn is_connection_err(&self) -> bool { matches!(self, Self::Connection(_)) } - - // TODO: These aren't quite correct, since dial errors *can* be transient, eg. - // temporary DNS issue. - pub fn is_transient(&self) -> bool { - match self { - Self::Dial(_) => false, - Self::Connection(_) => true, - Self::Command => false, - Self::Fetch(_) => true, - Self::Session(err) => err.is_transient(), - } - } } impl fmt::Display for DisconnectReason { diff --git a/radicle-node/src/service/session.rs b/radicle-node/src/service/session.rs index 1e9608531..16700bc97 100644 --- a/radicle-node/src/service/session.rs +++ b/radicle-node/src/service/session.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; use std::fmt; use crate::node::config::Limits; +use crate::node::Severity; use crate::service::message; use crate::service::message::Message; use crate::service::{Address, Id, LocalTime, NodeId, Outbox, Rng}; @@ -9,7 +10,7 @@ use crate::Link; pub use crate::node::{PingState, State}; -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Debug, Clone, Copy)] pub enum Error { /// The remote peer sent an invalid announcement timestamp, /// for eg. a timestamp far in the future. @@ -28,13 +29,13 @@ pub enum Error { } impl Error { - /// Check whether this error is transient. - pub fn is_transient(&self) -> bool { + /// Return the severity for this error. + pub fn severity(&self) -> Severity { match self { - Self::InvalidTimestamp(_) => false, - Self::ProtocolMismatch => true, - Self::Misbehavior => false, - Self::Timeout => true, + Self::InvalidTimestamp(_) => Severity::High, + Self::ProtocolMismatch => Severity::High, + Self::Misbehavior => Severity::High, + Self::Timeout => Severity::Low, } } } diff --git a/radicle-node/src/tests.rs b/radicle-node/src/tests.rs index 0bbabc124..fd9cb2874 100644 --- a/radicle-node/src/tests.rs +++ b/radicle-node/src/tests.rs @@ -9,6 +9,7 @@ use std::time; use crossbeam_channel as chan; use netservices::Direction as Link; use radicle::identity::Visibility; +use radicle::node::address::Store; use radicle::node::routing::Store as _; use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT}; use radicle::storage::refs::RefsAt; @@ -898,7 +899,12 @@ fn test_refs_announcement_offline() { // Now we restart Alice's node. It should pick up that something's changed in storage. alice.elapse(LocalDuration::from_secs(60)); - alice.disconnected(bob.id, &DisconnectReason::Command); + alice + .database_mut() + .addresses_mut() + .remove(&bob.id) + .unwrap(); // Make sure we don't reconnect automatically. + alice.disconnected(bob.id, &DisconnectReason::Session(session::Error::Timeout)); alice.outbox().for_each(drop); alice.restart(); alice.connect_to(&bob); @@ -1157,11 +1163,9 @@ fn test_maintain_connections() { alice.import_addresses(&unconnected); // A non-transient error such as this will cause Alice to attempt a different peer. - let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset)); + let error = session::Error::Misbehavior; for peer in connected.iter() { - let reason = DisconnectReason::Dial(error.clone()); - assert!(!reason.is_transient()); - alice.disconnected(peer.id(), &reason); + alice.disconnected(peer.id(), &DisconnectReason::Session(error)); let id = alice .outbox() @@ -1210,8 +1214,6 @@ fn test_maintain_connections_failed_attempt() { let reason = DisconnectReason::Connection(Arc::new(io::Error::from(io::ErrorKind::ConnectionReset))); - assert!(reason.is_transient()); - alice.connect_to(&eve); // Make sure Alice knows about Eve. alice.disconnected(eve.id(), &reason); diff --git a/radicle/src/node.rs b/radicle/src/node.rs index c23ae47c0..a1f42dc61 100644 --- a/radicle/src/node.rs +++ b/radicle/src/node.rs @@ -45,6 +45,8 @@ pub const DEFAULT_PORT: u16 = 8776; pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(9); /// Maximum length in bytes of a node alias. pub const MAX_ALIAS_LENGTH: usize = 32; +/// Penalty threshold at which point we avoid connecting to this node. +pub const PENALTY_THRESHOLD: u8 = 32; /// Filename of node database under the node directory. pub const NODE_DB_FILE: &str = "node.db"; /// Filename of policies database under the node directory. @@ -128,6 +130,26 @@ impl fmt::Display for State { } } +/// Severity of a peer misbehavior or a connection problem. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Severity { + Low = 0, + Medium = 1, + High = 8, +} + +/// Node connection penalty. Nodes with a high penalty are deprioritized as peers. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)] +pub struct Penalty(u8); + +impl Penalty { + /// If the penalty threshold is reached, at which point we should just avoid + /// connecting to this node. + pub fn is_threshold_reached(&self) -> bool { + self.0 >= PENALTY_THRESHOLD + } +} + /// Repository sync status for our own refs. #[derive(Debug, PartialEq, Eq, Clone, serde::Serialize, serde::Deserialize)] #[serde(tag = "status")] diff --git a/radicle/src/node/address.rs b/radicle/src/node/address.rs index 3d9290ed9..7330dda5a 100644 --- a/radicle/src/node/address.rs +++ b/radicle/src/node/address.rs @@ -10,7 +10,7 @@ use localtime::LocalTime; use nonempty::NonEmpty; use crate::collections::RandomMap; -use crate::node::{Address, Alias}; +use crate::node::{Address, Alias, Penalty}; use crate::prelude::Timestamp; use crate::{node, profile}; @@ -130,6 +130,8 @@ pub struct Node { pub pow: u32, /// When this data was published. pub timestamp: Timestamp, + /// Node connection penalty. + pub penalty: Penalty, } /// A known address. diff --git a/radicle/src/node/address/store.rs b/radicle/src/node/address/store.rs index 22b8d1624..d9de9a894 100644 --- a/radicle/src/node/address/store.rs +++ b/radicle/src/node/address/store.rs @@ -6,7 +6,7 @@ use thiserror::Error; use crate::node; use crate::node::address::{AddressType, KnownAddress, Node, Source}; -use crate::node::{Address, Alias, AliasError, AliasStore, Database, NodeId}; +use crate::node::{Address, Alias, AliasError, AliasStore, Database, NodeId, Penalty, Severity}; use crate::prelude::Timestamp; use crate::sql::transaction; @@ -22,6 +22,17 @@ pub enum Error { NoRows, } +/// An entry returned by the store. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AddressEntry { + /// Node ID. + pub node: NodeId, + /// Node penalty. + pub penalty: Penalty, + /// Node address. + pub address: KnownAddress, +} + /// Address store. /// /// Used to store node addresses and metadata. @@ -51,20 +62,25 @@ pub trait Store { self.len().map(|l| l == 0) } /// Get the address entries in the store. - fn entries(&self) -> Result>, Error>; + fn entries(&self) -> Result>, Error>; /// Mark a node as attempted at a certain time. fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>; /// Mark a node as successfully connected at a certain time. fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>; /// Mark a node as disconnected. - fn disconnected(&mut self, nid: &NodeId, addr: &Address, transient: bool) -> Result<(), Error>; + fn disconnected( + &mut self, + nid: &NodeId, + addr: &Address, + severity: Severity, + ) -> Result<(), Error>; } impl Store for Database { fn get(&self, node: &NodeId) -> Result, Error> { let mut stmt = self .db - .prepare("SELECT features, alias, pow, timestamp FROM nodes WHERE id = ?")?; + .prepare("SELECT features, alias, pow, penalty, timestamp FROM nodes WHERE id = ?")?; stmt.bind((1, node))?; @@ -73,6 +89,8 @@ impl Store for Database { let alias = Alias::from_str(row.read::<&str, _>("alias"))?; let timestamp = row.read::("timestamp") as Timestamp; let pow = row.read::("pow") as u32; + let penalty = row.read::("penalty").min(u8::MAX as i64); + let penalty = Penalty(penalty as u8); let addrs = self.addresses_of(node)?; Ok(Some(Node { @@ -80,6 +98,7 @@ impl Store for Database { alias, pow, timestamp, + penalty, addrs, })) } else { @@ -183,10 +202,15 @@ impl Store for Database { Ok(self.db.change_count() > 0) } - fn entries(&self) -> Result>, Error> { + fn entries(&self) -> Result>, Error> { let mut stmt = self .db - .prepare("SELECT node, type, value, source, last_success, last_attempt, banned FROM addresses ORDER BY node")? + .prepare( + "SELECT a.node, a.type, a.value, a.source, a.last_success, a.last_attempt, a.banned, n.penalty + FROM addresses AS a + JOIN nodes AS n ON a.node = n.id + ORDER BY n.penalty ASC, n.id ASC", + )? .into_iter(); let mut entries = Vec::new(); @@ -200,17 +224,20 @@ impl Store for Database { let last_success = last_success.map(|t| LocalTime::from_millis(t as u128)); let last_attempt = last_attempt.map(|t| LocalTime::from_millis(t as u128)); let banned = row.read::("banned").is_positive(); + let penalty = row.read::("penalty"); + let penalty = Penalty(penalty as u8); // Clamped at `u8::MAX`. - entries.push(( + entries.push(AddressEntry { node, - KnownAddress { + penalty, + address: KnownAddress { addr, source, last_success, last_attempt, banned, }, - )); + }); } Ok(Box::new(entries.into_iter())) } @@ -234,37 +261,47 @@ impl Store for Database { } fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error> { - let mut stmt = self.db.prepare( - "UPDATE `addresses` - SET last_success = ?1 - WHERE node = ?2 - AND type = ?3 - AND value = ?4", - )?; - - stmt.bind((1, time as i64))?; - stmt.bind((2, nid))?; - stmt.bind((3, AddressType::from(addr)))?; - stmt.bind((4, addr))?; - stmt.next()?; - - Ok(()) - } - - fn disconnected(&mut self, nid: &NodeId, addr: &Address, transient: bool) -> Result<(), Error> { - // Ban address if not a transient failure. - if !transient { - let mut stmt = self.db.prepare( + transaction(&self.db, |db| { + let mut stmt = db.prepare( "UPDATE `addresses` - SET banned = 1 - WHERE node = ?1 AND type = ?2 AND value = ?3", + SET last_success = ?1 + WHERE node = ?2 + AND type = ?3 + AND value = ?4", )?; + stmt.bind((1, time as i64))?; + stmt.bind((2, nid))?; + stmt.bind((3, AddressType::from(addr)))?; + stmt.bind((4, addr))?; + stmt.next()?; + + // Reduce penalty by half on successful connect. + db.prepare("UPDATE `nodes` SET penalty = penalty / 2 WHERE node = ?1")?; + stmt.bind((1, nid))?; - stmt.bind((2, AddressType::from(addr)))?; - stmt.bind((3, addr))?; stmt.next()?; - } + + Ok(()) + }) + } + + fn disconnected( + &mut self, + nid: &NodeId, + _addr: &Address, + severity: Severity, + ) -> Result<(), Error> { + let mut stmt = self.db.prepare( + "UPDATE `nodes` + SET penalty = penalty + ?2 + WHERE id = ?1", + )?; + + stmt.bind((1, nid))?; + stmt.bind((2, severity as i64))?; + stmt.next()?; + Ok(()) } } @@ -584,7 +621,11 @@ mod test { last_attempt: None, banned: false, }; - expected.push((id, ka.clone())); + expected.push(AddressEntry { + node: id, + penalty: Penalty::default(), + address: ka.clone(), + }); cache .insert(&id, features, alias.clone(), 0, timestamp, [ka]) .unwrap(); @@ -592,10 +633,37 @@ mod test { let mut actual = cache.entries().unwrap().collect::>(); - actual.sort_by_key(|(i, _)| *i); - expected.sort_by_key(|(i, _)| *i); + actual.sort_by_key(|ae| ae.node); + expected.sort_by_key(|ae| ae.node); assert_eq!(cache.len().unwrap(), actual.len()); assert_eq!(actual, expected); } + + #[test] + fn test_disconnected() { + let alice = arbitrary::gen::(1); + let addr = arbitrary::gen::
(1); + let mut cache = Database::memory().unwrap(); + let features = node::Features::SEED; + let timestamp = LocalTime::now().as_millis(); + + cache + .insert(&alice, features, Alias::new("alice"), 16, timestamp, []) + .unwrap(); + let node = cache.get(&alice).unwrap().unwrap(); + assert_eq!(node.penalty, Penalty::default()); + + cache.disconnected(&alice, &addr, Severity::Low).unwrap(); + let node = cache.get(&alice).unwrap().unwrap(); + assert_eq!(node.penalty, Penalty::default()); + + cache.disconnected(&alice, &addr, Severity::Medium).unwrap(); + let node = cache.get(&alice).unwrap().unwrap(); + assert_eq!(node.penalty, Penalty(1)); + + cache.disconnected(&alice, &addr, Severity::High).unwrap(); + let node = cache.get(&alice).unwrap().unwrap(); + assert_eq!(node.penalty, Penalty(9)); + } }