From 8b168cd00a20e27cd0c3b076eddba5a6a63f3bd4 Mon Sep 17 00:00:00 2001 From: coderofstuff <114628839+coderofstuff@users.noreply.github.com> Date: Fri, 5 Jan 2024 18:23:53 -0700 Subject: [PATCH] Implement basic tx throttling on high P2P load When P2P load is high: 1. Limit the number of peers to broadcast to 2. Limit the number of transactions requested when requesting missing transactions 3. Reduce the size of the invs channel and allow dropping invs when it's full --- Cargo.lock | 1 + mining/src/manager.rs | 6 +- protocol/flows/src/flow_context.rs | 10 +-- .../flows/src/flowcontext/transactions.rs | 12 ++-- protocol/flows/src/v5/txrelay/flow.rs | 62 ++++++++++++++++--- protocol/p2p/Cargo.toml | 1 + protocol/p2p/src/core/hub.rs | 16 +++++ 7 files changed, 90 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65cbcc92f..7c795a9e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2741,6 +2741,7 @@ dependencies = [ "log", "parking_lot", "prost", + "rand 0.8.5", "seqlock", "serde", "thiserror", diff --git a/mining/src/manager.rs b/mining/src/manager.rs index ccd669718..bc6104e1a 100644 --- a/mining/src/manager.rs +++ b/mining/src/manager.rs @@ -17,7 +17,7 @@ use crate::{ topological_sort::IntoIterTopologically, tx_query::TransactionQuery, }, - MiningCounters, + MempoolCountersSnapshot, MiningCounters, }; use itertools::Itertools; use kaspa_consensus_core::{ @@ -873,4 +873,8 @@ impl MiningManagerProxy { pub async fn unknown_transactions(self, transactions: Vec) -> Vec { spawn_blocking(move || self.inner.unknown_transactions(transactions)).await.unwrap() } + + pub fn snapshot(&self) -> MempoolCountersSnapshot { + self.inner.counters.snapshot() + } } diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs index 23daad29f..4be955996 100644 --- a/protocol/flows/src/flow_context.rs +++ b/protocol/flows/src/flow_context.rs @@ -560,7 +560,7 @@ impl FlowContext { return; } - self.broadcast_transactions(transactions_to_broadcast).await; + self.broadcast_transactions(transactions_to_broadcast, false).await; if self.should_run_mempool_scanning_task().await { // Spawn a task executing the removal of expired low priority transactions and, if time has come too, @@ -580,7 +580,7 @@ impl FlowContext { mining_manager.revalidate_high_priority_transactions(&consensus_clone, tx).await; }); while let Some(transactions) = rx.recv().await { - let _ = context.broadcast_transactions(transactions).await; + let _ = context.broadcast_transactions(transactions, false).await; } } context.mempool_scanning_is_done().await; @@ -614,7 +614,7 @@ impl FlowContext { ) -> Result<(), ProtocolError> { let accepted_transactions = self.mining_manager().clone().validate_and_insert_transaction(consensus, transaction, Priority::High, orphan).await?; - self.broadcast_transactions(accepted_transactions.iter().map(|x| x.id())).await; + self.broadcast_transactions(accepted_transactions.iter().map(|x| x.id()), false).await; Ok(()) } @@ -641,8 +641,8 @@ impl FlowContext { /// /// The broadcast itself may happen only during a subsequent call to this function since it is done at most /// after a predefined interval or when the queue length is larger than the Inv message capacity. - pub async fn broadcast_transactions>(&self, transaction_ids: I) { - self.transactions_spread.write().await.broadcast_transactions(transaction_ids).await + pub async fn broadcast_transactions>(&self, transaction_ids: I, should_throttle: bool) { + self.transactions_spread.write().await.broadcast_transactions(transaction_ids, should_throttle).await } } diff --git a/protocol/flows/src/flowcontext/transactions.rs b/protocol/flows/src/flowcontext/transactions.rs index a19c0b95f..0b4eadfb7 100644 --- a/protocol/flows/src/flowcontext/transactions.rs +++ b/protocol/flows/src/flowcontext/transactions.rs @@ -77,7 +77,7 @@ impl TransactionsSpread { /// capacity. /// /// _GO-KASPAD: EnqueueTransactionIDsForPropagation_ - pub async fn broadcast_transactions>(&mut self, transaction_ids: I) { + pub async fn broadcast_transactions>(&mut self, transaction_ids: I, should_throttle: bool) { self.transaction_ids.enqueue_chunk(transaction_ids); let now = Instant::now(); @@ -89,13 +89,17 @@ impl TransactionsSpread { let ids = self.transaction_ids.dequeue_chunk(MAX_INV_PER_TX_INV_MSG).map(|x| x.into()).collect_vec(); debug!("Transaction propagation: broadcasting {} transactions", ids.len()); let msg = make_message!(Payload::InvTransactions, InvTransactionsMessage { ids }); - self.broadcast(msg).await; + self.broadcast(msg, should_throttle).await; } self.last_broadcast_time = Instant::now(); } - async fn broadcast(&self, msg: KaspadMessage) { - self.hub.broadcast(msg).await + async fn broadcast(&self, msg: KaspadMessage, should_throttle: bool) { + if should_throttle { + self.hub.broadcast_some(msg, 0.5).await + } else { + self.hub.broadcast(msg).await + } } } diff --git a/protocol/flows/src/v5/txrelay/flow.rs b/protocol/flows/src/v5/txrelay/flow.rs index 44c4860f2..ea10f3dcc 100644 --- a/protocol/flows/src/v5/txrelay/flow.rs +++ b/protocol/flows/src/v5/txrelay/flow.rs @@ -5,6 +5,7 @@ use crate::{ }; use kaspa_consensus_core::tx::{Transaction, TransactionId}; use kaspa_consensusmanager::ConsensusProxy; +use kaspa_core::{time::unix_now, warn}; use kaspa_mining::{ errors::MiningManagerError, mempool::{ @@ -12,6 +13,7 @@ use kaspa_mining::{ tx::{Orphan, Priority}, }, model::tx_query::TransactionQuery, + MempoolCountersSnapshot, }; use kaspa_p2p_lib::{ common::{ProtocolError, DEFAULT_TIMEOUT}, @@ -22,6 +24,8 @@ use kaspa_p2p_lib::{ use std::sync::Arc; use tokio::time::timeout; +pub(crate) const MAX_TPS_THRESHOLD: u64 = 3000; + enum Response { Transaction(Transaction), NotFound(TransactionId), @@ -69,7 +73,7 @@ impl RelayTransactionsFlow { pub fn invs_channel_size() -> usize { // TODO: reevaluate when the node is fully functional and later when the network tx rate increases // Note: in go-kaspad we have 10,000 for this channel combined with tx channel. - 8192 + 4096 } pub fn txs_channel_size() -> usize { @@ -80,7 +84,30 @@ impl RelayTransactionsFlow { async fn start_impl(&mut self) -> Result<(), ProtocolError> { // trace!("Starting relay transactions flow with {}", self.router.identity()); + let mut last_checked_time = unix_now(); + let mut curr_snapshot = self.ctx.mining_manager().clone().snapshot(); + let mut should_throttle = false; + loop { + let now = unix_now(); + if now - last_checked_time > 10000 { + let next_snapshot = self.ctx.mining_manager().clone().snapshot(); + let snapshot_delta = &next_snapshot - &curr_snapshot; + + last_checked_time = now; + curr_snapshot = next_snapshot; + + if snapshot_delta.low_priority_tx_counts > 0 { + let tps = snapshot_delta.low_priority_tx_counts / 10; + if tps > MAX_TPS_THRESHOLD { + warn!("P2P tx relay threshold exceeded. Throttling relay. Current: {}, Max: {}", tps, MAX_TPS_THRESHOLD); + should_throttle = true; + } else if tps < MAX_TPS_THRESHOLD / 2 && should_throttle { + warn!("P2P tx relay threshold back to normal. Current: {}, Max: {}", tps, MAX_TPS_THRESHOLD); + should_throttle = false; + } + } + } // Loop over incoming block inv messages let inv: Vec = dequeue!(self.invs_route, Payload::InvTransactions)?.try_into()?; // trace!("Receive an inv message from {} with {} transaction ids", self.router.identity(), inv.len()); @@ -96,28 +123,43 @@ impl RelayTransactionsFlow { continue; } - let requests = self.request_transactions(inv).await?; - self.receive_transactions(session, requests).await?; + let requests = self.request_transactions(inv, should_throttle, &curr_snapshot).await?; + self.receive_transactions(session, requests, should_throttle).await?; } } async fn request_transactions( &self, transaction_ids: Vec, + should_throttle: bool, + curr_snapshot: &MempoolCountersSnapshot, ) -> Result>, ProtocolError> { // Build a vector with the transaction ids unknown in the mempool and not already requested // by another peer let transaction_ids = self.ctx.mining_manager().clone().unknown_transactions(transaction_ids).await; let mut requests = Vec::new(); + let snapshot_delta = curr_snapshot - &self.ctx.mining_manager().clone().snapshot(); + + // To reduce the P2P TPS to below the threshold, we need to request up to a max of + // whatever the balances overage. If MAX_TPS_THRESHOLD is 3000 and the current TPS is 4000, + // then we can only request up to 2000 (MAX - (4000 - 3000)) to average out into the threshold. + let curr_p2p_tps = snapshot_delta.low_priority_tx_counts / (snapshot_delta.elapsed_time.as_millis().max(1) as u64); + let overage = if should_throttle && curr_p2p_tps > MAX_TPS_THRESHOLD { curr_p2p_tps - MAX_TPS_THRESHOLD } else { 0 }; + + let limit = MAX_TPS_THRESHOLD - overage; + for transaction_id in transaction_ids { if let Some(req) = self.ctx.try_adding_transaction_request(transaction_id) { requests.push(req); } + + if should_throttle && requests.len() >= limit as usize { + break; + } } // Request the transactions if !requests.is_empty() { - // TODO: determine if there should be a limit to the number of ids per message // trace!("Send a request to {} with {} transaction ids", self.router.identity(), requests.len()); self.router .enqueue(make_message!( @@ -160,6 +202,7 @@ impl RelayTransactionsFlow { &mut self, consensus: ConsensusProxy, requests: Vec>, + should_throttle: bool, ) -> Result<(), ProtocolError> { let mut transactions: Vec = Vec::with_capacity(requests.len()); for request in requests { @@ -200,10 +243,13 @@ impl RelayTransactionsFlow { } self.ctx - .broadcast_transactions(insert_results.into_iter().filter_map(|res| match res { - Ok(x) => Some(x.id()), - Err(_) => None, - })) + .broadcast_transactions( + insert_results.into_iter().filter_map(|res| match res { + Ok(x) => Some(x.id()), + Err(_) => None, + }), + should_throttle, + ) .await; Ok(()) diff --git a/protocol/p2p/Cargo.toml b/protocol/p2p/Cargo.toml index 7ea7893ae..be7a806d7 100644 --- a/protocol/p2p/Cargo.toml +++ b/protocol/p2p/Cargo.toml @@ -35,6 +35,7 @@ itertools.workspace = true log.workspace = true parking_lot.workspace = true prost.workspace = true +rand.workspace = true seqlock.workspace = true serde.workspace = true thiserror.workspace = true diff --git a/protocol/p2p/src/core/hub.rs b/protocol/p2p/src/core/hub.rs index 2602435e6..2183b04f0 100644 --- a/protocol/p2p/src/core/hub.rs +++ b/protocol/p2p/src/core/hub.rs @@ -8,6 +8,7 @@ use std::{ use tokio::sync::mpsc::Receiver as MpscReceiver; use super::peer::PeerKey; +use rand::seq::SliceRandom; #[derive(Debug)] pub(crate) enum HubEvent { @@ -103,6 +104,21 @@ impl Hub { } } + /// Broadcast a message to some peers given a percentage + pub async fn broadcast_some(&self, msg: KaspadMessage, percentage: f64) { + let percentage = percentage.clamp(0.0, 1.0); + + let peers = self.peers.read().values().cloned().collect::>(); + + let peers_to_select = ((percentage * peers.len() as f64) as usize).min(1); + + let peers = peers.choose_multiple(&mut rand::thread_rng(), peers_to_select).cloned().collect::>(); + + for router in peers { + let _ = router.enqueue(msg.clone()).await; + } + } + /// Broadcast a vector of messages to all peers pub async fn broadcast_many(&self, msgs: Vec) { if msgs.is_empty() {