Skip to content

feat: punish malicious peers #16818

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/net/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ pub struct TransactionsManagerMetrics {
#[derive(Metrics)]
#[metrics(scope = "network")]
pub struct TransactionFetcherMetrics {
/// Total number of reported bad transactions
pub(crate) reported_bad_transactions: Counter,
/// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions)
/// requests.
pub(crate) inflight_transaction_requests: Gauge,
Expand Down
40 changes: 35 additions & 5 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
cache::{LruCache, LruMap},
duration_metered_exec,
metrics::TransactionFetcherMetrics,
NetworkHandle,
};
use alloy_consensus::transaction::PooledTransaction;
use alloy_primitives::TxHash;
Expand All @@ -45,7 +46,7 @@ use reth_eth_wire::{
PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
};
use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::PeerRequest;
use reth_network_api::{PeerRequest, Peers, ReputationChangeKind};
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_primitives_traits::SignedTransaction;
Expand All @@ -66,6 +67,8 @@ use tracing::trace;
#[derive(Debug)]
#[pin_project]
pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Network access.
network: Option<NetworkHandle<N>>,
/// All peers with to which a [`GetPooledTransactions`] request is inflight.
pub active_peers: LruMap<PeerId, u8, ByLength>,
/// All currently active [`GetPooledTransactions`] requests.
Expand Down Expand Up @@ -94,6 +97,20 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
self.active_peers.remove(peer_id);
}

/// Reports peer for sending bad transactions.
fn report_peer_bad_transactions(&self, peer_id: PeerId) {
self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
self.metrics.reported_bad_transactions.increment(1);
}

/// Updates the reputation of peer.
fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
if let Some(network_handle) = &self.network {
network_handle.reputation_change(peer_id, kind);
}
}

/// Updates metrics.
#[inline]
pub fn update_metrics(&self) {
Expand All @@ -120,7 +137,10 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
}

/// Sets up transaction fetcher with config
pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
pub fn with_transaction_fetcher_config(
config: &TransactionFetcherConfig,
network: NetworkHandle<N>,
) -> Self {
let TransactionFetcherConfig {
max_inflight_requests,
max_capacity_cache_txns_pending_fetch,
Expand All @@ -140,6 +160,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
),
info,
metrics,
network: Some(network),
..Default::default()
}
}
Expand Down Expand Up @@ -216,7 +237,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {

if idle_peer.is_some() {
hashes_to_request.insert(hash);
break idle_peer.copied()
break idle_peer.copied();
}

if let Some(ref mut bud) = budget {
Expand Down Expand Up @@ -275,7 +296,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {

// tx is really big, pack request with single tx
if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
return hashes_from_announcement_iter.collect()
return hashes_from_announcement_iter.collect();
}
acc_size_response = size;
}
Expand Down Expand Up @@ -655,7 +676,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
self.metrics.egress_peer_channel_full.increment(1);
Some(new_announced_hashes)
}
}
};
}

*inflight_count += 1;
Expand Down Expand Up @@ -895,6 +916,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
if unsolicited > 0 {
self.metrics.unsolicited_transactions.increment(unsolicited as u64);
}
let mut has_bad_transactions = false;
if verification_outcome == VerificationOutcome::ReportPeer {
// todo: report peer for sending hashes that weren't requested
trace!(target: "net::tx",
Expand All @@ -903,6 +925,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
verified_payload_len=verified_payload.len(),
"received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
);
has_bad_transactions = true;
}
// peer has only sent hashes that we didn't request
if verified_payload.is_empty() {
Expand All @@ -928,6 +951,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
valid_payload_len=valid_payload.len(),
"received `PooledTransactions` response from peer with duplicate entries, filtered them out"
);
has_bad_transactions = true;
}
// valid payload will have at least one transaction at this point. even if the tx
// size/type announced by the peer is different to the actual tx size/type, pass on
Expand Down Expand Up @@ -956,6 +980,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
fetched_len=fetched.len(),
"peer failed to serve hashes it announced"
);
has_bad_transactions = true;
}

//
Expand All @@ -965,6 +990,10 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {

let transactions = valid_payload.into_data().into_values().collect();

if has_bad_transactions {
self.report_peer_bad_transactions(peer_id);
}

FetchEvent::TransactionsFetched { peer_id, transactions }
}
Ok(Err(req_err)) => {
Expand Down Expand Up @@ -1010,6 +1039,7 @@ impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
),
info: TransactionFetcherInfo::default(),
metrics: Default::default(),
network: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>

let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
&transactions_manager_config.transaction_fetcher_config,
network.clone(),
);

// install a listener for new __pending__ transactions that are allowed to be propagated
Expand Down
Loading