diff --git a/Cargo.lock b/Cargo.lock
index 65cbcc92f..7c795a9e2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2741,6 +2741,7 @@ dependencies = [
  "log",
  "parking_lot",
  "prost",
+ "rand 0.8.5",
  "seqlock",
  "serde",
  "thiserror",
diff --git a/mining/src/manager.rs b/mining/src/manager.rs
index ccd669718..bc6104e1a 100644
--- a/mining/src/manager.rs
+++ b/mining/src/manager.rs
@@ -17,7 +17,7 @@ use crate::{
         topological_sort::IntoIterTopologically,
         tx_query::TransactionQuery,
     },
-    MiningCounters,
+    MempoolCountersSnapshot, MiningCounters,
 };
 use itertools::Itertools;
 use kaspa_consensus_core::{
@@ -873,4 +873,8 @@ impl MiningManagerProxy {
     pub async fn unknown_transactions(self, transactions: Vec<TransactionId>) -> Vec<TransactionId> {
         spawn_blocking(move || self.inner.unknown_transactions(transactions)).await.unwrap()
     }
+
+    pub fn snapshot(&self) -> MempoolCountersSnapshot {
+        self.inner.counters.snapshot()
+    }
 }
diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs
index 23daad29f..4be955996 100644
--- a/protocol/flows/src/flow_context.rs
+++ b/protocol/flows/src/flow_context.rs
@@ -560,7 +560,7 @@ impl FlowContext {
             return;
         }
 
-        self.broadcast_transactions(transactions_to_broadcast).await;
+        self.broadcast_transactions(transactions_to_broadcast, false).await;
 
         if self.should_run_mempool_scanning_task().await {
             // Spawn a task executing the removal of expired low priority transactions and, if time has come too,
@@ -580,7 +580,7 @@ impl FlowContext {
                         mining_manager.revalidate_high_priority_transactions(&consensus_clone, tx).await;
                     });
                     while let Some(transactions) = rx.recv().await {
-                        let _ = context.broadcast_transactions(transactions).await;
+                        let _ = context.broadcast_transactions(transactions, false).await;
                     }
                 }
                 context.mempool_scanning_is_done().await;
@@ -614,7 +614,7 @@ impl FlowContext {
     ) -> Result<(), ProtocolError> {
         let accepted_transactions =
             self.mining_manager().clone().validate_and_insert_transaction(consensus, transaction, Priority::High, orphan).await?;
-        self.broadcast_transactions(accepted_transactions.iter().map(|x| x.id())).await;
+        self.broadcast_transactions(accepted_transactions.iter().map(|x| x.id()), false).await;
         Ok(())
     }
 
@@ -641,8 +641,8 @@ impl FlowContext {
     ///
     /// The broadcast itself may happen only during a subsequent call to this function since it is done at most
     /// after a predefined interval or when the queue length is larger than the Inv message capacity.
-    pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&self, transaction_ids: I) {
-        self.transactions_spread.write().await.broadcast_transactions(transaction_ids).await
+    pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&self, transaction_ids: I, should_throttle: bool) {
+        self.transactions_spread.write().await.broadcast_transactions(transaction_ids, should_throttle).await
     }
 }
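Note: the relay flow changes further down subtract two mempool counter snapshots (`&next_snapshot - &curr_snapshot`) and read `low_priority_tx_counts` and `elapsed_time` off the result. The following is a minimal sketch of what such a snapshot/delta pair can look like; the type names and field layout are assumptions for illustration only, not the actual `MempoolCountersSnapshot` exposed by `kaspa_mining`.

use std::ops::Sub;
use std::time::{Duration, Instant};

// Hypothetical, reduced snapshot: just enough state to derive a rate later.
#[derive(Clone, Copy)]
struct CountersSnapshot {
    taken_at: Instant,
    low_priority_tx_counts: u64,
}

// The delta carries the counter difference plus the elapsed wall time,
// which is what the relay flow later divides to estimate a tx rate.
struct CountersDelta {
    elapsed_time: Duration,
    low_priority_tx_counts: u64,
}

impl Sub for &CountersSnapshot {
    type Output = CountersDelta;

    fn sub(self, rhs: &CountersSnapshot) -> CountersDelta {
        CountersDelta {
            elapsed_time: self.taken_at.saturating_duration_since(rhs.taken_at),
            low_priority_tx_counts: self.low_priority_tx_counts.saturating_sub(rhs.low_priority_tx_counts),
        }
    }
}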
diff --git a/protocol/flows/src/flowcontext/transactions.rs b/protocol/flows/src/flowcontext/transactions.rs
index a19c0b95f..0b4eadfb7 100644
--- a/protocol/flows/src/flowcontext/transactions.rs
+++ b/protocol/flows/src/flowcontext/transactions.rs
@@ -77,7 +77,7 @@ impl TransactionsSpread {
     /// capacity.
     ///
     /// _GO-KASPAD: EnqueueTransactionIDsForPropagation_
-    pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&mut self, transaction_ids: I) {
+    pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&mut self, transaction_ids: I, should_throttle: bool) {
         self.transaction_ids.enqueue_chunk(transaction_ids);
 
         let now = Instant::now();
@@ -89,13 +89,17 @@ impl TransactionsSpread {
             let ids = self.transaction_ids.dequeue_chunk(MAX_INV_PER_TX_INV_MSG).map(|x| x.into()).collect_vec();
             debug!("Transaction propagation: broadcasting {} transactions", ids.len());
             let msg = make_message!(Payload::InvTransactions, InvTransactionsMessage { ids });
-            self.broadcast(msg).await;
+            self.broadcast(msg, should_throttle).await;
         }
 
         self.last_broadcast_time = Instant::now();
     }
 
-    async fn broadcast(&self, msg: KaspadMessage) {
-        self.hub.broadcast(msg).await
+    async fn broadcast(&self, msg: KaspadMessage, should_throttle: bool) {
+        if should_throttle {
+            self.hub.broadcast_some(msg, 0.5).await
+        } else {
+            self.hub.broadcast(msg).await
+        }
     }
 }
diff --git a/protocol/flows/src/v5/txrelay/flow.rs b/protocol/flows/src/v5/txrelay/flow.rs
index 44c4860f2..ea10f3dcc 100644
--- a/protocol/flows/src/v5/txrelay/flow.rs
+++ b/protocol/flows/src/v5/txrelay/flow.rs
@@ -5,6 +5,7 @@ use crate::{
 };
 use kaspa_consensus_core::tx::{Transaction, TransactionId};
 use kaspa_consensusmanager::ConsensusProxy;
+use kaspa_core::{time::unix_now, warn};
 use kaspa_mining::{
     errors::MiningManagerError,
     mempool::{
@@ -12,6 +13,7 @@ use kaspa_mining::{
         tx::{Orphan, Priority},
     },
     model::tx_query::TransactionQuery,
+    MempoolCountersSnapshot,
 };
 use kaspa_p2p_lib::{
     common::{ProtocolError, DEFAULT_TIMEOUT},
@@ -22,6 +24,8 @@ use kaspa_p2p_lib::{
 use std::sync::Arc;
 use tokio::time::timeout;
 
+pub(crate) const MAX_TPS_THRESHOLD: u64 = 3000;
+
 enum Response {
     Transaction(Transaction),
     NotFound(TransactionId),
 }
@@ -69,7 +73,7 @@ impl RelayTransactionsFlow {
     pub fn invs_channel_size() -> usize {
         // TODO: reevaluate when the node is fully functional and later when the network tx rate increases
         // Note: in go-kaspad we have 10,000 for this channel combined with tx channel.
-        8192
+        4096
     }
 
     pub fn txs_channel_size() -> usize {
@@ -80,7 +84,30 @@ impl RelayTransactionsFlow {
     async fn start_impl(&mut self) -> Result<(), ProtocolError> {
         // trace!("Starting relay transactions flow with {}", self.router.identity());
+        let mut last_checked_time = unix_now();
+        let mut curr_snapshot = self.ctx.mining_manager().clone().snapshot();
+        let mut should_throttle = false;
+
         loop {
+            let now = unix_now();
+            if now - last_checked_time > 10000 {
+                let next_snapshot = self.ctx.mining_manager().clone().snapshot();
+                let snapshot_delta = &next_snapshot - &curr_snapshot;
+
+                last_checked_time = now;
+                curr_snapshot = next_snapshot;
+
+                if snapshot_delta.low_priority_tx_counts > 0 {
+                    let tps = snapshot_delta.low_priority_tx_counts / 10;
+                    if tps > MAX_TPS_THRESHOLD {
+                        warn!("P2P tx relay threshold exceeded. Throttling relay. Current: {}, Max: {}", tps, MAX_TPS_THRESHOLD);
+                        should_throttle = true;
+                    } else if tps < MAX_TPS_THRESHOLD / 2 && should_throttle {
+                        warn!("P2P tx relay threshold back to normal. Current: {}, Max: {}", tps, MAX_TPS_THRESHOLD);
+                        should_throttle = false;
+                    }
+                }
+            }
             // Loop over incoming block inv messages
             let inv: Vec<TransactionId> = dequeue!(self.invs_route, Payload::InvTransactions)?.try_into()?;
             // trace!("Receive an inv message from {} with {} transaction ids", self.router.identity(), inv.len());
 
@@ -96,28 +123,43 @@
                 continue;
             }
 
-            let requests = self.request_transactions(inv).await?;
-            self.receive_transactions(session, requests).await?;
+            let requests = self.request_transactions(inv, should_throttle, &curr_snapshot).await?;
+            self.receive_transactions(session, requests, should_throttle).await?;
         }
     }
 
     async fn request_transactions(
         &self,
         transaction_ids: Vec<TransactionId>,
+        should_throttle: bool,
+        curr_snapshot: &MempoolCountersSnapshot,
     ) -> Result<Vec<RequestScope<TransactionId>>, ProtocolError> {
         // Build a vector with the transaction ids unknown in the mempool and not already requested
         // by another peer
         let transaction_ids = self.ctx.mining_manager().clone().unknown_transactions(transaction_ids).await;
         let mut requests = Vec::new();
+        let snapshot_delta = curr_snapshot - &self.ctx.mining_manager().clone().snapshot();
+
+        // To reduce the P2P TPS to below the threshold, we need to request up to a max of
+        // whatever balances the overage. If MAX_TPS_THRESHOLD is 3000 and the current TPS is 4000,
+        // then we can only request up to 2000 (MAX - (4000 - 3000)) to average out into the threshold.
+        let curr_p2p_tps = snapshot_delta.low_priority_tx_counts / (snapshot_delta.elapsed_time.as_millis().max(1) as u64);
+        let overage = if should_throttle && curr_p2p_tps > MAX_TPS_THRESHOLD { curr_p2p_tps - MAX_TPS_THRESHOLD } else { 0 };
+
+        let limit = MAX_TPS_THRESHOLD - overage;
+
         for transaction_id in transaction_ids {
             if let Some(req) = self.ctx.try_adding_transaction_request(transaction_id) {
                 requests.push(req);
             }
+
+            if should_throttle && requests.len() >= limit as usize {
+                break;
+            }
         }
 
         // Request the transactions
         if !requests.is_empty() {
-            // TODO: determine if there should be a limit to the number of ids per message
             // trace!("Send a request to {} with {} transaction ids", self.router.identity(), requests.len());
             self.router
                 .enqueue(make_message!(
@@ -160,6 +202,7 @@
         &mut self,
         consensus: ConsensusProxy,
         requests: Vec<RequestScope<TransactionId>>,
+        should_throttle: bool,
     ) -> Result<(), ProtocolError> {
         let mut transactions: Vec<Transaction> = Vec::with_capacity(requests.len());
         for request in requests {
@@ -200,10 +243,13 @@
         }
 
         self.ctx
-            .broadcast_transactions(insert_results.into_iter().filter_map(|res| match res {
-                Ok(x) => Some(x.id()),
-                Err(_) => None,
-            }))
+            .broadcast_transactions(
+                insert_results.into_iter().filter_map(|res| match res {
+                    Ok(x) => Some(x.id()),
+                    Err(_) => None,
+                }),
+                should_throttle,
+            )
             .await;
 
         Ok(())
diff --git a/protocol/p2p/Cargo.toml b/protocol/p2p/Cargo.toml
index 7ea7893ae..be7a806d7 100644
--- a/protocol/p2p/Cargo.toml
+++ b/protocol/p2p/Cargo.toml
@@ -35,6 +35,7 @@ itertools.workspace = true
 log.workspace = true
 parking_lot.workspace = true
 prost.workspace = true
+rand.workspace = true
 seqlock.workspace = true
 serde.workspace = true
 thiserror.workspace = true
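Note: the request limit computed in `request_transactions` above is easiest to read with concrete numbers: while throttling, a peer may request the threshold minus whatever it previously overshot it by. The sketch below restates that arithmetic on its own, assuming the observed rate has already been normalized to transactions per second (the hunk above divides the counter delta by elapsed milliseconds, so its raw numbers sit on a different scale); `throttled_request_limit` is a hypothetical helper written for this note, not a function from the codebase.

/// Cap on how many transactions to request per inv message while throttling.
/// With a 3000 TPS threshold and an observed rate of 4000 TPS, the overage is
/// 1000, so only 3000 - 1000 = 2000 requests are allowed, averaging back toward 3000.
fn throttled_request_limit(max_tps: u64, observed_tps: u64, should_throttle: bool) -> u64 {
    let overage = if should_throttle && observed_tps > max_tps { observed_tps - max_tps } else { 0 };
    // saturating_sub guards the corner case where the overage exceeds the threshold itself.
    max_tps.saturating_sub(overage)
}

fn main() {
    assert_eq!(throttled_request_limit(3000, 4000, true), 2000);
    assert_eq!(throttled_request_limit(3000, 2500, true), 3000);
    assert_eq!(throttled_request_limit(3000, 4000, false), 3000);
    println!("limit at 4000 TPS: {}", throttled_request_limit(3000, 4000, true));
}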
diff --git a/protocol/p2p/src/core/hub.rs b/protocol/p2p/src/core/hub.rs
index 2602435e6..2183b04f0 100644
--- a/protocol/p2p/src/core/hub.rs
+++ b/protocol/p2p/src/core/hub.rs
@@ -8,6 +8,7 @@ use std::{
 };
 use tokio::sync::mpsc::Receiver as MpscReceiver;
 
 use super::peer::PeerKey;
+use rand::seq::SliceRandom;
 
 #[derive(Debug)]
 pub(crate) enum HubEvent {
@@ -103,6 +104,21 @@ impl Hub {
         }
     }
 
+    /// Broadcast a message to some peers given a percentage
+    pub async fn broadcast_some(&self, msg: KaspadMessage, percentage: f64) {
+        let percentage = percentage.clamp(0.0, 1.0);
+
+        let peers = self.peers.read().values().cloned().collect::<Vec<_>>();
+
+        let peers_to_select = ((percentage * peers.len() as f64) as usize).min(1);
+
+        let peers = peers.choose_multiple(&mut rand::thread_rng(), peers_to_select).cloned().collect::<Vec<_>>();
+
+        for router in peers {
+            let _ = router.enqueue(msg.clone()).await;
+        }
+    }
+
     /// Broadcast a vector of messages to all peers
     pub async fn broadcast_many(&self, msgs: Vec<KaspadMessage>) {
         if msgs.is_empty() {
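Note: `broadcast_some` relies on `choose_multiple` from rand 0.8, which samples without replacement and yields at most `peers.len()` items. Below is a self-contained sketch of percentage-based selection using plain integers in place of router handles; its `.max(1)` (guaranteeing at least one recipient) is a choice made for this sketch, whereas the hunk above applies `.min(1)`, which caps the selection at a single peer.

use rand::seq::SliceRandom;

/// Pick roughly `percentage` of the given peers at random, without replacement.
fn sample_peers(peers: &[u64], percentage: f64) -> Vec<u64> {
    let percentage = percentage.clamp(0.0, 1.0);
    // Round down, but keep at least one recipient so a small peer set still gets the message.
    let amount = ((percentage * peers.len() as f64) as usize).max(1);
    peers.choose_multiple(&mut rand::thread_rng(), amount).copied().collect()
}

fn main() {
    let peers: Vec<u64> = (0..20).collect();
    let selected = sample_peers(&peers, 0.5);
    // With 20 peers and 50%, expect 10 distinct entries.
    println!("selected {} of {} peers: {:?}", selected.len(), peers.len(), selected);
}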