Skip to content

Commit

Permalink
Some fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
someone235 committed Jan 7, 2024
1 parent 16a72ee commit 4ab7a50
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 13 deletions.
14 changes: 12 additions & 2 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ impl FlowContext {
return;
}

// TODO: Throttle these transactions as well if needed
self.broadcast_transactions(transactions_to_broadcast, false).await;

if self.should_run_mempool_scanning_task().await {
Expand All @@ -580,7 +581,12 @@ impl FlowContext {
mining_manager.revalidate_high_priority_transactions(&consensus_clone, tx).await;
});
while let Some(transactions) = rx.recv().await {
let _ = context.broadcast_transactions(transactions, false).await;
let _ = context
.broadcast_transactions(
transactions,
true, // We throttle high priority even when the network is not flooded since they will be rebroadcast if not accepted within reasonable time.
)
.await;
}
}
context.mempool_scanning_is_done().await;
Expand Down Expand Up @@ -614,7 +620,11 @@ impl FlowContext {
) -> Result<(), ProtocolError> {
let accepted_transactions =
self.mining_manager().clone().validate_and_insert_transaction(consensus, transaction, Priority::High, orphan).await?;
self.broadcast_transactions(accepted_transactions.iter().map(|x| x.id()), false).await;
self.broadcast_transactions(
accepted_transactions.iter().map(|x| x.id()),
false, // RPC transactions are considered high priority, so we don't want to throttle them
)
.await;
Ok(())
}

Expand Down
5 changes: 2 additions & 3 deletions protocol/flows/src/flowcontext/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ impl TransactionsSpread {

async fn broadcast(&self, msg: KaspadMessage, should_throttle: bool) {
if should_throttle {
// Broadcast to only half of the peers
// TODO: Figure out the better percentage
self.hub.broadcast_to_some_peers(msg, 0.5).await
// TODO: Figure out a better number
self.hub.broadcast_to_some_peers(msg, 8).await
} else {
self.hub.broadcast(msg).await
}
Expand Down
3 changes: 2 additions & 1 deletion protocol/flows/src/v5/txrelay/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl RelayTransactionsFlow {
let mut should_throttle = false;

loop {
// TODO: Extract should_throttle logic to a separate function
let now = unix_now();
if now - last_checked_time > 10000 {
let next_snapshot = self.ctx.mining_manager().clone().snapshot();
Expand All @@ -98,7 +99,7 @@ impl RelayTransactionsFlow {
curr_snapshot = next_snapshot;

if snapshot_delta.low_priority_tx_counts > 0 {
let tps = snapshot_delta.low_priority_tx_counts / 10;
let tps = snapshot_delta.low_priority_tx_counts / self.ctx.config.params.bps();
if !should_throttle && tps > MAX_TPS_THRESHOLD {
warn!("P2P tx relay threshold exceeded. Throttling relay. Current: {}, Max: {}", tps, MAX_TPS_THRESHOLD);
should_throttle = true;
Expand Down
13 changes: 6 additions & 7 deletions protocol/p2p/src/core/hub.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{common::ProtocolError, pb::KaspadMessage, ConnectionInitializer, Peer, Router};
use core::num;
use kaspa_core::{debug, info, warn};
use parking_lot::RwLock;
use std::{
Expand Down Expand Up @@ -105,14 +106,12 @@ impl Hub {
}

/// Broadcast a message to some peers given a percentage
pub async fn broadcast_to_some_peers(&self, msg: KaspadMessage, percentage: f64) {
let percentage = percentage.clamp(0.0, 1.0);

pub async fn broadcast_to_some_peers(&self, msg: KaspadMessage, num_peers: usize) {
assert!(num_peers > 0);
let peers = self.peers.read().values().cloned().collect::<Vec<_>>();

let peers_to_select = ((percentage * peers.len() as f64) as usize).min(1);

let peers = peers.choose_multiple(&mut rand::thread_rng(), peers_to_select).cloned().collect::<Vec<_>>();
// TODO: At least some of the peers should be outbound, because an attacker can gain less control
// over the set of outbound peers.
let peers = peers.choose_multiple(&mut rand::thread_rng(), num_peers).cloned().collect::<Vec<_>>();

for router in peers {
let _ = router.enqueue(msg.clone()).await;
Expand Down

0 comments on commit 4ab7a50

Please sign in to comment.