feat(l1): fetch random peer + filter by supported capability (#1536)
**Motivation**
When choosing a peer for backend uses (such as syncing) we need to fetch
a random peer and guarantee that it has an active connection and that
it supports the needed capability.

**Description**
* Store supported capabilities in `PeerData`
* Add a way to get a random filtered peer
* Update the `get_peer` function so that it fetches a random peer and also
receives a capability that the selected peer must support (see the usage
sketch below)
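
A rough usage sketch (not code from this PR) of the updated flow, assuming the crate-local `KademliaTable`, `Capability`, and `PeerChannels` types touched here; the helper name is made up for the example:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::kademlia::KademliaTable;
use crate::rlpx::p2p::Capability;

// Illustrative helper: ask the table for any connected peer that supports
// the `snap` capability before issuing snap requests to it.
async fn fetch_from_snap_peer(peers: Arc<Mutex<KademliaTable>>) {
    // Retries every 10 seconds until some connected peer advertises `snap`,
    // then returns that peer's channel ends.
    let peer = peers
        .lock()
        .await
        .get_peer_channels(Capability::Snap)
        .await;
    // `peer` is a `PeerChannels` value; snap requests such as
    // `peer.request_bytecodes(...)` can now go through it.
}
```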

**Follow-Up Work & Ideas**
We could implement a `PeerManager` that holds a reference to the
kademlia table and is in charge of selecting a suitable peer and
performing the requests (instead of selecting a peer and calling a
`request_`-prefixed method from the backend). This would let us further
abstract p2p logic away from other backend processes and also implement
penalties for peers that return invalid responses. A rough sketch of the
idea follows.
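
A hypothetical sketch of that idea (nothing below exists in this PR; the `PeerManager` name, its method, and the `BlockBody` import path are assumptions for illustration):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

use ethrex_core::types::BlockBody; // path assumed for the example
use ethrex_core::H256;

use crate::kademlia::KademliaTable;
use crate::rlpx::p2p::Capability;

struct PeerManager {
    peers: Arc<Mutex<KademliaTable>>,
}

impl PeerManager {
    /// Picks a suitable `eth` peer and performs the request on behalf of the
    /// caller, so backend code never selects peers directly.
    async fn request_block_bodies(&self, hashes: Vec<H256>) -> Option<Vec<BlockBody>> {
        let peer = self
            .peers
            .lock()
            .await
            .get_peer_channels(Capability::Eth)
            .await;
        let bodies = peer.request_block_bodies(hashes).await;
        // A peer that returns an invalid or empty response could be penalized
        // here (e.g. lowering its score in the table) before retrying.
        bodies
    }
}
```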

Closes #1317 #1318
fmoletta authored Jan 8, 2025
1 parent 454f809 commit 2c98828
Showing 4 changed files with 75 additions and 24 deletions.
60 changes: 48 additions & 12 deletions crates/networking/p2p/kademlia.rs
@@ -1,6 +1,7 @@
use crate::{
discv4::{time_now_unix, FindNodeRequest},
peer_channels::PeerChannels,
rlpx::p2p::Capability,
types::Node,
};
use ethrex_core::{H256, H512, U256};
@@ -215,6 +216,30 @@ impl KademliaTable {
peers
}

/// Returns an iterator for all peers in the table
fn iter_peers(&self) -> impl Iterator<Item = &PeerData> {
self.buckets.iter().flat_map(|bucket| bucket.peers.iter())
}

/// Returns an iterator for all peers in the table that match the filter
fn filter_peers<'a>(
&'a self,
filter: &'a dyn Fn(&'a PeerData) -> bool,
) -> impl Iterator<Item = &PeerData> + 'a {
self.iter_peers().filter(|peer| filter(peer))
}

/// Obtain a random peer from the kademlia table that matches the filter
fn get_random_peer_with_filter<'a>(
&'a self,
filter: &'a dyn Fn(&'a PeerData) -> bool,
) -> Option<&'a PeerData> {
let peer_idx = rand::random::<usize>()
.checked_rem(self.filter_peers(filter).count())
.unwrap_or_default();
self.filter_peers(filter).nth(peer_idx)
}

/// Replaces the peer with the given id with the latest replacement stored.
/// If there are no replacements, it simply removes it
///
@@ -257,32 +282,41 @@ impl KademliaTable {
None
}

/// Sets the necessary data for the peer to be usable from the node's backend
/// Set the sender end of the channel between the kademlia table and the peer's active connection
/// Set the peer's supported capabilities
/// This function should be called each time a connection is established so the backend can send requests to the peers
pub fn set_channels(&mut self, node_id: H512, channels: PeerChannels) {
pub(crate) fn init_backend_communication(
&mut self,
node_id: H512,
channels: PeerChannels,
capabilities: Vec<Capability>,
) {
let bucket_idx = bucket_number(self.local_node_id, node_id);
if let Some(peer) = self.buckets.get_mut(bucket_idx).and_then(|bucket| {
bucket
.peers
.iter_mut()
.find(|peer| peer.node.node_id == node_id)
}) {
peer.channels = Some(channels)
peer.channels = Some(channels);
peer.supported_capabilities = capabilities;
}
}

/// TODO: Randomly select peer
pub fn get_peer(&self) -> Option<PeerData> {
self.get_least_recently_pinged_peers(1).pop()
}

/// Returns the channel ends to an active peer connection
/// The peer is selected randomly (TODO), and doesn't guarantee that the selected peer is not currently busy
/// Returns the channel ends to an active peer connection that supports the given capability
/// The peer is selected randomly, and doesn't guarantee that the selected peer is not currently busy
/// If no peer is found, this method will try again after 10 seconds
/// TODO: Filter peers by capabilities, set max amount of retries
pub async fn get_peer_channels(&self) -> PeerChannels {
pub async fn get_peer_channels(&self, capability: Capability) -> PeerChannels {
let filter = |peer: &PeerData| -> bool {
// Search for peers with an active connection that support the required capabilities
peer.channels.is_some() && peer.supported_capabilities.contains(&capability)
};
loop {
if let Some(channels) = self.get_peer().and_then(|peer| peer.channels) {
if let Some(channels) = self
.get_random_peer_with_filter(&filter)
.and_then(|peer| peer.channels.clone())
{
return channels;
}
info!("[Sync] No peers available, retrying in 10 sec");
@@ -311,6 +345,7 @@ pub struct PeerData {
pub last_ping_hash: Option<H256>,
pub is_proven: bool,
pub find_node_request: Option<FindNodeRequest>,
pub supported_capabilities: Vec<Capability>,
/// a ratio to track the peer's ping responses
pub liveness: u16,
/// if a revalidation was sent to the peer, the bool marks if it has answered
@@ -331,6 +366,7 @@ impl PeerData {
find_node_request: None,
revalidation: None,
channels: None,
supported_capabilities: vec![],
}
}

6 changes: 5 additions & 1 deletion crates/networking/p2p/rlpx/connection.rs
@@ -167,7 +167,11 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
)
.await;
};
table.lock().await.set_channels(node_id, peer_channels);
let capabilities = self.capabilities.iter().map(|(cap, _)| *cap).collect();
table
.lock()
.await
.init_backend_communication(node_id, peer_channels, capabilities);
if let Err(e) = self.handle_peer_conn(sender, receiver).await {
self.peer_conn_failed("Error during RLPx connection", e, table)
.await;
4 changes: 2 additions & 2 deletions crates/networking/p2p/rlpx/p2p.rs
@@ -15,8 +15,8 @@ use super::{
utils::{pubkey2id, snappy_compress},
};

#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Capability {
#[derive(Debug, Clone, PartialEq, Copy)]
pub enum Capability {
P2p,
Eth,
Snap,
29 changes: 20 additions & 9 deletions crates/networking/p2p/sync.rs
@@ -16,6 +16,7 @@ use tokio::{
};
use tracing::{debug, info, warn};

use crate::rlpx::p2p::Capability;
use crate::{kademlia::KademliaTable, peer_channels::BlockRequestOrder};

/// Maximum amount of times we will ask a peer for an account/storage range
@@ -100,7 +101,12 @@ impl SyncManager {
// This step is not parallelized
let mut all_block_hashes = vec![];
loop {
let peer = self.peers.lock().await.get_peer_channels().await;
let peer = self
.peers
.lock()
.await
.get_peer_channels(Capability::Eth)
.await;
debug!("Requesting Block Headers from {current_head}");
// Request Block Headers from Peer
if let Some(mut block_headers) = peer
@@ -212,7 +218,7 @@ async fn download_and_run_blocks(
store: Store,
) -> Result<(), SyncError> {
loop {
let peer = peers.lock().await.get_peer_channels().await;
let peer = peers.lock().await.get_peer_channels(Capability::Eth).await;
debug!("Requesting Block Bodies ");
if let Some(block_bodies) = peer.request_block_bodies(block_hashes.clone()).await {
let block_bodies_len = block_bodies.len();
@@ -251,7 +257,7 @@ async fn store_block_bodies(
store: Store,
) -> Result<(), SyncError> {
loop {
let peer = peers.lock().await.get_peer_channels().await;
let peer = peers.lock().await.get_peer_channels(Capability::Eth).await;
debug!("Requesting Block Headers ");
if let Some(block_bodies) = peer.request_block_bodies(block_hashes.clone()).await {
debug!(" Received {} Block Bodies", block_bodies.len());
@@ -278,7 +284,7 @@ async fn store_receipts(
store: Store,
) -> Result<(), SyncError> {
loop {
let peer = peers.lock().await.get_peer_channels().await;
let peer = peers.lock().await.get_peer_channels(Capability::Eth).await;
debug!("Requesting Block Headers ");
if let Some(receipts) = peer.request_receipts(block_hashes.clone()).await {
debug!(" Received {} Receipts", receipts.len());
@@ -323,7 +329,12 @@ async fn rebuild_state_trie(
// Fetch Account Ranges
// If we reached the maximum amount of retries then it means the state we are requesting is probably old and no longer available
for _ in 0..MAX_RETRIES {
let peer = peers.clone().lock().await.get_peer_channels().await;
let peer = peers
.clone()
.lock()
.await
.get_peer_channels(Capability::Snap)
.await;
debug!("Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}");
if let Some((account_hashes, accounts, should_continue)) = peer
.request_account_range(state_root, start_account_hash)
@@ -431,7 +442,7 @@ async fn fetch_bytecode_batch(
store: Store,
) -> Result<Vec<H256>, StoreError> {
loop {
let peer = peers.lock().await.get_peer_channels().await;
let peer = peers.lock().await.get_peer_channels(Capability::Snap).await;
if let Some(bytecodes) = peer.request_bytecodes(batch.clone()).await {
debug!("Received {} bytecodes", bytecodes.len());
// Store the bytecodes
@@ -489,7 +500,7 @@ async fn fetch_storage_batch(
store: Store,
) -> Result<Vec<(H256, H256)>, StoreError> {
for _ in 0..MAX_RETRIES {
let peer = peers.lock().await.get_peer_channels().await;
let peer = peers.lock().await.get_peer_channels(Capability::Snap).await;
let (batch_hahses, batch_roots) = batch.clone().into_iter().unzip();
if let Some((mut keys, mut values, incomplete)) = peer
.request_storage_ranges(state_root, batch_roots, batch_hahses, H256::zero())
@@ -544,7 +555,7 @@ async fn heal_state_trie(
// Count the number of request retries so we don't get stuck requesting old state
let mut retry_count = 0;
while !paths.is_empty() && retry_count < MAX_RETRIES {
let peer = peers.lock().await.get_peer_channels().await;
let peer = peers.lock().await.get_peer_channels(Capability::Snap).await;
if let Some(nodes) = peer
.request_state_trienodes(state_root, paths.clone())
.await
@@ -661,7 +672,7 @@ async fn heal_storage_batch(
store: Store,
) -> Result<BTreeMap<H256, Vec<Nibbles>>, SyncError> {
for _ in 0..MAX_RETRIES {
let peer = peers.lock().await.get_peer_channels().await;
let peer = peers.lock().await.get_peer_channels(Capability::Snap).await;
if let Some(mut nodes) = peer
.request_storage_trienodes(state_root, batch.clone())
.await
