diff --git a/Cargo.lock b/Cargo.lock index 31653de..420ce89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -717,6 +717,7 @@ dependencies = [ "dashmap", "env_logger", "golomb-coded-set", + "governor", "jsonrpc-core", "jsonrpc-derive", "jsonrpc-http-server", @@ -1731,6 +1732,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.28" @@ -1839,6 +1846,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if 1.0.0", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.1", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2352,6 +2379,12 @@ dependencies = [ "libc", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nohash-hasher" version = "0.2.0" @@ -2368,6 +2401,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "num_cpus" version = "1.16.0" @@ -2644,6 +2683,12 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "portable-atomic" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2735,6 +2780,21 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.35" @@ -2931,6 +2991,15 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "raw-cpuid" +version = "11.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e29830cbb1290e404f24c73af91c5d8d631ce7e128691e9477556b540cd01ecd" +dependencies = [ + "bitflags 2.4.1", +] + [[package]] name = "rayon" version = "1.8.0" @@ -3292,6 +3361,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "ssri" version = "9.2.0" diff --git a/Cargo.toml b/Cargo.toml index 41eb9da..b35c0cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ jsonrpc-core = "18.0" jsonrpc-derive = "18.0" jsonrpc-http-server = "18.0" jsonrpc-server-utils = "18.0" +governor = "0.6.3" [dev-dependencies] ckb-shared = "0.113.0" diff --git a/src/protocols/filter/block_filter.rs b/src/protocols/filter/block_filter.rs index 9dec278..f4c2387 100644 --- a/src/protocols/filter/block_filter.rs +++ b/src/protocols/filter/block_filter.rs @@ -370,7 +370,7 @@ impl CKBProtocolHandler for FilterProtocol { let item_name = msg.item_name(); let status = self.try_process(Arc::clone(&nc), peer, msg); - status.process(nc, peer, "BlockFilter", item_name); + status.process(nc, &self.peers, peer, "BlockFilter", item_name); } async fn notify(&mut self, nc: Arc, token: u64) { diff --git a/src/protocols/light_client/mod.rs b/src/protocols/light_client/mod.rs index a36f1ea..5854ab9 100644 --- a/src/protocols/light_client/mod.rs +++ b/src/protocols/light_client/mod.rs @@ -129,7 +129,7 @@ impl CKBProtocolHandler for LightClientProtocol { let item_name = msg.item_name(); let status = self.try_process(nc.as_ref(), peer_index, msg); - status.process(nc, peer_index, "LightClient", item_name); + status.process(nc, self.peers(), peer_index, "LightClient", item_name); } async fn notify(&mut self, nc: Arc, token: u64) { diff --git a/src/protocols/light_client/peers.rs b/src/protocols/light_client/peers.rs index 29aae4d..0274f40 100644 --- a/src/protocols/light_client/peers.rs +++ b/src/protocols/light_client/peers.rs @@ -1,3 +1,10 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt, mem, + num::NonZeroU32, + sync::{Mutex, RwLock}, +}; + use ckb_network::PeerIndex; use ckb_systemtime::unix_time_as_millis; use ckb_types::{ @@ -9,14 +16,12 @@ use ckb_types::{ H256, U256, }; use dashmap::DashMap; -use std::{ - collections::{HashMap, HashSet}, - fmt, mem, - sync::RwLock, -}; +use governor::{clock::DefaultClock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter}; use super::prelude::*; -use crate::protocols::{Status, StatusCode, MESSAGE_TIMEOUT}; +use crate::protocols::{Status, StatusCode, BAD_MESSAGE_ALLOWED_EACH_HOUR, MESSAGE_TIMEOUT}; + +pub type BadMessageRateLimiter = RateLimiter, DefaultClock>; pub struct Peers { inner: DashMap, @@ -45,6 +50,10 @@ pub struct Peers { check_point_interval: BlockNumber, start_check_point: (u32, packed::Byte32), + + rate_limiter: Mutex>, + #[cfg(test)] + bad_message_allowed_each_hour: u32, } #[derive(Clone)] @@ -1117,10 +1126,22 @@ impl Peers { max_outbound_peers: u32, check_point_interval: BlockNumber, start_check_point: (u32, packed::Byte32), + bad_message_allowed_each_hour: u32, ) -> Self { #[cfg(test)] let max_outbound_peers = RwLock::new(max_outbound_peers); + let rate_limiter = { + let limit = if bad_message_allowed_each_hour == 0 { + BAD_MESSAGE_ALLOWED_EACH_HOUR + } else { + bad_message_allowed_each_hour + }; + let max_burst = unsafe { NonZeroU32::new_unchecked(limit) }; + let quota = Quota::per_hour(max_burst); + Mutex::new(RateLimiter::keyed(quota)) + }; + Self { inner: Default::default(), fetching_headers: DashMap::new(), @@ -1130,6 +1151,9 @@ impl Peers { max_outbound_peers, check_point_interval, start_check_point, + rate_limiter, + #[cfg(test)] + bad_message_allowed_each_hour, } } @@ -1272,6 +1296,7 @@ impl Peers { self.mark_fetching_headers_timeout(index); self.mark_fetching_txs_timeout(index); self.inner.remove(&index); + let _ignore_error = self.rate_limiter.lock().map(|inner| inner.retain_recent()); } pub(crate) fn get_peers_index(&self) -> Vec { @@ -1965,6 +1990,18 @@ impl Peers { .map(|(peer_index, _)| peer_index) .collect() } + + pub(crate) fn should_ban(&self, peer_index: PeerIndex) -> bool { + #[cfg(test)] + if self.bad_message_allowed_each_hour == 0 { + return true; + } + self.rate_limiter + .lock() + .map_err(|_| ()) + .and_then(|inner| inner.check_key(&peer_index).map_err(|_| ())) + .is_err() + } } fn if_verifiable_headers_are_same(lhs: &VerifiableHeader, rhs: &VerifiableHeader) -> bool { diff --git a/src/protocols/mod.rs b/src/protocols/mod.rs index fbdb11d..702c3aa 100644 --- a/src/protocols/mod.rs +++ b/src/protocols/mod.rs @@ -21,7 +21,10 @@ pub(crate) use relayer::{PendingTxs, RelayProtocol}; pub(crate) use status::{Status, StatusCode}; pub(crate) use synchronizer::SyncProtocol; +// The period to ban a peer for bad messages. pub const BAD_MESSAGE_BAN_TIME: Duration = Duration::from_secs(5 * 60); +// Ban a peer if unexpected responses from that peer reach the limit. +pub const BAD_MESSAGE_ALLOWED_EACH_HOUR: u32 = 10; // Ban a peer if it reach any timeout. pub const MESSAGE_TIMEOUT: u64 = 60 * 1000; diff --git a/src/protocols/status.rs b/src/protocols/status.rs index 5e7d275..cd2dc3d 100644 --- a/src/protocols/status.rs +++ b/src/protocols/status.rs @@ -3,7 +3,7 @@ use std::{fmt, sync::Arc, time::Duration}; use ckb_network::{CKBProtocolContext, PeerIndex}; use log::{debug, error, trace, warn}; -use super::BAD_MESSAGE_BAN_TIME; +use super::{Peers, BAD_MESSAGE_BAN_TIME}; /// StatusCodes indicate whether a specific operation has been successfully completed. /// @@ -157,10 +157,15 @@ impl Status { } /// Whether the session should be banned. - pub fn should_ban(&self) -> Option { + pub fn should_ban(&self, peers: &Peers, index: PeerIndex) -> Option { let code = self.code() as u16; if (400..500).contains(&code) { - Some(BAD_MESSAGE_BAN_TIME) + // TODO Resort the error codes, let malformed messages, which lead to be banned directly, to be together. + if code == 400 || peers.should_ban(index) { + Some(BAD_MESSAGE_BAN_TIME) + } else { + None + } } else { None } @@ -180,11 +185,12 @@ impl Status { pub fn process( &self, nc: Arc, + peers: &Peers, index: PeerIndex, protocol: &str, message: &str, ) { - if let Some(ban_time) = self.should_ban() { + if let Some(ban_time) = self.should_ban(peers, index) { error!( "{}Protocol.received {} from {}, result {}, ban {:?}", protocol, message, index, self, ban_time diff --git a/src/subcmds.rs b/src/subcmds.rs index c0ce650..801890e 100644 --- a/src/subcmds.rs +++ b/src/subcmds.rs @@ -14,7 +14,7 @@ use crate::{ error::{Error, Result}, protocols::{ FilterProtocol, LightClientProtocol, Peers, PendingTxs, RelayProtocol, SyncProtocol, - CHECK_POINT_INTERVAL, + BAD_MESSAGE_ALLOWED_EACH_HOUR, CHECK_POINT_INTERVAL, }, service::Service, storage::Storage, @@ -65,6 +65,7 @@ impl RunConfig { max_outbound_peers, CHECK_POINT_INTERVAL, storage.get_last_check_point(), + BAD_MESSAGE_ALLOWED_EACH_HOUR, )); let sync_protocol = SyncProtocol::new(storage.clone(), Arc::clone(&peers)); let relay_protocol_v2 = RelayProtocol::new( diff --git a/src/tests/prelude.rs b/src/tests/prelude.rs index fdf954d..b16eabf 100644 --- a/src/tests/prelude.rs +++ b/src/tests/prelude.rs @@ -166,11 +166,17 @@ pub(crate) trait ChainExt { fn consensus(&self) -> &Consensus; fn create_peers(&self) -> Arc { + let bad_message_allowed_each_hour = 0; + self.create_peers_with_parameters(bad_message_allowed_each_hour) + } + + fn create_peers_with_parameters(&self, bad_message_allowed_each_hour: u32) -> Arc { let max_outbound_peers = 1; let peers = Peers::new( max_outbound_peers, CHECK_POINT_INTERVAL, self.client_storage().get_last_check_point(), + bad_message_allowed_each_hour, ); Arc::new(peers) } diff --git a/src/tests/protocols/block_filter.rs b/src/tests/protocols/block_filter.rs index 1532bbb..d273ae7 100644 --- a/src/tests/protocols/block_filter.rs +++ b/src/tests/protocols/block_filter.rs @@ -81,7 +81,6 @@ async fn test_block_filter_ignore_start_number() { .set(content) .build(); - let peer_index = PeerIndex::new(3); protocol .received(nc.context(), peer_index, message.as_bytes()) .await; @@ -131,7 +130,6 @@ async fn test_block_filter_empty_filters() { .set(content) .build(); - let peer_index = PeerIndex::new(3); protocol .received(nc.context(), peer_index, message.as_bytes()) .await; @@ -144,6 +142,7 @@ async fn test_block_filter_empty_filters() { async fn test_block_filter_invalid_filters_count() { let chain = MockChain::new_with_dummy_pow("test-block-filter"); let nc = MockNetworkContext::new(SupportProtocols::Filter); + let bad_message_allowed_each_hour = 5; let min_filtered_block_number = 3; chain.client_storage().update_filter_scripts( @@ -166,7 +165,7 @@ async fn test_block_filter_invalid_filters_count() { None, Default::default(), ); - let peers = chain.create_peers(); + let peers = chain.create_peers_with_parameters(bad_message_allowed_each_hour); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers @@ -181,11 +180,9 @@ async fn test_block_filter_invalid_filters_count() { .set(content) .build(); - let peer_index = PeerIndex::new(3); protocol .received(nc.context(), peer_index, message.as_bytes()) .await; - assert_eq!( nc.has_banned(peer_index).map(|(duration, _)| duration), Some(BAD_MESSAGE_BAN_TIME) diff --git a/src/tests/protocols/light_client/send_last_state.rs b/src/tests/protocols/light_client/send_last_state.rs index 6799d34..215c59e 100644 --- a/src/tests/protocols/light_client/send_last_state.rs +++ b/src/tests/protocols/light_client/send_last_state.rs @@ -69,8 +69,9 @@ async fn invalid_chain_root() { let nc = MockNetworkContext::new(SupportProtocols::LightClient); let peer_index = PeerIndex::new(1); + let bad_message_allowed_each_hour = 5; let peers = { - let peers = chain.create_peers(); + let peers = chain.create_peers_with_parameters(bad_message_allowed_each_hour); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -94,8 +95,14 @@ async fn invalid_chain_root() { } .as_bytes(); - protocol.received(nc.context(), peer_index, data).await; + for _ in 0..bad_message_allowed_each_hour { + protocol + .received(nc.context(), peer_index, data.clone()) + .await; + assert!(nc.not_banned(peer_index)); + } + protocol.received(nc.context(), peer_index, data).await; assert!(nc.banned_since(peer_index, StatusCode::InvalidChainRoot)); } diff --git a/src/tests/utils/mod.rs b/src/tests/utils/mod.rs index 4caac6b..6b2f072 100644 --- a/src/tests/utils/mod.rs +++ b/src/tests/utils/mod.rs @@ -28,10 +28,12 @@ pub(crate) fn new_storage(prefix: &str) -> Storage { pub(crate) fn create_peers() -> Arc { let max_outbound_peers = 1; + let bad_message_allowed_each_hour = 0; let peers = Peers::new( max_outbound_peers, CHECK_POINT_INTERVAL, (0, Default::default()), + bad_message_allowed_each_hour, ); Arc::new(peers) }