From e77de2495aa7b17521ebab7c70c11e3f003d52c7 Mon Sep 17 00:00:00 2001
From: Saketh Are
Date: Thu, 9 Nov 2023 18:51:45 -0500
Subject: [PATCH] Bound the size of SnapshotHostsCache (#10133)

This PR configures a hard limit on the size of SnapshotHostsCache. The
internal storage is replaced with `lru::LruCache`, which implements a
simple least-recently-used eviction policy.

In the short term, this will prevent memory exhaustion via unbounded
addition of entries to the cache. In the long term, we may wish to
revisit and implement a more sophisticated eviction policy, especially
after we implement a reputation system for peers sharing state parts.
---
 chain/network/src/config.rs                |  6 ++
 chain/network/src/config_json.rs           |  8 +++
 .../src/peer_manager/network_state/mod.rs  |  2 +-
 chain/network/src/snapshot_hosts/mod.rs    | 69 +++++++++++--------
 chain/network/src/snapshot_hosts/tests.rs  | 49 +++++++++++--
 5 files changed, 101 insertions(+), 33 deletions(-)
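
Note (not part of the patch): below is a minimal, standalone sketch of the
eviction behavior the new bound relies on. It assumes the lru crate API that
matches this patch's `LruCache::new(capacity as usize)` call (pre-0.8
releases; newer versions take a `NonZeroUsize`). `push` evicts and returns
the least-recently-used entry once capacity is exceeded, while `peek` (what
`is_new` uses in the diff below) reads without updating recency, so
re-receiving an already-known entry does not refresh its position.

    use lru::LruCache;

    fn main() {
        // Tiny capacity so eviction is visible; snapshot_hosts_cache_size
        // plays this role in the real cache.
        let mut cache: LruCache<u32, &str> = LruCache::new(2);
        cache.push(0, "peer0");
        cache.push(1, "peer1");

        // `get` marks an entry as recently used; `peek` does not.
        assert_eq!(cache.get(&0), Some(&"peer0"));

        // Pushing a third entry evicts the least-recently-used one (key 1)
        // and returns the evicted pair.
        let evicted = cache.push(2, "peer2");
        assert_eq!(evicted, Some((1, "peer1")));
        assert!(cache.contains(&0) && cache.contains(&2));
        assert_eq!(cache.len(), 2);
    }
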
#[serde(default = "default_peer_expiration_duration")] pub peer_expiration_duration: Duration, @@ -296,6 +303,7 @@ impl Default for Config { handshake_timeout: Duration::from_secs(20), skip_sync_wait: false, peer_states_cache_size: default_peer_states_cache_size(), + snapshot_hosts_cache_size: default_snapshot_hosts_cache_size(), ban_window: Duration::from_secs(3 * 60 * 60), blacklist: vec![], ttl_account_id_router: default_ttl_account_id_router(), diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 6ee4e208afd..2d194ead212 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -189,7 +189,7 @@ impl NetworkState { tier1: connection::Pool::new(config.node_id()), inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)), peer_store, - snapshot_hosts: Arc::new(SnapshotHostsCache::new()), + snapshot_hosts: Arc::new(SnapshotHostsCache::new(config.snapshot_hosts.clone())), connection_store: connection_store::ConnectionStore::new(store).unwrap(), pending_reconnect: Mutex::new(Vec::::new()), accounts_data: Arc::new(AccountDataCache::new()), diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs index 97b3ac76fa7..82e710f15f3 100644 --- a/chain/network/src/snapshot_hosts/mod.rs +++ b/chain/network/src/snapshot_hosts/mod.rs @@ -5,9 +5,10 @@ //! in the network and stored locally in this cache. use crate::concurrency; -use crate::concurrency::arc_mutex::ArcMutex; use crate::network_protocol::SnapshotHostInfo; +use lru::LruCache; use near_primitives::network::PeerId; +use parking_lot::Mutex; use rayon::iter::ParallelBridge; use std::collections::HashMap; use std::sync::Arc; @@ -23,16 +24,23 @@ pub(crate) enum SnapshotHostInfoError { DuplicatePeerId, } -/// TODO(saketh): Introduce a cache size limit #[derive(Clone)] +pub struct Config { + /// The maximum number of SnapshotHostInfos to store locally. + /// At present this constraint is enforced using a simple + /// least-recently-used cache. In the future, we may wish to + /// implement something more sophisticated. + pub snapshot_hosts_cache_size: u32, +} + struct Inner { /// The latest known SnapshotHostInfo for each node in the network - hosts: im::HashMap>, + hosts: LruCache>, } impl Inner { fn is_new(&self, h: &SnapshotHostInfo) -> bool { - match self.hosts.get(&h.peer_id) { + match self.hosts.peek(&h.peer_id) { Some(old) if old.epoch_height >= h.epoch_height => false, _ => true, } @@ -45,16 +53,17 @@ impl Inner { if !self.is_new(&d) { return None; } - self.hosts.insert(d.peer_id.clone(), d.clone()); + self.hosts.push(d.peer_id.clone(), d.clone()); Some(d) } } -pub(crate) struct SnapshotHostsCache(ArcMutex); +pub(crate) struct SnapshotHostsCache(Mutex); impl SnapshotHostsCache { - pub fn new() -> Self { - Self(ArcMutex::new(Inner { hosts: im::HashMap::new() })) + pub fn new(config: Config) -> Self { + let hosts = LruCache::new(config.snapshot_hosts_cache_size as usize); + Self(Mutex::new(Inner { hosts })) } /// Selects new data and verifies the signatures. @@ -66,17 +75,19 @@ impl SnapshotHostsCache { ) -> (Vec>, Option) { // Filter out any data which is outdated or which we already have. let mut new_data = HashMap::new(); - let inner = self.0.load(); - for d in data { - // Sharing multiple entries for the same peer is considered malicious, - // since all but one are obviously outdated. 
diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs
index 97b3ac76fa7..82e710f15f3 100644
--- a/chain/network/src/snapshot_hosts/mod.rs
+++ b/chain/network/src/snapshot_hosts/mod.rs
@@ -5,9 +5,10 @@
 //! in the network and stored locally in this cache.
 
 use crate::concurrency;
-use crate::concurrency::arc_mutex::ArcMutex;
 use crate::network_protocol::SnapshotHostInfo;
+use lru::LruCache;
 use near_primitives::network::PeerId;
+use parking_lot::Mutex;
 use rayon::iter::ParallelBridge;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -23,16 +24,23 @@ pub(crate) enum SnapshotHostInfoError {
     DuplicatePeerId,
 }
 
-/// TODO(saketh): Introduce a cache size limit
 #[derive(Clone)]
+pub struct Config {
+    /// The maximum number of SnapshotHostInfos to store locally.
+    /// At present this constraint is enforced using a simple
+    /// least-recently-used cache. In the future, we may wish to
+    /// implement something more sophisticated.
+    pub snapshot_hosts_cache_size: u32,
+}
+
 struct Inner {
     /// The latest known SnapshotHostInfo for each node in the network
-    hosts: im::HashMap<PeerId, Arc<SnapshotHostInfo>>,
+    hosts: LruCache<PeerId, Arc<SnapshotHostInfo>>,
 }
 
 impl Inner {
     fn is_new(&self, h: &SnapshotHostInfo) -> bool {
-        match self.hosts.get(&h.peer_id) {
+        match self.hosts.peek(&h.peer_id) {
             Some(old) if old.epoch_height >= h.epoch_height => false,
             _ => true,
         }
@@ -45,16 +53,17 @@ impl Inner {
         if !self.is_new(&d) {
             return None;
         }
-        self.hosts.insert(d.peer_id.clone(), d.clone());
+        self.hosts.push(d.peer_id.clone(), d.clone());
         Some(d)
     }
 }
 
-pub(crate) struct SnapshotHostsCache(ArcMutex<Inner>);
+pub(crate) struct SnapshotHostsCache(Mutex<Inner>);
 
 impl SnapshotHostsCache {
-    pub fn new() -> Self {
-        Self(ArcMutex::new(Inner { hosts: im::HashMap::new() }))
+    pub fn new(config: Config) -> Self {
+        let hosts = LruCache::new(config.snapshot_hosts_cache_size as usize);
+        Self(Mutex::new(Inner { hosts }))
     }
 
     /// Selects new data and verifies the signatures.
@@ -66,17 +75,19 @@
     ) -> (Vec<Arc<SnapshotHostInfo>>, Option<SnapshotHostInfoError>) {
         // Filter out any data which is outdated or which we already have.
         let mut new_data = HashMap::new();
-        let inner = self.0.load();
-        for d in data {
-            // Sharing multiple entries for the same peer is considered malicious,
-            // since all but one are obviously outdated.
-            if new_data.contains_key(&d.peer_id) {
-                return (vec![], Some(SnapshotHostInfoError::DuplicatePeerId));
-            }
-            // It is fine to broadcast data we already know about.
-            // It is fine to broadcast data which we know to be outdated.
-            if inner.is_new(&d) {
-                new_data.insert(d.peer_id.clone(), d);
+        {
+            let inner = self.0.lock();
+            for d in data {
+                // Sharing multiple entries for the same peer is considered malicious,
+                // since all but one are obviously outdated.
+                if new_data.contains_key(&d.peer_id) {
+                    return (vec![], Some(SnapshotHostInfoError::DuplicatePeerId));
+                }
+                // It is fine to broadcast data we already know about.
+                // It is fine to broadcast data which we know to be outdated.
+                if inner.is_new(&d) {
+                    new_data.insert(d.peer_id.clone(), d);
+                }
             }
         }
 
@@ -99,22 +110,24 @@
     /// Returns the data inserted and optionally a verification error.
     /// WriteLock is acquired only for the final update (after verification).
     pub async fn insert(
-        self: &Arc<Self>,
+        self: &Self,
         data: Vec<Arc<SnapshotHostInfo>>,
     ) -> (Vec<Arc<SnapshotHostInfo>>, Option<SnapshotHostInfoError>) {
-        let this = self.clone();
         // Execute verification on the rayon threadpool.
-        let (data, err) = this.verify(data).await;
+        let (data, err) = self.verify(data).await;
         // Insert the successfully verified data, even if an error has been encountered.
-        let inserted = self.0.update(|mut inner| {
-            let inserted = data.into_iter().filter_map(|d| inner.try_insert(d)).collect();
-            (inserted, inner)
-        });
+        let mut newly_inserted_data: Vec<Arc<SnapshotHostInfo>> = vec![];
+        let mut inner = self.0.lock();
+        for d in data {
+            if let Some(inserted) = inner.try_insert(d) {
+                newly_inserted_data.push(inserted);
+            }
+        }
         // Return the inserted data.
-        (inserted, err)
+        (newly_inserted_data, err)
     }
 
     pub fn get_hosts(&self) -> Vec<Arc<SnapshotHostInfo>> {
-        self.0.load().hosts.values().cloned().collect()
+        self.0.lock().hosts.iter().map(|(_, v)| v.clone()).collect()
     }
 }
diff --git a/chain/network/src/snapshot_hosts/tests.rs b/chain/network/src/snapshot_hosts/tests.rs
index d0f64f6face..a36e859e47e 100644
--- a/chain/network/src/snapshot_hosts/tests.rs
+++ b/chain/network/src/snapshot_hosts/tests.rs
@@ -1,5 +1,5 @@
 use crate::network_protocol::testonly as data;
-use crate::snapshot_hosts::{SnapshotHostInfoError, SnapshotHostsCache};
+use crate::snapshot_hosts::{Config, SnapshotHostInfoError, SnapshotHostsCache};
 use crate::testonly::assert_is_superset;
 use crate::testonly::{make_rng, AsSet as _};
 use crate::types::SnapshotHostInfo;
@@ -45,7 +45,8 @@ async fn happy_path() {
     let peer1 = PeerId::new(key1.public_key());
     let peer2 = PeerId::new(key2.public_key());
 
-    let cache = Arc::new(SnapshotHostsCache::new());
+    let config = Config { snapshot_hosts_cache_size: 100 };
+    let cache = SnapshotHostsCache::new(config);
     assert_eq!(cache.get_hosts().len(), 0); // initially empty
 
     // initial insert
@@ -79,7 +80,9 @@ async fn invalid_signature() {
     let peer0 = PeerId::new(key0.public_key());
     let peer1 = PeerId::new(key1.public_key());
 
-    let cache = Arc::new(SnapshotHostsCache::new());
+    let config = Config { snapshot_hosts_cache_size: 100 };
+    let cache = SnapshotHostsCache::new(config);
+
     let info0_invalid_sig = Arc::new(make_snapshot_host_info(&peer0, 1, vec![0, 1, 2, 3], &key1));
     let info1 = Arc::new(make_snapshot_host_info(&peer1, 1, vec![0, 1, 2, 3], &key1));
     let res = cache.insert(vec![info0_invalid_sig.clone(), info1.clone()]).await;
@@ -102,7 +105,8 @@ async fn duplicate_peer_id() {
     let key0 = data::make_secret_key(rng);
     let peer0 = PeerId::new(key0.public_key());
 
-    let cache = Arc::new(SnapshotHostsCache::new());
+    let config = Config { snapshot_hosts_cache_size: 100 };
+    let cache = SnapshotHostsCache::new(config);
     let info00 = Arc::new(make_snapshot_host_info(&peer0, 1, vec![0, 1, 2, 3], &key0));
     let info01 = Arc::new(make_snapshot_host_info(&peer0, 2, vec![0, 3], &key0));
 
@@ -113,3 +117,40 @@ async fn duplicate_peer_id() {
     // no partial data is stored
     assert_eq!(0, cache.get_hosts().len());
 }
+
+#[tokio::test]
+async fn test_lru_eviction() {
+    init_test_logger();
+    let mut rng = make_rng(2947294234);
+    let rng = &mut rng;
+
+    let key0 = data::make_secret_key(rng);
+    let key1 = data::make_secret_key(rng);
+    let key2 = data::make_secret_key(rng);
+
+    let peer0 = PeerId::new(key0.public_key());
+    let peer1 = PeerId::new(key1.public_key());
+    let peer2 = PeerId::new(key2.public_key());
+
+    let config = Config { snapshot_hosts_cache_size: 2 };
+    let cache = SnapshotHostsCache::new(config);
+
+    // initial inserts to capacity
+    let info0 = Arc::new(make_snapshot_host_info(&peer0, 123, vec![0, 1, 2, 3], &key0));
+    let res = cache.insert(vec![info0.clone()]).await;
+    assert_eq!([&info0].as_set(), unwrap(&res).as_set());
+    assert_eq!([&info0].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
+
+    let info1 = Arc::new(make_snapshot_host_info(&peer1, 123, vec![2], &key1));
+    let res = cache.insert(vec![info1.clone()]).await;
+    assert_eq!([&info1].as_set(), unwrap(&res).as_set());
+    assert_eq!([&info0, &info1].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
+
+    // insert past capacity
+    let info2 = Arc::new(make_snapshot_host_info(&peer2, 123, vec![1, 3], &key2));
+    let res = cache.insert(vec![info2.clone()]).await;
+    // check that the new data is accepted
+    assert_eq!([&info2].as_set(), unwrap(&res).as_set());
+    // check that the oldest data was evicted
+    assert_eq!([&info1, &info2].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
+}
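
Note (not part of the patch): the new `snapshot_hosts_cache_size` field is
read from the JSON node config with a serde default of 1000, so existing
config files keep working unchanged. Below is a minimal, self-contained
sketch of that behavior; `NetworkSection` is a hypothetical stand-in for
the real chain/network `Config`, and the snippet assumes the serde (with
derive) and serde_json crates are available.

    use serde::Deserialize;

    fn default_snapshot_hosts_cache_size() -> u32 {
        1000
    }

    // Hypothetical stand-in struct; the real field lives in
    // chain/network/src/config_json.rs.
    #[derive(Deserialize, Debug)]
    struct NetworkSection {
        #[serde(default = "default_snapshot_hosts_cache_size")]
        snapshot_hosts_cache_size: u32,
    }

    fn main() {
        // Field omitted from the JSON -> the default of 1000 applies.
        let absent: NetworkSection = serde_json::from_str("{}").unwrap();
        assert_eq!(absent.snapshot_hosts_cache_size, 1000);

        // Field present -> the operator's value is used.
        let present: NetworkSection =
            serde_json::from_str(r#"{"snapshot_hosts_cache_size": 500}"#).unwrap();
        assert_eq!(present.snapshot_hosts_cache_size, 500);
    }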