Bound the size of SnapshotHostsCache (#10133)
This PR introduces a configurable hard limit on the size of SnapshotHostsCache. The
internal storage is replaced with `lru::LruCache`, which implements a
simple least-recently-used eviction policy.

In the short term, this will prevent memory exhaustion via unbounded
addition of entries to the cache.

In the long term, we may wish to revisit and implement a more
sophisticated eviction policy, especially after we implement a
reputation system for peers sharing state parts.
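
As an illustration of the new behavior, below is a minimal, self-contained sketch of how an `lru::LruCache` keeps the number of entries bounded. It assumes an `lru` crate version whose `LruCache::new` accepts a plain `usize` capacity, matching the call introduced in this diff; the integer keys and string values are placeholders standing in for `PeerId` and `SnapshotHostInfo`.

```rust
use lru::LruCache;

fn main() {
    // Capacity of 2, playing the role of `snapshot_hosts_cache_size`.
    let mut hosts: LruCache<u32, &str> = LruCache::new(2);

    hosts.push(0, "info for peer 0"); // recency order: [0]
    hosts.push(1, "info for peer 1"); // recency order: [1, 0]

    // `peek` reads an entry without promoting it to most-recently-used,
    // so read-only lookups do not affect which entry is evicted next.
    assert_eq!(hosts.peek(&0), Some(&"info for peer 0"));

    // Pushing a third entry exceeds the capacity and evicts the
    // least-recently-used entry (peer 0, since peeking did not promote it).
    hosts.push(2, "info for peer 2"); // recency order: [2, 1]
    assert!(hosts.peek(&0).is_none());
    assert_eq!(hosts.len(), 2);
}
```

This mirrors why the cache's `is_new` check below uses `peek` rather than `get`: checking whether incoming data is fresh should not influence the eviction order.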
saketh-are authored Nov 9, 2023
1 parent cc77683 commit e77de24
Showing 5 changed files with 101 additions and 33 deletions.
6 changes: 6 additions & 0 deletions chain/network/src/config.rs
@@ -5,6 +5,7 @@ use crate::network_protocol::PeerInfo;
use crate::peer_manager::peer_manager_actor::Event;
use crate::peer_manager::peer_store;
use crate::sink::Sink;
use crate::snapshot_hosts;
use crate::stun;
use crate::tcp;
use crate::types::ROUTED_MESSAGE_TTL;
@@ -96,6 +97,7 @@ pub struct NetworkConfig {
pub validator: Option<ValidatorConfig>,

pub peer_store: peer_store::Config,
pub snapshot_hosts: snapshot_hosts::Config,
pub whitelist_nodes: Vec<PeerInfo>,
pub handshake_timeout: time::Duration,

@@ -285,6 +287,9 @@ impl NetworkConfig {
ban_window: cfg.ban_window.try_into()?,
peer_expiration_duration: cfg.peer_expiration_duration.try_into()?,
},
snapshot_hosts: snapshot_hosts::Config {
snapshot_hosts_cache_size: cfg.snapshot_hosts_cache_size,
},
whitelist_nodes: if cfg.whitelist_nodes.is_empty() {
vec![]
} else {
@@ -367,6 +372,7 @@ impl NetworkConfig {
peer_expiration_duration: time::Duration::seconds(60 * 60),
connect_only_to_boot_nodes: false,
},
snapshot_hosts: snapshot_hosts::Config { snapshot_hosts_cache_size: 1000 },
whitelist_nodes: vec![],
handshake_timeout: time::Duration::seconds(5),
connect_to_reliable_peers_on_startup: true,
8 changes: 8 additions & 0 deletions chain/network/src/config_json.rs
@@ -51,6 +51,10 @@ fn default_monitor_peers_max_period() -> Duration {
fn default_peer_states_cache_size() -> u32 {
1000
}
/// Maximum number of snapshot hosts to keep in memory.
fn default_snapshot_hosts_cache_size() -> u32 {
1000
}
/// Remove peers that we didn't hear about for this amount of time.
fn default_peer_expiration_duration() -> Duration {
Duration::from_secs(7 * 24 * 60 * 60)
@@ -139,6 +143,9 @@ pub struct Config {
/// Maximum number of peer states to keep in memory.
#[serde(default = "default_peer_states_cache_size")]
pub peer_states_cache_size: u32,
/// Maximum number of snapshot hosts to keep in memory.
#[serde(default = "default_snapshot_hosts_cache_size")]
pub snapshot_hosts_cache_size: u32,
// Remove peers that were not active for this amount of time.
#[serde(default = "default_peer_expiration_duration")]
pub peer_expiration_duration: Duration,
@@ -296,6 +303,7 @@ impl Default for Config {
handshake_timeout: Duration::from_secs(20),
skip_sync_wait: false,
peer_states_cache_size: default_peer_states_cache_size(),
snapshot_hosts_cache_size: default_snapshot_hosts_cache_size(),
ban_window: Duration::from_secs(3 * 60 * 60),
blacklist: vec![],
ttl_account_id_router: default_ttl_account_id_router(),
2 changes: 1 addition & 1 deletion chain/network/src/peer_manager/network_state/mod.rs
@@ -189,7 +189,7 @@ impl NetworkState {
tier1: connection::Pool::new(config.node_id()),
inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)),
peer_store,
snapshot_hosts: Arc::new(SnapshotHostsCache::new()),
snapshot_hosts: Arc::new(SnapshotHostsCache::new(config.snapshot_hosts.clone())),
connection_store: connection_store::ConnectionStore::new(store).unwrap(),
pending_reconnect: Mutex::new(Vec::<PeerInfo>::new()),
accounts_data: Arc::new(AccountDataCache::new()),
69 changes: 41 additions & 28 deletions chain/network/src/snapshot_hosts/mod.rs
@@ -5,9 +5,10 @@
//! in the network and stored locally in this cache.
use crate::concurrency;
use crate::concurrency::arc_mutex::ArcMutex;
use crate::network_protocol::SnapshotHostInfo;
use lru::LruCache;
use near_primitives::network::PeerId;
use parking_lot::Mutex;
use rayon::iter::ParallelBridge;
use std::collections::HashMap;
use std::sync::Arc;
@@ -23,16 +24,23 @@ pub(crate) enum SnapshotHostInfoError {
DuplicatePeerId,
}

/// TODO(saketh): Introduce a cache size limit
#[derive(Clone)]
pub struct Config {
/// The maximum number of SnapshotHostInfos to store locally.
/// At present this constraint is enforced using a simple
/// least-recently-used cache. In the future, we may wish to
/// implement something more sophisticated.
pub snapshot_hosts_cache_size: u32,
}

struct Inner {
/// The latest known SnapshotHostInfo for each node in the network
hosts: im::HashMap<PeerId, Arc<SnapshotHostInfo>>,
hosts: LruCache<PeerId, Arc<SnapshotHostInfo>>,
}

impl Inner {
fn is_new(&self, h: &SnapshotHostInfo) -> bool {
match self.hosts.get(&h.peer_id) {
match self.hosts.peek(&h.peer_id) {
Some(old) if old.epoch_height >= h.epoch_height => false,
_ => true,
}
@@ -45,16 +53,17 @@ impl Inner {
if !self.is_new(&d) {
return None;
}
self.hosts.insert(d.peer_id.clone(), d.clone());
self.hosts.push(d.peer_id.clone(), d.clone());
Some(d)
}
}

pub(crate) struct SnapshotHostsCache(ArcMutex<Inner>);
pub(crate) struct SnapshotHostsCache(Mutex<Inner>);

impl SnapshotHostsCache {
pub fn new() -> Self {
Self(ArcMutex::new(Inner { hosts: im::HashMap::new() }))
pub fn new(config: Config) -> Self {
let hosts = LruCache::new(config.snapshot_hosts_cache_size as usize);
Self(Mutex::new(Inner { hosts }))
}

/// Selects new data and verifies the signatures.
@@ -66,17 +75,19 @@ impl SnapshotHostsCache {
) -> (Vec<Arc<SnapshotHostInfo>>, Option<SnapshotHostInfoError>) {
// Filter out any data which is outdated or which we already have.
let mut new_data = HashMap::new();
let inner = self.0.load();
for d in data {
// Sharing multiple entries for the same peer is considered malicious,
// since all but one are obviously outdated.
if new_data.contains_key(&d.peer_id) {
return (vec![], Some(SnapshotHostInfoError::DuplicatePeerId));
}
// It is fine to broadcast data we already know about.
// It is fine to broadcast data which we know to be outdated.
if inner.is_new(&d) {
new_data.insert(d.peer_id.clone(), d);
{
let inner = self.0.lock();
for d in data {
// Sharing multiple entries for the same peer is considered malicious,
// since all but one are obviously outdated.
if new_data.contains_key(&d.peer_id) {
return (vec![], Some(SnapshotHostInfoError::DuplicatePeerId));
}
// It is fine to broadcast data we already know about.
// It is fine to broadcast data which we know to be outdated.
if inner.is_new(&d) {
new_data.insert(d.peer_id.clone(), d);
}
}
}

@@ -99,22 +110,24 @@ impl SnapshotHostsCache {
/// Returns the data inserted and optionally a verification error.
/// WriteLock is acquired only for the final update (after verification).
pub async fn insert(
self: &Arc<Self>,
self: &Self,
data: Vec<Arc<SnapshotHostInfo>>,
) -> (Vec<Arc<SnapshotHostInfo>>, Option<SnapshotHostInfoError>) {
let this = self.clone();
// Execute verification on the rayon threadpool.
let (data, err) = this.verify(data).await;
let (data, err) = self.verify(data).await;
// Insert the successfully verified data, even if an error has been encountered.
let inserted = self.0.update(|mut inner| {
let inserted = data.into_iter().filter_map(|d| inner.try_insert(d)).collect();
(inserted, inner)
});
let mut newly_inserted_data: Vec<Arc<SnapshotHostInfo>> = vec![];
let mut inner = self.0.lock();
for d in data {
if let Some(inserted) = inner.try_insert(d) {
newly_inserted_data.push(inserted);
}
}
// Return the inserted data.
(inserted, err)
(newly_inserted_data, err)
}

pub fn get_hosts(&self) -> Vec<Arc<SnapshotHostInfo>> {
self.0.load().hosts.values().cloned().collect()
self.0.lock().hosts.iter().map(|(_, v)| v.clone()).collect()
}
}
49 changes: 45 additions & 4 deletions chain/network/src/snapshot_hosts/tests.rs
@@ -1,5 +1,5 @@
use crate::network_protocol::testonly as data;
use crate::snapshot_hosts::{SnapshotHostInfoError, SnapshotHostsCache};
use crate::snapshot_hosts::{Config, SnapshotHostInfoError, SnapshotHostsCache};
use crate::testonly::assert_is_superset;
use crate::testonly::{make_rng, AsSet as _};
use crate::types::SnapshotHostInfo;
@@ -45,7 +45,8 @@ async fn happy_path() {
let peer1 = PeerId::new(key1.public_key());
let peer2 = PeerId::new(key2.public_key());

let cache = Arc::new(SnapshotHostsCache::new());
let config = Config { snapshot_hosts_cache_size: 100 };
let cache = SnapshotHostsCache::new(config);
assert_eq!(cache.get_hosts().len(), 0); // initially empty

// initial insert
@@ -79,7 +80,9 @@ async fn invalid_signature() {
let peer0 = PeerId::new(key0.public_key());
let peer1 = PeerId::new(key1.public_key());

let cache = Arc::new(SnapshotHostsCache::new());
let config = Config { snapshot_hosts_cache_size: 100 };
let cache = SnapshotHostsCache::new(config);

let info0_invalid_sig = Arc::new(make_snapshot_host_info(&peer0, 1, vec![0, 1, 2, 3], &key1));
let info1 = Arc::new(make_snapshot_host_info(&peer1, 1, vec![0, 1, 2, 3], &key1));
let res = cache.insert(vec![info0_invalid_sig.clone(), info1.clone()]).await;
@@ -102,7 +105,8 @@ async fn duplicate_peer_id() {
let key0 = data::make_secret_key(rng);
let peer0 = PeerId::new(key0.public_key());

let cache = Arc::new(SnapshotHostsCache::new());
let config = Config { snapshot_hosts_cache_size: 100 };
let cache = SnapshotHostsCache::new(config);

let info00 = Arc::new(make_snapshot_host_info(&peer0, 1, vec![0, 1, 2, 3], &key0));
let info01 = Arc::new(make_snapshot_host_info(&peer0, 2, vec![0, 3], &key0));
@@ -113,3 +117,40 @@
// no partial data is stored
assert_eq!(0, cache.get_hosts().len());
}

#[tokio::test]
async fn test_lru_eviction() {
init_test_logger();
let mut rng = make_rng(2947294234);
let rng = &mut rng;

let key0 = data::make_secret_key(rng);
let key1 = data::make_secret_key(rng);
let key2 = data::make_secret_key(rng);

let peer0 = PeerId::new(key0.public_key());
let peer1 = PeerId::new(key1.public_key());
let peer2 = PeerId::new(key2.public_key());

let config = Config { snapshot_hosts_cache_size: 2 };
let cache = SnapshotHostsCache::new(config);

// initial inserts to capacity
let info0 = Arc::new(make_snapshot_host_info(&peer0, 123, vec![0, 1, 2, 3], &key0));
let res = cache.insert(vec![info0.clone()]).await;
assert_eq!([&info0].as_set(), unwrap(&res).as_set());
assert_eq!([&info0].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());

let info1 = Arc::new(make_snapshot_host_info(&peer1, 123, vec![2], &key1));
let res = cache.insert(vec![info1.clone()]).await;
assert_eq!([&info1].as_set(), unwrap(&res).as_set());
assert_eq!([&info0, &info1].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());

// insert past capacity
let info2 = Arc::new(make_snapshot_host_info(&peer2, 123, vec![1, 3], &key2));
let res = cache.insert(vec![info2.clone()]).await;
// check that the new data is accepted
assert_eq!([&info2].as_set(), unwrap(&res).as_set());
// check that the oldest data was evicted
assert_eq!([&info1, &info2].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
}
