Implement basic tx throttling on high P2P load
When P2P load is high:
1. Limit the number of peers transactions are broadcast to
2. Limit the number of missing transactions requested from a peer
   at a time
3. Reduce the size of the invs channel and allow dropping invs when
   it is full
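
All three measures hinge on a throttle flag that the relay flow toggles by sampling mempool counters every 10 seconds, with hysteresis so it does not flap around the threshold. A minimal sketch distilled from the changes to protocol/flows/src/v5/txrelay/flow.rs below (names follow the diff; the real code derives the window from MempoolCountersSnapshot deltas):

```rust
// Sketch of the throttle toggle implemented in the diff below.
const MAX_TPS_THRESHOLD: u64 = 3000;

fn update_throttle(low_priority_txs_in_window: u64, window_secs: u64, should_throttle: &mut bool) {
    let tps = low_priority_txs_in_window / window_secs.max(1);
    if tps > MAX_TPS_THRESHOLD {
        *should_throttle = true; // enter throttled mode above the threshold
    } else if tps < MAX_TPS_THRESHOLD / 2 {
        *should_throttle = false; // exit only after load falls below half (hysteresis)
    }
}
```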
coderofstuff committed Jan 6, 2024
1 parent f604a69 commit 8b168cd
Showing 7 changed files with 90 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 5 additions & 1 deletion mining/src/manager.rs
@@ -17,7 +17,7 @@ use crate::{
         topological_sort::IntoIterTopologically,
         tx_query::TransactionQuery,
     },
-    MiningCounters,
+    MempoolCountersSnapshot, MiningCounters,
 };
 use itertools::Itertools;
 use kaspa_consensus_core::{
@@ -873,4 +873,8 @@ impl MiningManagerProxy {
     pub async fn unknown_transactions(self, transactions: Vec<TransactionId>) -> Vec<TransactionId> {
         spawn_blocking(move || self.inner.unknown_transactions(transactions)).await.unwrap()
     }
+
+    pub fn snapshot(&self) -> MempoolCountersSnapshot {
+        self.inner.counters.snapshot()
+    }
 }
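
The new `snapshot()` accessor is what lets the relay flow measure mempool throughput. A hypothetical caller (usage and import path assumed, not part of the commit) samples twice and subtracts, relying on the same `&after - &before` Sub impl the diff uses:

```rust
// Hypothetical usage sketch: diff two counter snapshots taken 10s apart.
use std::time::Duration;

async fn low_priority_txs_in_window(mining_manager: kaspa_mining::MiningManagerProxy) -> u64 {
    let before = mining_manager.snapshot();
    tokio::time::sleep(Duration::from_secs(10)).await;
    let after = mining_manager.snapshot();
    (&after - &before).low_priority_tx_counts // txs accepted during the window
}
```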
10 changes: 5 additions & 5 deletions protocol/flows/src/flow_context.rs
@@ -560,7 +560,7 @@ impl FlowContext {
             return;
         }

-        self.broadcast_transactions(transactions_to_broadcast).await;
+        self.broadcast_transactions(transactions_to_broadcast, false).await;

         if self.should_run_mempool_scanning_task().await {
             // Spawn a task executing the removal of expired low priority transactions and, if time has come too,
@@ -580,7 +580,7 @@ impl FlowContext {
                 mining_manager.revalidate_high_priority_transactions(&consensus_clone, tx).await;
             });
             while let Some(transactions) = rx.recv().await {
-                let _ = context.broadcast_transactions(transactions).await;
+                let _ = context.broadcast_transactions(transactions, false).await;
             }
         }
         context.mempool_scanning_is_done().await;
@@ -614,7 +614,7 @@ impl FlowContext {
     ) -> Result<(), ProtocolError> {
         let accepted_transactions =
             self.mining_manager().clone().validate_and_insert_transaction(consensus, transaction, Priority::High, orphan).await?;
-        self.broadcast_transactions(accepted_transactions.iter().map(|x| x.id())).await;
+        self.broadcast_transactions(accepted_transactions.iter().map(|x| x.id()), false).await;
         Ok(())
     }

@@ -641,8 +641,8 @@ impl FlowContext {
     ///
     /// The broadcast itself may happen only during a subsequent call to this function since it is done at most
     /// after a predefined interval or when the queue length is larger than the Inv message capacity.
-    pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&self, transaction_ids: I) {
-        self.transactions_spread.write().await.broadcast_transactions(transaction_ids).await
+    pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&self, transaction_ids: I, should_throttle: bool) {
+        self.transactions_spread.write().await.broadcast_transactions(transaction_ids, should_throttle).await
     }
 }

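For reference, the queueing behavior described in the doc comment above reduces to a flush condition like the following. This is a paraphrase, not the crate's exact code, and both constants are assumed illustrative values:

```rust
// Paraphrase of the flush rule: broadcast once the interval has elapsed or the
// queue can fill a whole inv message.
use std::time::{Duration, Instant};

const BROADCAST_INTERVAL: Duration = Duration::from_secs(2); // assumed value
const MAX_INV_PER_TX_INV_MSG: usize = 1 << 17; // assumed inv capacity

fn should_flush(last_broadcast_time: Instant, queued_ids: usize) -> bool {
    last_broadcast_time.elapsed() >= BROADCAST_INTERVAL || queued_ids >= MAX_INV_PER_TX_INV_MSG
}
```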
12 changes: 8 additions & 4 deletions protocol/flows/src/flowcontext/transactions.rs
@@ -77,7 +77,7 @@ impl TransactionsSpread {
     /// capacity.
     ///
     /// _GO-KASPAD: EnqueueTransactionIDsForPropagation_
-    pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&mut self, transaction_ids: I) {
+    pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&mut self, transaction_ids: I, should_throttle: bool) {
         self.transaction_ids.enqueue_chunk(transaction_ids);

         let now = Instant::now();
@@ -89,13 +89,17 @@ impl TransactionsSpread {
             let ids = self.transaction_ids.dequeue_chunk(MAX_INV_PER_TX_INV_MSG).map(|x| x.into()).collect_vec();
             debug!("Transaction propagation: broadcasting {} transactions", ids.len());
             let msg = make_message!(Payload::InvTransactions, InvTransactionsMessage { ids });
-            self.broadcast(msg).await;
+            self.broadcast(msg, should_throttle).await;
         }

         self.last_broadcast_time = Instant::now();
     }

-    async fn broadcast(&self, msg: KaspadMessage) {
-        self.hub.broadcast(msg).await
+    async fn broadcast(&self, msg: KaspadMessage, should_throttle: bool) {
+        if should_throttle {
+            self.hub.broadcast_some(msg, 0.5).await
+        } else {
+            self.hub.broadcast(msg).await
+        }
     }
 }
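
Halving the fan-out under load still propagates transactions because inv relay is gossip: every peer that accepts a transaction rebroadcasts it. Assuming independent 50% samples per relaying neighbor (a simplification with hypothetical numbers, not a claim about the real topology), the chance a peer never hears of a transaction decays geometrically:

```rust
// Back-of-envelope check: if each of `k` neighbors independently includes a
// peer in its 50% sample, the miss probability is 0.5^k.
fn miss_probability(relaying_neighbors: u32) -> f64 {
    0.5_f64.powi(relaying_neighbors as i32)
}

fn main() {
    for k in [1u32, 2, 4, 8] {
        println!("{k} relaying neighbors -> miss probability {:.4}", miss_probability(k));
    }
}
```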
62 changes: 54 additions & 8 deletions protocol/flows/src/v5/txrelay/flow.rs
@@ -5,13 +5,15 @@ use crate::{
 };
 use kaspa_consensus_core::tx::{Transaction, TransactionId};
 use kaspa_consensusmanager::ConsensusProxy;
+use kaspa_core::{time::unix_now, warn};
 use kaspa_mining::{
     errors::MiningManagerError,
     mempool::{
         errors::RuleError,
         tx::{Orphan, Priority},
     },
     model::tx_query::TransactionQuery,
+    MempoolCountersSnapshot,
 };
 use kaspa_p2p_lib::{
     common::{ProtocolError, DEFAULT_TIMEOUT},
@@ -22,6 +24,8 @@ use kaspa_p2p_lib::{
 use std::sync::Arc;
 use tokio::time::timeout;

+pub(crate) const MAX_TPS_THRESHOLD: u64 = 3000;
+
 enum Response {
     Transaction(Transaction),
     NotFound(TransactionId),
@@ -69,7 +73,7 @@ impl RelayTransactionsFlow {
     pub fn invs_channel_size() -> usize {
         // TODO: reevaluate when the node is fully functional and later when the network tx rate increases
         // Note: in go-kaspad we have 10,000 for this channel combined with tx channel.
-        8192
+        4096
     }

     pub fn txs_channel_size() -> usize {
@@ -80,7 +84,30 @@ impl RelayTransactionsFlow {

     async fn start_impl(&mut self) -> Result<(), ProtocolError> {
         // trace!("Starting relay transactions flow with {}", self.router.identity());
+        let mut last_checked_time = unix_now();
+        let mut curr_snapshot = self.ctx.mining_manager().clone().snapshot();
+        let mut should_throttle = false;
+
         loop {
+            let now = unix_now();
+            if now - last_checked_time > 10000 {
+                let next_snapshot = self.ctx.mining_manager().clone().snapshot();
+                let snapshot_delta = &next_snapshot - &curr_snapshot;
+
+                last_checked_time = now;
+                curr_snapshot = next_snapshot;
+
+                if snapshot_delta.low_priority_tx_counts > 0 {
+                    let tps = snapshot_delta.low_priority_tx_counts / 10;
+                    if tps > MAX_TPS_THRESHOLD {
+                        warn!("P2P tx relay threshold exceeded. Throttling relay. Current: {}, Max: {}", tps, MAX_TPS_THRESHOLD);
+                        should_throttle = true;
+                    } else if tps < MAX_TPS_THRESHOLD / 2 && should_throttle {
+                        warn!("P2P tx relay threshold back to normal. Current: {}, Max: {}", tps, MAX_TPS_THRESHOLD);
+                        should_throttle = false;
+                    }
+                }
+            }
             // Loop over incoming block inv messages
             let inv: Vec<TransactionId> = dequeue!(self.invs_route, Payload::InvTransactions)?.try_into()?;
             // trace!("Receive an inv message from {} with {} transaction ids", self.router.identity(), inv.len());
@@ -96,28 +123,43 @@ impl RelayTransactionsFlow {
                 continue;
             }

-            let requests = self.request_transactions(inv).await?;
-            self.receive_transactions(session, requests).await?;
+            let requests = self.request_transactions(inv, should_throttle, &curr_snapshot).await?;
+            self.receive_transactions(session, requests, should_throttle).await?;
         }
     }

     async fn request_transactions(
         &self,
         transaction_ids: Vec<TransactionId>,
+        should_throttle: bool,
+        curr_snapshot: &MempoolCountersSnapshot,
     ) -> Result<Vec<RequestScope<TransactionId>>, ProtocolError> {
         // Build a vector with the transaction ids unknown in the mempool and not already requested
         // by another peer
         let transaction_ids = self.ctx.mining_manager().clone().unknown_transactions(transaction_ids).await;
         let mut requests = Vec::new();
+        let snapshot_delta = &self.ctx.mining_manager().clone().snapshot() - curr_snapshot;
+
+        // To bring the P2P TPS back below the threshold, we may request at most the threshold
+        // minus the current overage. If MAX_TPS_THRESHOLD is 3000 and the current TPS is 4000,
+        // we can only request up to 2000 (MAX - (4000 - 3000)) to average out at the threshold.
+        let curr_p2p_tps = snapshot_delta.low_priority_tx_counts / snapshot_delta.elapsed_time.as_secs().max(1);
+        let overage = if should_throttle && curr_p2p_tps > MAX_TPS_THRESHOLD { curr_p2p_tps - MAX_TPS_THRESHOLD } else { 0 };
+
+        let limit = MAX_TPS_THRESHOLD.saturating_sub(overage);

         for transaction_id in transaction_ids {
             if let Some(req) = self.ctx.try_adding_transaction_request(transaction_id) {
                 requests.push(req);
             }
+
+            if should_throttle && requests.len() >= limit as usize {
+                break;
+            }
         }

         // Request the transactions
         if !requests.is_empty() {
             // TODO: determine if there should be a limit to the number of ids per message
             // trace!("Send a request to {} with {} transaction ids", self.router.identity(), requests.len());
             self.router
                 .enqueue(make_message!(
@@ -160,6 +202,7 @@
         &mut self,
         consensus: ConsensusProxy,
         requests: Vec<RequestScope<TransactionId>>,
+        should_throttle: bool,
     ) -> Result<(), ProtocolError> {
         let mut transactions: Vec<Transaction> = Vec::with_capacity(requests.len());
         for request in requests {
@@ -200,10 +243,13 @@
         }

         self.ctx
-            .broadcast_transactions(insert_results.into_iter().filter_map(|res| match res {
-                Ok(x) => Some(x.id()),
-                Err(_) => None,
-            }))
+            .broadcast_transactions(
+                insert_results.into_iter().filter_map(|res| match res {
+                    Ok(x) => Some(x.id()),
+                    Err(_) => None,
+                }),
+                should_throttle,
+            )
             .await;

         Ok(())
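
The request cap in `request_transactions` works out as follows; a worked form of the comment's arithmetic (the saturating guard mirrors the code above and only matters once TPS exceeds twice the threshold):

```rust
// Worked form of the limit computation in request_transactions.
const MAX_TPS_THRESHOLD: u64 = 3000;

fn request_limit(curr_p2p_tps: u64, should_throttle: bool) -> u64 {
    let overage =
        if should_throttle && curr_p2p_tps > MAX_TPS_THRESHOLD { curr_p2p_tps - MAX_TPS_THRESHOLD } else { 0 };
    MAX_TPS_THRESHOLD.saturating_sub(overage)
}

fn main() {
    assert_eq!(request_limit(4000, true), 2000); // 3000 - (4000 - 3000)
    assert_eq!(request_limit(2500, true), 3000); // no overage, full threshold
    assert_eq!(request_limit(4000, false), 3000); // throttling disabled
}
```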
1 change: 1 addition & 0 deletions protocol/p2p/Cargo.toml
@@ -35,6 +35,7 @@ itertools.workspace = true
 log.workspace = true
 parking_lot.workspace = true
 prost.workspace = true
+rand.workspace = true
 seqlock.workspace = true
 serde.workspace = true
 thiserror.workspace = true
16 changes: 16 additions & 0 deletions protocol/p2p/src/core/hub.rs
@@ -8,6 +8,7 @@ use std::{
 use tokio::sync::mpsc::Receiver as MpscReceiver;

 use super::peer::PeerKey;
+use rand::seq::SliceRandom;

 #[derive(Debug)]
 pub(crate) enum HubEvent {
@@ -103,6 +104,21 @@ impl Hub {
         }
     }

+    /// Broadcast a message to a random subset of the peers, sized by the given percentage
+    pub async fn broadcast_some(&self, msg: KaspadMessage, percentage: f64) {
+        let percentage = percentage.clamp(0.0, 1.0);
+
+        let peers = self.peers.read().values().cloned().collect::<Vec<_>>();
+
+        let peers_to_select = ((percentage * peers.len() as f64) as usize).max(1);
+
+        let peers = peers.choose_multiple(&mut rand::thread_rng(), peers_to_select).cloned().collect::<Vec<_>>();
+
+        for router in peers {
+            let _ = router.enqueue(msg.clone()).await;
+        }
+    }
+
     /// Broadcast a vector of messages to all peers
     pub async fn broadcast_many(&self, msgs: Vec<KaspadMessage>) {
         if msgs.is_empty() {
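
A standalone sketch of the sampling primitive behind `broadcast_some`: rand's `SliceRandom::choose_multiple` draws without replacement and caps the amount at the slice length, so an empty peer set simply yields an empty sample.

```rust
// Illustration only: select `percentage` of items at random, but at least one
// when any exist, matching the intent of broadcast_some above.
use rand::seq::SliceRandom;

fn sample_subset<T: Clone>(items: &[T], percentage: f64) -> Vec<T> {
    let pct = percentage.clamp(0.0, 1.0);
    let amount = ((pct * items.len() as f64) as usize).max(1);
    items.choose_multiple(&mut rand::thread_rng(), amount).cloned().collect()
}

fn main() {
    let peers: Vec<u32> = (0..8).collect();
    println!("{:?}", sample_subset(&peers, 0.5)); // four randomly chosen peers
}
```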
