From 2cd4d6c78afa606b0f67f9c019a203d7d712730c Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 25 Mar 2024 16:11:12 +0100 Subject: [PATCH 01/17] Interface between signed and unsigned node record types --- crates/primitives/src/lib.rs | 5 ++-- crates/primitives/src/net.rs | 42 ++++++++++++++++++++++++++++++-- crates/rpc/rpc-types/Cargo.toml | 1 + crates/rpc/rpc-types/src/net.rs | 34 +++++++++++++++++++++++--- crates/rpc/rpc-types/src/peer.rs | 5 ++++ 5 files changed, 80 insertions(+), 7 deletions(-) diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 8d9793a7896d..f98513278b20 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -69,8 +69,9 @@ pub use header::{Header, HeaderValidationError, HeadersDirection, SealedHeader}; pub use integer_list::IntegerList; pub use log::{logs_bloom, Log}; pub use net::{ - goerli_nodes, holesky_nodes, mainnet_nodes, parse_nodes, sepolia_nodes, NodeRecord, - GOERLI_BOOTNODES, HOLESKY_BOOTNODES, MAINNET_BOOTNODES, SEPOLIA_BOOTNODES, + goerli_nodes, holesky_nodes, mainnet_nodes, parse_nodes, pk_to_id, sepolia_nodes, NodeRecord, + NodeRecordParseError, GOERLI_BOOTNODES, HOLESKY_BOOTNODES, MAINNET_BOOTNODES, + SEPOLIA_BOOTNODES, }; pub use peer::{AnyNode, PeerId, WithPeerId}; pub use prune::{ diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 7d122f71a18f..6afac2bc3c6e 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -1,4 +1,4 @@ -pub use reth_rpc_types::NodeRecord; +pub use reth_rpc_types::{pk_to_id, NodeRecord, NodeRecordParseError}; // @@ -73,10 +73,15 @@ mod tests { str::FromStr, }; + use crate::MAINNET; + use super::*; - use alloy_rlp::Decodable; + use alloy_rlp::{Decodable, Encodable}; + use enr::Enr; use rand::{thread_rng, Rng, RngCore}; + use reth_ethereum_forks::Hardfork; use reth_rpc_types::PeerId; + use secp256k1::SecretKey; #[test] fn test_mapped_ipv6() { @@ -197,4 +202,37 @@ mod tests { id: PeerId::from_str("6f8a80d14311c39f35f516fa664deaaaa13e85b2f7493f37f6144d86991ec012937307647bd3b9a82abe2974e1407241d54947bbb39763a4cac9f77166ad92a0").unwrap(), }) } + + #[test] + fn conversion_to_node_record_from_enr() { + const IP: &'static str = "::"; + const TCP_PORT: u16 = 30303; + const UDP_PORT: u16 = 9000; + + let mut rng = thread_rng(); + let key = SecretKey::new(&mut rng); + + let mut buf = Vec::new(); + let fork_id = MAINNET.hardfork_fork_id(Hardfork::Frontier); + fork_id.unwrap().encode(&mut buf); + + let enr = Enr::builder() + .ip6(IP.parse().unwrap()) + .udp6(UDP_PORT) + .tcp6(TCP_PORT) + .build(&key) + .unwrap(); + + let node_record_with_fork_id = NodeRecord::try_from(enr.clone()).unwrap(); + + assert_eq!( + NodeRecord { + address: IP.parse().unwrap(), + tcp_port: TCP_PORT, + udp_port: UDP_PORT, + id: pk_to_id(&enr.public_key()) + }, + node_record_with_fork_id + ) + } } diff --git a/crates/rpc/rpc-types/Cargo.toml b/crates/rpc/rpc-types/Cargo.toml index 4da62f7caa5f..424600e50e1c 100644 --- a/crates/rpc/rpc-types/Cargo.toml +++ b/crates/rpc/rpc-types/Cargo.toml @@ -21,6 +21,7 @@ alloy-rpc-engine-types = { workspace = true, features = ["jsonrpsee-types"] } ethereum_ssz_derive = { version = "0.5", optional = true } ethereum_ssz = { version = "0.5", optional = true } alloy-genesis.workspace = true +enr = { workspace = true, features = ["serde", "rust-secp256k1"] } # misc thiserror.workspace = true diff --git a/crates/rpc/rpc-types/src/net.rs b/crates/rpc/rpc-types/src/net.rs index 885920162075..99cb134bb2af 100644 --- a/crates/rpc/rpc-types/src/net.rs +++ b/crates/rpc/rpc-types/src/net.rs @@ -1,5 +1,6 @@ -use crate::PeerId; +use crate::{pk_to_id, PeerId}; use alloy_rlp::{RlpDecodable, RlpEncodable}; +use enr::Enr; use secp256k1::{SecretKey, SECP256K1}; use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::{ @@ -9,6 +10,7 @@ use std::{ num::ParseIntError, str::FromStr, }; +use thiserror::Error; use url::{Host, Url}; /// Represents a ENR in discovery. @@ -114,8 +116,8 @@ impl fmt::Display for NodeRecord { } } -/// Possible error types when parsing a `NodeRecord` -#[derive(Debug, thiserror::Error)] +/// Possible error types when parsing a [`NodeRecord`] +#[derive(Debug, Error)] pub enum NodeRecordParseError { /// Invalid url #[error("Failed to parse url: {0}")] @@ -126,6 +128,9 @@ pub enum NodeRecordParseError { /// Invalid discport #[error("Failed to discport query: {0}")] Discport(ParseIntError), + /// Conversion from type [`Enr`] failed. + #[error("failed to convert enr into dns node record, enr: {0}")] + ConversionFromEnrFailed(Enr), } impl FromStr for NodeRecord { @@ -165,6 +170,29 @@ impl FromStr for NodeRecord { } } +impl TryFrom> for NodeRecord { + type Error = NodeRecordParseError; + + fn try_from(enr: Enr) -> Result { + let Some(address) = enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from)) + else { + return Err(NodeRecordParseError::ConversionFromEnrFailed(enr)) + }; + + let Some(tcp_port) = enr.tcp4().or_else(|| enr.tcp6()) else { + return Err(NodeRecordParseError::ConversionFromEnrFailed(enr)) + }; + + let Some(udp_port) = enr.udp4().or_else(|| enr.udp6()) else { + return Err(NodeRecordParseError::ConversionFromEnrFailed(enr)) + }; + + let id = pk_to_id(&enr.public_key()); + + Ok(NodeRecord { address, tcp_port, udp_port, id }.into_ipv4_mapped()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/rpc/rpc-types/src/peer.rs b/crates/rpc/rpc-types/src/peer.rs index a07e61d00285..44dbe5d71f24 100644 --- a/crates/rpc/rpc-types/src/peer.rs +++ b/crates/rpc/rpc-types/src/peer.rs @@ -2,3 +2,8 @@ use alloy_primitives::B512; /// Alias for a peer identifier pub type PeerId = B512; + +/// Converts a [`secp256k1::PublicKey`] to a [`PeerId`]. +pub fn pk_to_id(pk: &secp256k1::PublicKey) -> PeerId { + PeerId::from_slice(&pk.serialize_uncompressed()[1..]) +} From ebdc17fcb779a2ea3facd190593c61b2881f202b Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 25 Mar 2024 16:12:03 +0100 Subject: [PATCH 02/17] Make dns generic over node record type --- Cargo.lock | 2 + crates/net/dns/Cargo.toml | 4 +- crates/net/dns/src/lib.rs | 290 ++++++++++++++++++++-------- crates/net/network/src/discovery.rs | 4 +- 4 files changed, 220 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 383664d684bc..fe091fb455d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5713,6 +5713,7 @@ dependencies = [ "alloy-rlp", "data-encoding", "enr", + "futures", "linked_hash_set", "parking_lot 0.12.1", "reth-net-common", @@ -6553,6 +6554,7 @@ dependencies = [ "alloy-rpc-types", "arbitrary", "bytes", + "enr", "ethereum_ssz", "ethereum_ssz_derive", "jsonrpsee-types", diff --git a/crates/net/dns/Cargo.toml b/crates/net/dns/Cargo.toml index 0a9af063c467..48a3d05f24dd 100644 --- a/crates/net/dns/Cargo.toml +++ b/crates/net/dns/Cargo.toml @@ -17,11 +17,12 @@ reth-primitives.workspace = true reth-net-common.workspace = true # ethereum -alloy-rlp.workspace = true secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery", "serde"] } enr = { workspace = true, default-features = false, features = ["rust-secp256k1"] } +alloy-rlp.workspace = true # async/futures +futures.workspace = true tokio = { workspace = true, features = ["io-util", "net", "time"] } tokio-stream.workspace = true @@ -42,6 +43,7 @@ serde_with = { version = "3.3.0", optional = true } tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread"] } reth-tracing.workspace = true alloy-chains.workspace = true +alloy-rlp.workspace = true [features] default = ["serde"] diff --git a/crates/net/dns/src/lib.rs b/crates/net/dns/src/lib.rs index ceff42c25144..221580bf68fd 100644 --- a/crates/net/dns/src/lib.rs +++ b/crates/net/dns/src/lib.rs @@ -13,24 +13,26 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -pub use crate::resolver::{DnsResolver, MapResolver, Resolver}; +pub use crate::resolver::{DnsResolver, MapResolver, ResolveError, Resolver}; use crate::{ query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult}, sync::{ResolveKind, SyncAction}, tree::{DnsEntry, LinkEntry}, }; +use alloy_rlp::Decodable; pub use config::DnsDiscoveryConfig; use enr::Enr; use error::ParseDnsEntryError; -use reth_primitives::{ForkId, NodeRecord, PeerId}; +use futures::StreamExt; +use reth_primitives::{ForkId, NodeRecord, NodeRecordParseError}; use schnellru::{ByLength, LruMap}; use secp256k1::SecretKey; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - net::IpAddr, + fmt, pin::Pin, sync::Arc, - task::{ready, Context, Poll}, + task::{Context, Poll}, time::{Duration, Instant}, }; use sync::SyncTree; @@ -44,7 +46,7 @@ use tokio::{ }; use tokio_stream::{ wrappers::{ReceiverStream, UnboundedReceiverStream}, - Stream, StreamExt, + Stream, }; use tracing::{debug, trace}; @@ -55,16 +57,44 @@ pub mod resolver; mod sync; pub mod tree; +/// Starts DNS discovery using resolver type [`DnsResolver`]. On success, returns a +/// [`DnsDiscoveryHandle`] to interact with [`DnsDiscoveryService`], a stream for discovered nodes +/// [`ReceiverStream`], and the join handle for [`DnsDiscoveryService`]. +#[allow(clippy::type_complexity)] +pub fn new_with_dns_resolver( + dns_config: DnsDiscoveryConfig, +) -> Result< + ( + Option>, + Option>>, + Option>, + ), + ResolveError, +> +where + N: Clone + Send + 'static, + DnsDiscoveryService: Update, + DnsDiscoveryService: Stream, + as Stream>::Item: fmt::Debug, +{ + let (mut service, dns_disc) = + DnsDiscoveryService::new_pair(Arc::new(DnsResolver::from_system_conf()?), dns_config); + let dns_discovery_updates = service.node_record_stream(); + let dns_disc_service = service.spawn(); + + Ok((Some(dns_disc), Some(dns_discovery_updates), Some(dns_disc_service))) +} + /// [DnsDiscoveryService] front-end. #[derive(Clone, Debug)] -pub struct DnsDiscoveryHandle { +pub struct DnsDiscoveryHandle { /// Channel for sending commands to the service. - to_service: UnboundedSender, + to_service: UnboundedSender>, } // === impl DnsDiscovery === -impl DnsDiscoveryHandle { +impl DnsDiscoveryHandle { /// Starts syncing the given link to a tree. pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> { self.sync_tree_with_link(link.parse()?); @@ -79,7 +109,7 @@ impl DnsDiscoveryHandle { /// Returns the receiver half of new listener channel that streams discovered [`NodeRecord`]s. pub async fn node_record_stream( &self, - ) -> Result, oneshot::error::RecvError> { + ) -> Result>, oneshot::error::RecvError> { let (tx, rx) = oneshot::channel(); let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx); let _ = self.to_service.send(cmd); @@ -90,13 +120,13 @@ impl DnsDiscoveryHandle { /// A client that discovers nodes via DNS. #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] -pub struct DnsDiscoveryService { +pub struct DnsDiscoveryService { /// Copy of the sender half, so new [`DnsDiscoveryHandle`] can be created on demand. - command_tx: UnboundedSender, + command_tx: UnboundedSender>, /// Receiver half of the command channel. - command_rx: UnboundedReceiverStream, - /// All subscribers for resolved [NodeRecord]s. - node_record_listeners: Vec>, + command_rx: UnboundedReceiverStream>, + /// All subscribers for resolved node records. + node_record_listeners: Vec>>, /// All the trees that can be synced. trees: HashMap, /// All queries currently in progress @@ -113,14 +143,15 @@ pub struct DnsDiscoveryService { // === impl DnsDiscoveryService === -impl DnsDiscoveryService { +impl DnsDiscoveryService { /// Creates a new instance of the [DnsDiscoveryService] using the given settings. /// /// ``` /// use reth_dns_discovery::{DnsDiscoveryService, DnsResolver}; + /// use reth_primitives::NodeRecord; /// use std::sync::Arc; /// # fn t() { - /// let service = DnsDiscoveryService::new( + /// let service: DnsDiscoveryService = DnsDiscoveryService::new( /// Arc::new(DnsResolver::from_system_conf().unwrap()), /// Default::default(), /// ); @@ -152,7 +183,12 @@ impl DnsDiscoveryService { /// Spawns this services onto a new task /// /// Note: requires a running runtime - pub fn spawn(mut self) -> JoinHandle<()> { + pub fn spawn(mut self) -> JoinHandle<()> + where + Self: Stream, + N: Clone + Send + 'static, + I: fmt::Debug, + { tokio::task::spawn(async move { self.bootstrap(); @@ -171,19 +207,19 @@ impl DnsDiscoveryService { /// Same as [DnsDiscoveryService::new] but also returns a new handle that's connected to the /// service - pub fn new_pair(resolver: Arc, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) { + pub fn new_pair(resolver: Arc, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) { let service = Self::new(resolver, config); let handle = service.handle(); (service, handle) } /// Returns a new [`DnsDiscoveryHandle`] that can send commands to this type. - pub fn handle(&self) -> DnsDiscoveryHandle { + pub fn handle(&self) -> DnsDiscoveryHandle { DnsDiscoveryHandle { to_service: self.command_tx.clone() } } /// Creates a new channel for [`NodeRecord`]s. - pub fn node_record_stream(&mut self) -> ReceiverStream { + pub fn node_record_stream(&mut self) -> ReceiverStream> { let (tx, rx) = mpsc::channel(256); self.node_record_listeners.push(tx); ReceiverStream::new(rx) @@ -192,7 +228,10 @@ impl DnsDiscoveryService { /// Sends the event to all listeners. /// /// Remove channels that got closed. - fn notify(&mut self, record: DnsNodeRecordUpdate) { + fn notify(&mut self, record: DnsNodeRecordUpdate) + where + N: Clone, + { self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) { Ok(()) => true, Err(err) => match err { @@ -214,14 +253,25 @@ impl DnsDiscoveryService { } /// Resolves an entry - fn resolve_entry(&mut self, link: LinkEntry, hash: String, kind: ResolveKind) { + fn resolve_entry( + &mut self, + link: LinkEntry, + hash: String, + kind: ResolveKind, + ) -> Result<(), DnsNodeRecordUpdateParseError> + where + Self: Update, + { if let Some(entry) = self.dns_record_cache.get(&hash).cloned() { // already resolved let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind }; - self.on_resolved_entry(cached); - return + self.on_resolved_entry(cached)?; + + return Ok(()) } - self.queries.resolve_entry(link, hash, kind) + self.queries.resolve_entry(link, hash, kind); + + Ok(()) } fn on_resolved_root(&mut self, resp: ResolveRootResult) { @@ -240,14 +290,13 @@ impl DnsDiscoveryService { } } - fn on_resolved_enr(&mut self, enr: Enr) { - if let Some(record) = convert_enr_node_record(&enr) { - self.notify(record); - } - self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr)) - } - - fn on_resolved_entry(&mut self, resp: ResolveEntryResult) { + fn on_resolved_entry( + &mut self, + resp: ResolveEntryResult, + ) -> Result<(), DnsNodeRecordUpdateParseError> + where + Self: Update, + { let ResolveEntryResult { entry, link, hash, kind } = resp; match entry { @@ -284,20 +333,30 @@ impl DnsDiscoveryService { if kind.is_link() { debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry"); } else { - self.on_resolved_enr(entry.enr) + self.on_resolved_enr(entry.enr)? } } } } } + + Ok(()) } +} + +/// A Stream events, mainly used for debugging +impl Stream for DnsDiscoveryService +where + Self: Update, +{ + type Item = DnsDiscoveryEvent; /// Advances the state of the DNS discovery service by polling,triggering lookups - pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { // drain buffered events first if let Some(event) = self.queued_events.pop_front() { - return Poll::Ready(event) + return Poll::Ready(Some(event)) } // process all incoming commands @@ -316,7 +375,14 @@ impl DnsDiscoveryService { // handle query outcome match outcome { QueryOutcome::Root(resp) => self.on_resolved_root(resp), - QueryOutcome::Entry(resp) => self.on_resolved_entry(resp), + QueryOutcome::Entry(resp) => { + if let Err(err) = self.on_resolved_entry(resp) { + debug!(target: "net::dns", + err=%err, + "failed to resolve entry" + ); + } + } } } @@ -324,8 +390,9 @@ impl DnsDiscoveryService { let now = Instant::now(); let mut pending_resolves = Vec::new(); let mut pending_updates = Vec::new(); + let recheck_interval = self.recheck_interval; for tree in self.trees.values_mut() { - while let Some(action) = tree.poll(now, self.recheck_interval) { + while let Some(action) = tree.poll(now, recheck_interval) { progress = true; match action { SyncAction::UpdateRoot => { @@ -342,7 +409,12 @@ impl DnsDiscoveryService { } for (domain, hash, kind) in pending_resolves { - self.resolve_entry(domain, hash, kind) + if let Err(err) = self.resolve_entry(domain, hash, kind) { + debug!(target: "net::dns", + err=%err, + "failed to resolve entry" + ); + } } for link in pending_updates { @@ -356,29 +428,46 @@ impl DnsDiscoveryService { } } -/// A Stream events, mainly used for debugging -impl Stream for DnsDiscoveryService { - type Item = DnsDiscoveryEvent; +/// Trait for converting an [`Enr`] into an update and sending the update. Trait is implemented +/// for all supported update types. +pub trait Update { + /// Tries to convert a resolved [`Enr`] into a [`DnsNodeRecordUpdate`]. Notifies all listeners + /// of update upon successful conversion. + fn on_resolved_enr(&mut self, enr: Enr) + -> Result<(), DnsNodeRecordUpdateParseError>; +} + +impl Update for DnsDiscoveryService { + fn on_resolved_enr( + &mut self, + enr: Enr, + ) -> Result<(), DnsNodeRecordUpdateParseError> { + self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr.clone())); + let update = enr.try_into()?; + self.notify(update); - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Some(ready!(self.get_mut().poll(cx)))) + Ok(()) } } -/// The converted discovered [Enr] object -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct DnsNodeRecordUpdate { - /// Discovered node and it's addresses - pub node_record: NodeRecord, - /// The forkid of the node, if present in the ENR - pub fork_id: Option, +impl Update for DnsDiscoveryService> { + fn on_resolved_enr( + &mut self, + enr: Enr, + ) -> Result<(), DnsNodeRecordUpdateParseError> { + self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr.clone())); + let update = enr.try_into()?; + self.notify(update); + + Ok(()) + } } /// Commands sent from [DnsDiscoveryHandle] to [DnsDiscoveryService] -enum DnsDiscoveryCommand { +enum DnsDiscoveryCommand { /// Sync a tree SyncTree(LinkEntry), - NodeRecordUpdates(oneshot::Sender>), + NodeRecordUpdates(oneshot::Sender>>), } /// Represents dns discovery related update events. @@ -388,22 +477,66 @@ pub enum DnsDiscoveryEvent { Enr(Enr), } -/// Converts an [Enr] into a [NodeRecord] -fn convert_enr_node_record(enr: &Enr) -> Option { - use alloy_rlp::Decodable; +/// Conversion from [`Enr`] to [`DnsNodeRecordUpdate`] failed. +#[derive(Debug, thiserror::Error)] +pub enum DnsNodeRecordUpdateParseError { + /// Missing key used to identify an execution layer enr. + #[error("fork id missing on enr, 'eth' key missing")] + ForkIdMissing, + /// Failed to decode fork ID rlp value. + #[error("failed to decode fork id, 'eth': {0:?}")] + ForkIdDecodeError(Vec), + /// Conversion from [`Enr`] into [`NodeRecord`] failed. + #[error(transparent)] + NodeRecordParseError(#[from] NodeRecordParseError), +} - let node_record = NodeRecord { - address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?, - tcp_port: enr.tcp4().or_else(|| enr.tcp6())?, - udp_port: enr.udp4().or_else(|| enr.udp6())?, - id: PeerId::from_slice(&enr.public_key().serialize_uncompressed()[1..]), +// todo: remove type by adding `fork_id` field to NodeRecord +/// The converted discovered [Enr] object +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct DnsNodeRecordUpdate { + /// Discovered node and it's addresses + pub node_record: N, + /// The for kid of the node, if present in the ENR + pub fork_id: Option, +} + +impl TryFrom> for DnsNodeRecordUpdate { + type Error = DnsNodeRecordUpdateParseError; + + fn try_from(enr: Enr) -> Result { + // fork id is how we know this is an EL node, this isn't spec´d out but by precedent + let fork_id = get_fork_id(&enr).ok(); + + let node_record = NodeRecord::try_from(enr)?; + + Ok(DnsNodeRecordUpdate { node_record, fork_id }) } - .into_ipv4_mapped(); +} + +impl TryFrom> for DnsNodeRecordUpdate> { + type Error = DnsNodeRecordUpdateParseError; + + fn try_from(enr: Enr) -> Result { + // fork id is how we know this is an EL node, this isn't spec´d out but by precedent + let fork_id = get_fork_id(&enr)?; + + Ok(DnsNodeRecordUpdate { node_record: enr, fork_id: Some(fork_id) }) + } +} + +/// Tries to read the [`ForkId`] from given [`Enr`]. +// todo: enable for all chains not only "eth" +pub fn get_fork_id(enr: &Enr) -> Result { + let Some(mut maybe_fork_id) = enr.get(b"eth") else { + return Err(DnsNodeRecordUpdateParseError::ForkIdMissing) + }; - let mut maybe_fork_id = enr.get(b"eth")?; - let fork_id = ForkId::decode(&mut maybe_fork_id).ok(); + let Ok(fork_id) = ForkId::decode(&mut maybe_fork_id) else { + return Err(DnsNodeRecordUpdateParseError::ForkIdDecodeError(maybe_fork_id.to_vec())) + }; - Some(DnsNodeRecordUpdate { node_record, fork_id }) + Ok(fork_id) } #[cfg(test)] @@ -431,12 +564,13 @@ mod tests { LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; resolver.insert(link.domain.clone(), root.to_string()); - let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default()); + let mut service: DnsDiscoveryService = + DnsDiscoveryService::new(Arc::new(resolver), Default::default()); service.sync_tree_with_link(link.clone()); poll_fn(|cx| { - let _ = service.poll(cx); + let _ = service.poll_next_unpin(cx); Poll::Ready(()) }) .await; @@ -468,7 +602,8 @@ mod tests { resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64()); - let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default()); + let mut service: DnsDiscoveryService = + DnsDiscoveryService::new(Arc::new(resolver), Default::default()); let mut node_records = service.node_record_stream(); @@ -479,16 +614,16 @@ mod tests { service.sync_tree_with_link(link.clone()); - let event = poll_fn(|cx| service.poll(cx)).await; + let event = poll_fn(|cx| service.poll_next_unpin(cx)).await; - match event { + match event.unwrap() { DnsDiscoveryEvent::Enr(discovered) => { assert_eq!(discovered, enr); } } poll_fn(|cx| { - assert!(service.poll(cx).is_pending()); + assert!(service.poll_next_unpin(cx).is_pending()); Poll::Ready(()) }) .await; @@ -515,12 +650,13 @@ mod tests { LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; resolver.insert(link.domain.clone(), root.to_string()); - let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone()); + let mut service: DnsDiscoveryService = + DnsDiscoveryService::new(Arc::clone(&resolver), config.clone()); service.sync_tree_with_link(link.clone()); poll_fn(|cx| { - assert!(service.poll(cx).is_pending()); + assert!(service.poll_next_unpin(cx).is_pending()); Poll::Ready(()) }) .await; @@ -531,16 +667,16 @@ mod tests { let enr = Enr::empty(&secret_key).unwrap(); resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64()); - let event = poll_fn(|cx| service.poll(cx)).await; + let event = poll_fn(|cx| service.poll_next_unpin(cx)).await; - match event { + match event.unwrap() { DnsDiscoveryEvent::Enr(discovered) => { assert_eq!(discovered, enr); } } poll_fn(|cx| { - assert!(service.poll(cx).is_pending()); + assert!(service.poll_next_unpin(cx).is_pending()); Poll::Ready(()) }) .await; @@ -551,7 +687,7 @@ mod tests { async fn test_dns_resolver() { reth_tracing::init_test_tracing(); - let mut service = DnsDiscoveryService::new( + let mut service: DnsDiscoveryService = DnsDiscoveryService::new( Arc::new(DnsResolver::from_system_conf().unwrap()), Default::default(), ); diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 56aa6c68d9f5..036b52e3ae60 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -26,7 +26,7 @@ use tokio_stream::{wrappers::ReceiverStream, Stream}; /// Listens for new discovered nodes and emits events for discovered nodes and their /// address. #[derive(Debug)] -pub struct Discovery { +pub struct Discovery { /// All nodes discovered via discovery protocol. /// /// These nodes can be ephemeral and are updated via the discovery protocol. @@ -40,7 +40,7 @@ pub struct Discovery { /// The handle to the spawned discv4 service _discv4_service: Option>, /// Handler to interact with the DNS discovery service - _dns_discovery: Option, + _dns_discovery: Option>, /// Updates from the DNS discovery service. dns_discovery_updates: Option>, /// The handle to the spawned DNS discovery service From 0beb236fe928350920473aca95cd3aa8644910e7 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 25 Mar 2024 16:43:15 +0100 Subject: [PATCH 03/17] Fix lint --- crates/primitives/src/net.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 6afac2bc3c6e..948183966047 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -205,7 +205,7 @@ mod tests { #[test] fn conversion_to_node_record_from_enr() { - const IP: &'static str = "::"; + const IP: &str = "::"; const TCP_PORT: u16 = 30303; const UDP_PORT: u16 = 9000; From 8ab3f4b2d8c9a6cd665a3ca1203e4044e6664314 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 26 Mar 2024 23:23:44 +0100 Subject: [PATCH 04/17] Checkout dns crate from main --- Cargo.lock | 1 - crates/net/dns/Cargo.toml | 4 +- crates/net/dns/src/lib.rs | 290 ++++++++-------------------- crates/net/network/src/discovery.rs | 4 +- 4 files changed, 80 insertions(+), 219 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe091fb455d9..0e71c8183a2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5713,7 +5713,6 @@ dependencies = [ "alloy-rlp", "data-encoding", "enr", - "futures", "linked_hash_set", "parking_lot 0.12.1", "reth-net-common", diff --git a/crates/net/dns/Cargo.toml b/crates/net/dns/Cargo.toml index 48a3d05f24dd..0a9af063c467 100644 --- a/crates/net/dns/Cargo.toml +++ b/crates/net/dns/Cargo.toml @@ -17,12 +17,11 @@ reth-primitives.workspace = true reth-net-common.workspace = true # ethereum +alloy-rlp.workspace = true secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery", "serde"] } enr = { workspace = true, default-features = false, features = ["rust-secp256k1"] } -alloy-rlp.workspace = true # async/futures -futures.workspace = true tokio = { workspace = true, features = ["io-util", "net", "time"] } tokio-stream.workspace = true @@ -43,7 +42,6 @@ serde_with = { version = "3.3.0", optional = true } tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread"] } reth-tracing.workspace = true alloy-chains.workspace = true -alloy-rlp.workspace = true [features] default = ["serde"] diff --git a/crates/net/dns/src/lib.rs b/crates/net/dns/src/lib.rs index 221580bf68fd..ceff42c25144 100644 --- a/crates/net/dns/src/lib.rs +++ b/crates/net/dns/src/lib.rs @@ -13,26 +13,24 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -pub use crate::resolver::{DnsResolver, MapResolver, ResolveError, Resolver}; +pub use crate::resolver::{DnsResolver, MapResolver, Resolver}; use crate::{ query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult}, sync::{ResolveKind, SyncAction}, tree::{DnsEntry, LinkEntry}, }; -use alloy_rlp::Decodable; pub use config::DnsDiscoveryConfig; use enr::Enr; use error::ParseDnsEntryError; -use futures::StreamExt; -use reth_primitives::{ForkId, NodeRecord, NodeRecordParseError}; +use reth_primitives::{ForkId, NodeRecord, PeerId}; use schnellru::{ByLength, LruMap}; use secp256k1::SecretKey; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - fmt, + net::IpAddr, pin::Pin, sync::Arc, - task::{Context, Poll}, + task::{ready, Context, Poll}, time::{Duration, Instant}, }; use sync::SyncTree; @@ -46,7 +44,7 @@ use tokio::{ }; use tokio_stream::{ wrappers::{ReceiverStream, UnboundedReceiverStream}, - Stream, + Stream, StreamExt, }; use tracing::{debug, trace}; @@ -57,44 +55,16 @@ pub mod resolver; mod sync; pub mod tree; -/// Starts DNS discovery using resolver type [`DnsResolver`]. On success, returns a -/// [`DnsDiscoveryHandle`] to interact with [`DnsDiscoveryService`], a stream for discovered nodes -/// [`ReceiverStream`], and the join handle for [`DnsDiscoveryService`]. -#[allow(clippy::type_complexity)] -pub fn new_with_dns_resolver( - dns_config: DnsDiscoveryConfig, -) -> Result< - ( - Option>, - Option>>, - Option>, - ), - ResolveError, -> -where - N: Clone + Send + 'static, - DnsDiscoveryService: Update, - DnsDiscoveryService: Stream, - as Stream>::Item: fmt::Debug, -{ - let (mut service, dns_disc) = - DnsDiscoveryService::new_pair(Arc::new(DnsResolver::from_system_conf()?), dns_config); - let dns_discovery_updates = service.node_record_stream(); - let dns_disc_service = service.spawn(); - - Ok((Some(dns_disc), Some(dns_discovery_updates), Some(dns_disc_service))) -} - /// [DnsDiscoveryService] front-end. #[derive(Clone, Debug)] -pub struct DnsDiscoveryHandle { +pub struct DnsDiscoveryHandle { /// Channel for sending commands to the service. - to_service: UnboundedSender>, + to_service: UnboundedSender, } // === impl DnsDiscovery === -impl DnsDiscoveryHandle { +impl DnsDiscoveryHandle { /// Starts syncing the given link to a tree. pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> { self.sync_tree_with_link(link.parse()?); @@ -109,7 +79,7 @@ impl DnsDiscoveryHandle { /// Returns the receiver half of new listener channel that streams discovered [`NodeRecord`]s. pub async fn node_record_stream( &self, - ) -> Result>, oneshot::error::RecvError> { + ) -> Result, oneshot::error::RecvError> { let (tx, rx) = oneshot::channel(); let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx); let _ = self.to_service.send(cmd); @@ -120,13 +90,13 @@ impl DnsDiscoveryHandle { /// A client that discovers nodes via DNS. #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] -pub struct DnsDiscoveryService { +pub struct DnsDiscoveryService { /// Copy of the sender half, so new [`DnsDiscoveryHandle`] can be created on demand. - command_tx: UnboundedSender>, + command_tx: UnboundedSender, /// Receiver half of the command channel. - command_rx: UnboundedReceiverStream>, - /// All subscribers for resolved node records. - node_record_listeners: Vec>>, + command_rx: UnboundedReceiverStream, + /// All subscribers for resolved [NodeRecord]s. + node_record_listeners: Vec>, /// All the trees that can be synced. trees: HashMap, /// All queries currently in progress @@ -143,15 +113,14 @@ pub struct DnsDiscoveryService { // === impl DnsDiscoveryService === -impl DnsDiscoveryService { +impl DnsDiscoveryService { /// Creates a new instance of the [DnsDiscoveryService] using the given settings. /// /// ``` /// use reth_dns_discovery::{DnsDiscoveryService, DnsResolver}; - /// use reth_primitives::NodeRecord; /// use std::sync::Arc; /// # fn t() { - /// let service: DnsDiscoveryService = DnsDiscoveryService::new( + /// let service = DnsDiscoveryService::new( /// Arc::new(DnsResolver::from_system_conf().unwrap()), /// Default::default(), /// ); @@ -183,12 +152,7 @@ impl DnsDiscoveryService { /// Spawns this services onto a new task /// /// Note: requires a running runtime - pub fn spawn(mut self) -> JoinHandle<()> - where - Self: Stream, - N: Clone + Send + 'static, - I: fmt::Debug, - { + pub fn spawn(mut self) -> JoinHandle<()> { tokio::task::spawn(async move { self.bootstrap(); @@ -207,19 +171,19 @@ impl DnsDiscoveryService { /// Same as [DnsDiscoveryService::new] but also returns a new handle that's connected to the /// service - pub fn new_pair(resolver: Arc, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) { + pub fn new_pair(resolver: Arc, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) { let service = Self::new(resolver, config); let handle = service.handle(); (service, handle) } /// Returns a new [`DnsDiscoveryHandle`] that can send commands to this type. - pub fn handle(&self) -> DnsDiscoveryHandle { + pub fn handle(&self) -> DnsDiscoveryHandle { DnsDiscoveryHandle { to_service: self.command_tx.clone() } } /// Creates a new channel for [`NodeRecord`]s. - pub fn node_record_stream(&mut self) -> ReceiverStream> { + pub fn node_record_stream(&mut self) -> ReceiverStream { let (tx, rx) = mpsc::channel(256); self.node_record_listeners.push(tx); ReceiverStream::new(rx) @@ -228,10 +192,7 @@ impl DnsDiscoveryService { /// Sends the event to all listeners. /// /// Remove channels that got closed. - fn notify(&mut self, record: DnsNodeRecordUpdate) - where - N: Clone, - { + fn notify(&mut self, record: DnsNodeRecordUpdate) { self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) { Ok(()) => true, Err(err) => match err { @@ -253,25 +214,14 @@ impl DnsDiscoveryService { } /// Resolves an entry - fn resolve_entry( - &mut self, - link: LinkEntry, - hash: String, - kind: ResolveKind, - ) -> Result<(), DnsNodeRecordUpdateParseError> - where - Self: Update, - { + fn resolve_entry(&mut self, link: LinkEntry, hash: String, kind: ResolveKind) { if let Some(entry) = self.dns_record_cache.get(&hash).cloned() { // already resolved let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind }; - self.on_resolved_entry(cached)?; - - return Ok(()) + self.on_resolved_entry(cached); + return } - self.queries.resolve_entry(link, hash, kind); - - Ok(()) + self.queries.resolve_entry(link, hash, kind) } fn on_resolved_root(&mut self, resp: ResolveRootResult) { @@ -290,13 +240,14 @@ impl DnsDiscoveryService { } } - fn on_resolved_entry( - &mut self, - resp: ResolveEntryResult, - ) -> Result<(), DnsNodeRecordUpdateParseError> - where - Self: Update, - { + fn on_resolved_enr(&mut self, enr: Enr) { + if let Some(record) = convert_enr_node_record(&enr) { + self.notify(record); + } + self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr)) + } + + fn on_resolved_entry(&mut self, resp: ResolveEntryResult) { let ResolveEntryResult { entry, link, hash, kind } = resp; match entry { @@ -333,30 +284,20 @@ impl DnsDiscoveryService { if kind.is_link() { debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry"); } else { - self.on_resolved_enr(entry.enr)? + self.on_resolved_enr(entry.enr) } } } } } - - Ok(()) } -} - -/// A Stream events, mainly used for debugging -impl Stream for DnsDiscoveryService -where - Self: Update, -{ - type Item = DnsDiscoveryEvent; /// Advances the state of the DNS discovery service by polling,triggering lookups - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { loop { // drain buffered events first if let Some(event) = self.queued_events.pop_front() { - return Poll::Ready(Some(event)) + return Poll::Ready(event) } // process all incoming commands @@ -375,14 +316,7 @@ where // handle query outcome match outcome { QueryOutcome::Root(resp) => self.on_resolved_root(resp), - QueryOutcome::Entry(resp) => { - if let Err(err) = self.on_resolved_entry(resp) { - debug!(target: "net::dns", - err=%err, - "failed to resolve entry" - ); - } - } + QueryOutcome::Entry(resp) => self.on_resolved_entry(resp), } } @@ -390,9 +324,8 @@ where let now = Instant::now(); let mut pending_resolves = Vec::new(); let mut pending_updates = Vec::new(); - let recheck_interval = self.recheck_interval; for tree in self.trees.values_mut() { - while let Some(action) = tree.poll(now, recheck_interval) { + while let Some(action) = tree.poll(now, self.recheck_interval) { progress = true; match action { SyncAction::UpdateRoot => { @@ -409,12 +342,7 @@ where } for (domain, hash, kind) in pending_resolves { - if let Err(err) = self.resolve_entry(domain, hash, kind) { - debug!(target: "net::dns", - err=%err, - "failed to resolve entry" - ); - } + self.resolve_entry(domain, hash, kind) } for link in pending_updates { @@ -428,46 +356,29 @@ where } } -/// Trait for converting an [`Enr`] into an update and sending the update. Trait is implemented -/// for all supported update types. -pub trait Update { - /// Tries to convert a resolved [`Enr`] into a [`DnsNodeRecordUpdate`]. Notifies all listeners - /// of update upon successful conversion. - fn on_resolved_enr(&mut self, enr: Enr) - -> Result<(), DnsNodeRecordUpdateParseError>; -} - -impl Update for DnsDiscoveryService { - fn on_resolved_enr( - &mut self, - enr: Enr, - ) -> Result<(), DnsNodeRecordUpdateParseError> { - self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr.clone())); - let update = enr.try_into()?; - self.notify(update); +/// A Stream events, mainly used for debugging +impl Stream for DnsDiscoveryService { + type Item = DnsDiscoveryEvent; - Ok(()) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Some(ready!(self.get_mut().poll(cx)))) } } -impl Update for DnsDiscoveryService> { - fn on_resolved_enr( - &mut self, - enr: Enr, - ) -> Result<(), DnsNodeRecordUpdateParseError> { - self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr.clone())); - let update = enr.try_into()?; - self.notify(update); - - Ok(()) - } +/// The converted discovered [Enr] object +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct DnsNodeRecordUpdate { + /// Discovered node and it's addresses + pub node_record: NodeRecord, + /// The forkid of the node, if present in the ENR + pub fork_id: Option, } /// Commands sent from [DnsDiscoveryHandle] to [DnsDiscoveryService] -enum DnsDiscoveryCommand { +enum DnsDiscoveryCommand { /// Sync a tree SyncTree(LinkEntry), - NodeRecordUpdates(oneshot::Sender>>), + NodeRecordUpdates(oneshot::Sender>), } /// Represents dns discovery related update events. @@ -477,66 +388,22 @@ pub enum DnsDiscoveryEvent { Enr(Enr), } -/// Conversion from [`Enr`] to [`DnsNodeRecordUpdate`] failed. -#[derive(Debug, thiserror::Error)] -pub enum DnsNodeRecordUpdateParseError { - /// Missing key used to identify an execution layer enr. - #[error("fork id missing on enr, 'eth' key missing")] - ForkIdMissing, - /// Failed to decode fork ID rlp value. - #[error("failed to decode fork id, 'eth': {0:?}")] - ForkIdDecodeError(Vec), - /// Conversion from [`Enr`] into [`NodeRecord`] failed. - #[error(transparent)] - NodeRecordParseError(#[from] NodeRecordParseError), -} +/// Converts an [Enr] into a [NodeRecord] +fn convert_enr_node_record(enr: &Enr) -> Option { + use alloy_rlp::Decodable; -// todo: remove type by adding `fork_id` field to NodeRecord -/// The converted discovered [Enr] object -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct DnsNodeRecordUpdate { - /// Discovered node and it's addresses - pub node_record: N, - /// The for kid of the node, if present in the ENR - pub fork_id: Option, -} - -impl TryFrom> for DnsNodeRecordUpdate { - type Error = DnsNodeRecordUpdateParseError; - - fn try_from(enr: Enr) -> Result { - // fork id is how we know this is an EL node, this isn't spec´d out but by precedent - let fork_id = get_fork_id(&enr).ok(); - - let node_record = NodeRecord::try_from(enr)?; - - Ok(DnsNodeRecordUpdate { node_record, fork_id }) + let node_record = NodeRecord { + address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?, + tcp_port: enr.tcp4().or_else(|| enr.tcp6())?, + udp_port: enr.udp4().or_else(|| enr.udp6())?, + id: PeerId::from_slice(&enr.public_key().serialize_uncompressed()[1..]), } -} - -impl TryFrom> for DnsNodeRecordUpdate> { - type Error = DnsNodeRecordUpdateParseError; - - fn try_from(enr: Enr) -> Result { - // fork id is how we know this is an EL node, this isn't spec´d out but by precedent - let fork_id = get_fork_id(&enr)?; - - Ok(DnsNodeRecordUpdate { node_record: enr, fork_id: Some(fork_id) }) - } -} - -/// Tries to read the [`ForkId`] from given [`Enr`]. -// todo: enable for all chains not only "eth" -pub fn get_fork_id(enr: &Enr) -> Result { - let Some(mut maybe_fork_id) = enr.get(b"eth") else { - return Err(DnsNodeRecordUpdateParseError::ForkIdMissing) - }; + .into_ipv4_mapped(); - let Ok(fork_id) = ForkId::decode(&mut maybe_fork_id) else { - return Err(DnsNodeRecordUpdateParseError::ForkIdDecodeError(maybe_fork_id.to_vec())) - }; + let mut maybe_fork_id = enr.get(b"eth")?; + let fork_id = ForkId::decode(&mut maybe_fork_id).ok(); - Ok(fork_id) + Some(DnsNodeRecordUpdate { node_record, fork_id }) } #[cfg(test)] @@ -564,13 +431,12 @@ mod tests { LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; resolver.insert(link.domain.clone(), root.to_string()); - let mut service: DnsDiscoveryService = - DnsDiscoveryService::new(Arc::new(resolver), Default::default()); + let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default()); service.sync_tree_with_link(link.clone()); poll_fn(|cx| { - let _ = service.poll_next_unpin(cx); + let _ = service.poll(cx); Poll::Ready(()) }) .await; @@ -602,8 +468,7 @@ mod tests { resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64()); - let mut service: DnsDiscoveryService = - DnsDiscoveryService::new(Arc::new(resolver), Default::default()); + let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default()); let mut node_records = service.node_record_stream(); @@ -614,16 +479,16 @@ mod tests { service.sync_tree_with_link(link.clone()); - let event = poll_fn(|cx| service.poll_next_unpin(cx)).await; + let event = poll_fn(|cx| service.poll(cx)).await; - match event.unwrap() { + match event { DnsDiscoveryEvent::Enr(discovered) => { assert_eq!(discovered, enr); } } poll_fn(|cx| { - assert!(service.poll_next_unpin(cx).is_pending()); + assert!(service.poll(cx).is_pending()); Poll::Ready(()) }) .await; @@ -650,13 +515,12 @@ mod tests { LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; resolver.insert(link.domain.clone(), root.to_string()); - let mut service: DnsDiscoveryService = - DnsDiscoveryService::new(Arc::clone(&resolver), config.clone()); + let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone()); service.sync_tree_with_link(link.clone()); poll_fn(|cx| { - assert!(service.poll_next_unpin(cx).is_pending()); + assert!(service.poll(cx).is_pending()); Poll::Ready(()) }) .await; @@ -667,16 +531,16 @@ mod tests { let enr = Enr::empty(&secret_key).unwrap(); resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64()); - let event = poll_fn(|cx| service.poll_next_unpin(cx)).await; + let event = poll_fn(|cx| service.poll(cx)).await; - match event.unwrap() { + match event { DnsDiscoveryEvent::Enr(discovered) => { assert_eq!(discovered, enr); } } poll_fn(|cx| { - assert!(service.poll_next_unpin(cx).is_pending()); + assert!(service.poll(cx).is_pending()); Poll::Ready(()) }) .await; @@ -687,7 +551,7 @@ mod tests { async fn test_dns_resolver() { reth_tracing::init_test_tracing(); - let mut service: DnsDiscoveryService = DnsDiscoveryService::new( + let mut service = DnsDiscoveryService::new( Arc::new(DnsResolver::from_system_conf().unwrap()), Default::default(), ); diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 036b52e3ae60..56aa6c68d9f5 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -26,7 +26,7 @@ use tokio_stream::{wrappers::ReceiverStream, Stream}; /// Listens for new discovered nodes and emits events for discovered nodes and their /// address. #[derive(Debug)] -pub struct Discovery { +pub struct Discovery { /// All nodes discovered via discovery protocol. /// /// These nodes can be ephemeral and are updated via the discovery protocol. @@ -40,7 +40,7 @@ pub struct Discovery { /// The handle to the spawned discv4 service _discv4_service: Option>, /// Handler to interact with the DNS discovery service - _dns_discovery: Option>, + _dns_discovery: Option, /// Updates from the DNS discovery service. dns_discovery_updates: Option>, /// The handle to the spawned DNS discovery service From ab23ed2c672b9aba76c0d3d2226e08b543b3fd06 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 26 Mar 2024 23:26:43 +0100 Subject: [PATCH 05/17] Add enr field to dns update --- crates/net/dns/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/net/dns/src/lib.rs b/crates/net/dns/src/lib.rs index ceff42c25144..f1056d841d59 100644 --- a/crates/net/dns/src/lib.rs +++ b/crates/net/dns/src/lib.rs @@ -372,6 +372,8 @@ pub struct DnsNodeRecordUpdate { pub node_record: NodeRecord, /// The forkid of the node, if present in the ENR pub fork_id: Option, + /// Original [`Enr`]. + pub enr: Enr, } /// Commands sent from [DnsDiscoveryHandle] to [DnsDiscoveryService] @@ -403,7 +405,7 @@ fn convert_enr_node_record(enr: &Enr) -> Option let mut maybe_fork_id = enr.get(b"eth")?; let fork_id = ForkId::decode(&mut maybe_fork_id).ok(); - Some(DnsNodeRecordUpdate { node_record, fork_id }) + Some(DnsNodeRecordUpdate { node_record, fork_id, enr: enr.clone() }) } #[cfg(test)] From 6eb80d25d2c53f82faa9672eb4aee4f3bd315a33 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 15:54:49 +0100 Subject: [PATCH 06/17] Clean up test --- crates/primitives/src/net.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 948183966047..47ce3247a0f3 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -223,7 +223,7 @@ mod tests { .build(&key) .unwrap(); - let node_record_with_fork_id = NodeRecord::try_from(enr.clone()).unwrap(); + let node_record = NodeRecord::try_from(enr.clone()).unwrap(); assert_eq!( NodeRecord { @@ -232,7 +232,7 @@ mod tests { udp_port: UDP_PORT, id: pk_to_id(&enr.public_key()) }, - node_record_with_fork_id + node_record ) } } From b96fb427f775c90ff5b24e1289e5dff4ee3022db Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 16:09:29 +0100 Subject: [PATCH 07/17] Use ref to enr in conversion from node record --- crates/net/dns/src/lib.rs | 24 ++++++++++++++---------- crates/primitives/src/net.rs | 2 +- crates/rpc/rpc-types/src/net.rs | 14 +++++++------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/crates/net/dns/src/lib.rs b/crates/net/dns/src/lib.rs index f1056d841d59..6ad72369d3e0 100644 --- a/crates/net/dns/src/lib.rs +++ b/crates/net/dns/src/lib.rs @@ -241,7 +241,7 @@ impl DnsDiscoveryService { } fn on_resolved_enr(&mut self, enr: Enr) { - if let Some(record) = convert_enr_node_record(&enr) { + if let Some(record) = convert_enr_node_record(enr.clone()) { self.notify(record); } self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr)) @@ -391,21 +391,25 @@ pub enum DnsDiscoveryEvent { } /// Converts an [Enr] into a [NodeRecord] -fn convert_enr_node_record(enr: &Enr) -> Option { +fn convert_enr_node_record(enr: Enr) -> Option { use alloy_rlp::Decodable; - let node_record = NodeRecord { - address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?, - tcp_port: enr.tcp4().or_else(|| enr.tcp6())?, - udp_port: enr.udp4().or_else(|| enr.udp6())?, - id: PeerId::from_slice(&enr.public_key().serialize_uncompressed()[1..]), - } - .into_ipv4_mapped(); + let node_record = match NodeRecord::try_from(&enr) { + Ok(node_record) => node_record, + Err(err) => { + trace!(target: "disc::dns", + %err, + "can't convert enr to node_record" + ); + + return None + } + }; let mut maybe_fork_id = enr.get(b"eth")?; let fork_id = ForkId::decode(&mut maybe_fork_id).ok(); - Some(DnsNodeRecordUpdate { node_record, fork_id, enr: enr.clone() }) + Some(DnsNodeRecordUpdate { node_record, fork_id, enr }) } #[cfg(test)] diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 47ce3247a0f3..db6560a297ae 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -223,7 +223,7 @@ mod tests { .build(&key) .unwrap(); - let node_record = NodeRecord::try_from(enr.clone()).unwrap(); + let node_record = NodeRecord::try_from(&enr).unwrap(); assert_eq!( NodeRecord { diff --git a/crates/rpc/rpc-types/src/net.rs b/crates/rpc/rpc-types/src/net.rs index 99cb134bb2af..1d61a7b524d5 100644 --- a/crates/rpc/rpc-types/src/net.rs +++ b/crates/rpc/rpc-types/src/net.rs @@ -129,8 +129,8 @@ pub enum NodeRecordParseError { #[error("Failed to discport query: {0}")] Discport(ParseIntError), /// Conversion from type [`Enr`] failed. - #[error("failed to convert enr into dns node record, enr: {0}")] - ConversionFromEnrFailed(Enr), + #[error("failed to convert enr into node record")] + ConversionFromEnrFailed, } impl FromStr for NodeRecord { @@ -170,21 +170,21 @@ impl FromStr for NodeRecord { } } -impl TryFrom> for NodeRecord { +impl TryFrom<&Enr> for NodeRecord { type Error = NodeRecordParseError; - fn try_from(enr: Enr) -> Result { + fn try_from(enr: &Enr) -> Result { let Some(address) = enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from)) else { - return Err(NodeRecordParseError::ConversionFromEnrFailed(enr)) + return Err(NodeRecordParseError::ConversionFromEnrFailed) }; let Some(tcp_port) = enr.tcp4().or_else(|| enr.tcp6()) else { - return Err(NodeRecordParseError::ConversionFromEnrFailed(enr)) + return Err(NodeRecordParseError::ConversionFromEnrFailed) }; let Some(udp_port) = enr.udp4().or_else(|| enr.udp6()) else { - return Err(NodeRecordParseError::ConversionFromEnrFailed(enr)) + return Err(NodeRecordParseError::ConversionFromEnrFailed) }; let id = pk_to_id(&enr.public_key()); From c7828499204f3a45a9076248c709919f178f7fb5 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 16:33:18 +0100 Subject: [PATCH 08/17] Enable reuse of method to get fork id --- crates/net/dns/src/lib.rs | 25 +++++++++++++++++-------- crates/primitives/src/lib.rs | 4 ++-- crates/primitives/src/net.rs | 18 ++++++++++++++++++ crates/rpc/rpc-types/src/net.rs | 6 ++++++ 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/crates/net/dns/src/lib.rs b/crates/net/dns/src/lib.rs index 6ad72369d3e0..cf174ef76f3d 100644 --- a/crates/net/dns/src/lib.rs +++ b/crates/net/dns/src/lib.rs @@ -10,7 +10,6 @@ html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" )] -#![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] pub use crate::resolver::{DnsResolver, MapResolver, Resolver}; @@ -22,12 +21,11 @@ use crate::{ pub use config::DnsDiscoveryConfig; use enr::Enr; use error::ParseDnsEntryError; -use reth_primitives::{ForkId, NodeRecord, PeerId}; +use reth_primitives::{get_fork_id, ForkId, NodeRecord, NodeRecordParseError}; use schnellru::{ByLength, LruMap}; use secp256k1::SecretKey; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - net::IpAddr, pin::Pin, sync::Arc, task::{ready, Context, Poll}, @@ -392,22 +390,33 @@ pub enum DnsDiscoveryEvent { /// Converts an [Enr] into a [NodeRecord] fn convert_enr_node_record(enr: Enr) -> Option { - use alloy_rlp::Decodable; - let node_record = match NodeRecord::try_from(&enr) { Ok(node_record) => node_record, Err(err) => { trace!(target: "disc::dns", %err, - "can't convert enr to node_record" + "can't convert enr to dns update" ); return None } }; - let mut maybe_fork_id = enr.get(b"eth")?; - let fork_id = ForkId::decode(&mut maybe_fork_id).ok(); + let fork_id = match get_fork_id(&enr) { + Ok(fork_id) => Some(fork_id), + Err(err) => { + if matches!(err, NodeRecordParseError::EthForkIdMissing) { + trace!(target: "disc::dns", + %err, + "can't convert enr to dns update" + ); + + return None + } + + None // enr fork could be badly configured in node record, peer could still be useful + } + }; Some(DnsNodeRecordUpdate { node_record, fork_id, enr }) } diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index f98513278b20..aee1a092efa8 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -69,8 +69,8 @@ pub use header::{Header, HeaderValidationError, HeadersDirection, SealedHeader}; pub use integer_list::IntegerList; pub use log::{logs_bloom, Log}; pub use net::{ - goerli_nodes, holesky_nodes, mainnet_nodes, parse_nodes, pk_to_id, sepolia_nodes, NodeRecord, - NodeRecordParseError, GOERLI_BOOTNODES, HOLESKY_BOOTNODES, MAINNET_BOOTNODES, + get_fork_id, goerli_nodes, holesky_nodes, mainnet_nodes, parse_nodes, pk_to_id, sepolia_nodes, + NodeRecord, NodeRecordParseError, GOERLI_BOOTNODES, HOLESKY_BOOTNODES, MAINNET_BOOTNODES, SEPOLIA_BOOTNODES, }; pub use peer::{AnyNode, PeerId, WithPeerId}; diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index db6560a297ae..634bf1d23f25 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -1,5 +1,10 @@ +use alloy_rlp::Decodable; pub use reth_rpc_types::{pk_to_id, NodeRecord, NodeRecordParseError}; +use enr::Enr; + +use crate::ForkId; + // /// Ethereum Foundation Go Bootnodes @@ -66,6 +71,19 @@ pub fn parse_nodes(nodes: impl IntoIterator>) -> Vec) -> Result { + let Some(mut maybe_fork_id) = enr.get(b"eth") else { + return Err(NodeRecordParseError::EthForkIdMissing) + }; + + let Ok(fork_id) = ForkId::decode(&mut maybe_fork_id) else { + return Err(NodeRecordParseError::ForkIdDecodeError(maybe_fork_id.to_vec())) + }; + + Ok(fork_id) +} + #[cfg(test)] mod tests { use std::{ diff --git a/crates/rpc/rpc-types/src/net.rs b/crates/rpc/rpc-types/src/net.rs index 1d61a7b524d5..e46a8ddf19f3 100644 --- a/crates/rpc/rpc-types/src/net.rs +++ b/crates/rpc/rpc-types/src/net.rs @@ -131,6 +131,12 @@ pub enum NodeRecordParseError { /// Conversion from type [`Enr`] failed. #[error("failed to convert enr into node record")] ConversionFromEnrFailed, + /// Missing key used to identify an execution layer enr on Ethereum network. + #[error("fork id missing on enr, 'eth' key missing")] + EthForkIdMissing, + /// Failed to decode fork ID rlp value. + #[error("failed to decode fork id, 'eth': {0:?}")] + ForkIdDecodeError(Vec), } impl FromStr for NodeRecord { From 918b2af6dc90b53694db0e89bae908ebfc12c657 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 16:57:42 +0100 Subject: [PATCH 09/17] Use invalid url error for missing ip and ports --- crates/rpc/rpc-types/src/net.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/rpc/rpc-types/src/net.rs b/crates/rpc/rpc-types/src/net.rs index e46a8ddf19f3..341561b0f8b9 100644 --- a/crates/rpc/rpc-types/src/net.rs +++ b/crates/rpc/rpc-types/src/net.rs @@ -128,9 +128,6 @@ pub enum NodeRecordParseError { /// Invalid discport #[error("Failed to discport query: {0}")] Discport(ParseIntError), - /// Conversion from type [`Enr`] failed. - #[error("failed to convert enr into node record")] - ConversionFromEnrFailed, /// Missing key used to identify an execution layer enr on Ethereum network. #[error("fork id missing on enr, 'eth' key missing")] EthForkIdMissing, @@ -182,15 +179,15 @@ impl TryFrom<&Enr> for NodeRecord { fn try_from(enr: &Enr) -> Result { let Some(address) = enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from)) else { - return Err(NodeRecordParseError::ConversionFromEnrFailed) + return Err(NodeRecordParseError::InvalidUrl("ip missing".to_string())) }; let Some(tcp_port) = enr.tcp4().or_else(|| enr.tcp6()) else { - return Err(NodeRecordParseError::ConversionFromEnrFailed) + return Err(NodeRecordParseError::InvalidUrl("tcp port missing".to_string())) }; let Some(udp_port) = enr.udp4().or_else(|| enr.udp6()) else { - return Err(NodeRecordParseError::ConversionFromEnrFailed) + return Err(NodeRecordParseError::InvalidUrl("udp port missing".to_string())) }; let id = pk_to_id(&enr.public_key()); From 6c89757b3e8d00f35b91607b87c16958170e7ffe Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 16:58:21 +0100 Subject: [PATCH 10/17] Make udp port missing error have precedence over tcp port missing error --- crates/rpc/rpc-types/src/net.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/rpc/rpc-types/src/net.rs b/crates/rpc/rpc-types/src/net.rs index 341561b0f8b9..de40e66c902d 100644 --- a/crates/rpc/rpc-types/src/net.rs +++ b/crates/rpc/rpc-types/src/net.rs @@ -182,14 +182,14 @@ impl TryFrom<&Enr> for NodeRecord { return Err(NodeRecordParseError::InvalidUrl("ip missing".to_string())) }; - let Some(tcp_port) = enr.tcp4().or_else(|| enr.tcp6()) else { - return Err(NodeRecordParseError::InvalidUrl("tcp port missing".to_string())) - }; - let Some(udp_port) = enr.udp4().or_else(|| enr.udp6()) else { return Err(NodeRecordParseError::InvalidUrl("udp port missing".to_string())) }; + let Some(tcp_port) = enr.tcp4().or_else(|| enr.tcp6()) else { + return Err(NodeRecordParseError::InvalidUrl("tcp port missing".to_string())) + }; + let id = pk_to_id(&enr.public_key()); Ok(NodeRecord { address, tcp_port, udp_port, id }.into_ipv4_mapped()) From 84542fd5efbae0cf9b8babbcac1f555506909eee Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 17:02:02 +0100 Subject: [PATCH 11/17] Remove use of TryFrom impl for node record and of func to get fork id --- crates/net/dns/src/lib.rs | 47 ++++++++++++--------------------- crates/primitives/src/net.rs | 13 --------- crates/rpc/rpc-types/src/net.rs | 6 ----- 3 files changed, 17 insertions(+), 49 deletions(-) diff --git a/crates/net/dns/src/lib.rs b/crates/net/dns/src/lib.rs index cf174ef76f3d..f1056d841d59 100644 --- a/crates/net/dns/src/lib.rs +++ b/crates/net/dns/src/lib.rs @@ -10,6 +10,7 @@ html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" )] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] pub use crate::resolver::{DnsResolver, MapResolver, Resolver}; @@ -21,11 +22,12 @@ use crate::{ pub use config::DnsDiscoveryConfig; use enr::Enr; use error::ParseDnsEntryError; -use reth_primitives::{get_fork_id, ForkId, NodeRecord, NodeRecordParseError}; +use reth_primitives::{ForkId, NodeRecord, PeerId}; use schnellru::{ByLength, LruMap}; use secp256k1::SecretKey; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, + net::IpAddr, pin::Pin, sync::Arc, task::{ready, Context, Poll}, @@ -239,7 +241,7 @@ impl DnsDiscoveryService { } fn on_resolved_enr(&mut self, enr: Enr) { - if let Some(record) = convert_enr_node_record(enr.clone()) { + if let Some(record) = convert_enr_node_record(&enr) { self.notify(record); } self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr)) @@ -389,36 +391,21 @@ pub enum DnsDiscoveryEvent { } /// Converts an [Enr] into a [NodeRecord] -fn convert_enr_node_record(enr: Enr) -> Option { - let node_record = match NodeRecord::try_from(&enr) { - Ok(node_record) => node_record, - Err(err) => { - trace!(target: "disc::dns", - %err, - "can't convert enr to dns update" - ); - - return None - } - }; - - let fork_id = match get_fork_id(&enr) { - Ok(fork_id) => Some(fork_id), - Err(err) => { - if matches!(err, NodeRecordParseError::EthForkIdMissing) { - trace!(target: "disc::dns", - %err, - "can't convert enr to dns update" - ); - - return None - } +fn convert_enr_node_record(enr: &Enr) -> Option { + use alloy_rlp::Decodable; + + let node_record = NodeRecord { + address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?, + tcp_port: enr.tcp4().or_else(|| enr.tcp6())?, + udp_port: enr.udp4().or_else(|| enr.udp6())?, + id: PeerId::from_slice(&enr.public_key().serialize_uncompressed()[1..]), + } + .into_ipv4_mapped(); - None // enr fork could be badly configured in node record, peer could still be useful - } - }; + let mut maybe_fork_id = enr.get(b"eth")?; + let fork_id = ForkId::decode(&mut maybe_fork_id).ok(); - Some(DnsNodeRecordUpdate { node_record, fork_id, enr }) + Some(DnsNodeRecordUpdate { node_record, fork_id, enr: enr.clone() }) } #[cfg(test)] diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 634bf1d23f25..3b41379ca7a4 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -71,19 +71,6 @@ pub fn parse_nodes(nodes: impl IntoIterator>) -> Vec) -> Result { - let Some(mut maybe_fork_id) = enr.get(b"eth") else { - return Err(NodeRecordParseError::EthForkIdMissing) - }; - - let Ok(fork_id) = ForkId::decode(&mut maybe_fork_id) else { - return Err(NodeRecordParseError::ForkIdDecodeError(maybe_fork_id.to_vec())) - }; - - Ok(fork_id) -} - #[cfg(test)] mod tests { use std::{ diff --git a/crates/rpc/rpc-types/src/net.rs b/crates/rpc/rpc-types/src/net.rs index de40e66c902d..c76e60a86b0e 100644 --- a/crates/rpc/rpc-types/src/net.rs +++ b/crates/rpc/rpc-types/src/net.rs @@ -128,12 +128,6 @@ pub enum NodeRecordParseError { /// Invalid discport #[error("Failed to discport query: {0}")] Discport(ParseIntError), - /// Missing key used to identify an execution layer enr on Ethereum network. - #[error("fork id missing on enr, 'eth' key missing")] - EthForkIdMissing, - /// Failed to decode fork ID rlp value. - #[error("failed to decode fork id, 'eth': {0:?}")] - ForkIdDecodeError(Vec), } impl FromStr for NodeRecord { From cf3fc8792d89fb032def6060ede9acf8ecf56c8f Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 17:06:07 +0100 Subject: [PATCH 12/17] fixup! Remove use of TryFrom impl for node record and of func to get fork id --- crates/primitives/src/net.rs | 13 +++++++++++++ crates/rpc/rpc-types/src/net.rs | 6 ++++++ 2 files changed, 19 insertions(+) diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 3b41379ca7a4..634bf1d23f25 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -71,6 +71,19 @@ pub fn parse_nodes(nodes: impl IntoIterator>) -> Vec) -> Result { + let Some(mut maybe_fork_id) = enr.get(b"eth") else { + return Err(NodeRecordParseError::EthForkIdMissing) + }; + + let Ok(fork_id) = ForkId::decode(&mut maybe_fork_id) else { + return Err(NodeRecordParseError::ForkIdDecodeError(maybe_fork_id.to_vec())) + }; + + Ok(fork_id) +} + #[cfg(test)] mod tests { use std::{ diff --git a/crates/rpc/rpc-types/src/net.rs b/crates/rpc/rpc-types/src/net.rs index c76e60a86b0e..de40e66c902d 100644 --- a/crates/rpc/rpc-types/src/net.rs +++ b/crates/rpc/rpc-types/src/net.rs @@ -128,6 +128,12 @@ pub enum NodeRecordParseError { /// Invalid discport #[error("Failed to discport query: {0}")] Discport(ParseIntError), + /// Missing key used to identify an execution layer enr on Ethereum network. + #[error("fork id missing on enr, 'eth' key missing")] + EthForkIdMissing, + /// Failed to decode fork ID rlp value. + #[error("failed to decode fork id, 'eth': {0:?}")] + ForkIdDecodeError(Vec), } impl FromStr for NodeRecord { From 420c8823148c9260380492ad572d2b60cc9f512f Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 17:15:20 +0100 Subject: [PATCH 13/17] Fix lint --- crates/primitives/src/net.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 634bf1d23f25..55ac239df559 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -94,8 +94,7 @@ mod tests { use crate::MAINNET; use super::*; - use alloy_rlp::{Decodable, Encodable}; - use enr::Enr; + use alloy_rlp::Encodable; use rand::{thread_rng, Rng, RngCore}; use reth_ethereum_forks::Hardfork; use reth_rpc_types::PeerId; From dfcf0db7621739adf631549d5345796973dce3a4 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 17:16:28 +0100 Subject: [PATCH 14/17] Remove whitespace Co-authored-by: Matthias Seitz --- crates/primitives/src/net.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 55ac239df559..018bf29253a6 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -2,7 +2,6 @@ use alloy_rlp::Decodable; pub use reth_rpc_types::{pk_to_id, NodeRecord, NodeRecordParseError}; use enr::Enr; - use crate::ForkId; // From 7569d36c1cd1ce3f811fe48cefbef1adc269eca4 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 17:20:43 +0100 Subject: [PATCH 15/17] Move fork id errors to own error enum --- crates/primitives/src/lib.rs | 4 ++-- crates/primitives/src/net.rs | 22 +++++++++++++++++----- crates/rpc/rpc-types/src/net.rs | 6 ------ 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index aee1a092efa8..8284aaea1f17 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -70,8 +70,8 @@ pub use integer_list::IntegerList; pub use log::{logs_bloom, Log}; pub use net::{ get_fork_id, goerli_nodes, holesky_nodes, mainnet_nodes, parse_nodes, pk_to_id, sepolia_nodes, - NodeRecord, NodeRecordParseError, GOERLI_BOOTNODES, HOLESKY_BOOTNODES, MAINNET_BOOTNODES, - SEPOLIA_BOOTNODES, + NetworkIdError, NodeRecord, NodeRecordParseError, GOERLI_BOOTNODES, HOLESKY_BOOTNODES, + MAINNET_BOOTNODES, SEPOLIA_BOOTNODES, }; pub use peer::{AnyNode, PeerId, WithPeerId}; pub use prune::{ diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 55ac239df559..29caa0c550c8 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -1,7 +1,7 @@ use alloy_rlp::Decodable; -pub use reth_rpc_types::{pk_to_id, NodeRecord, NodeRecordParseError}; - use enr::Enr; +pub use reth_rpc_types::{pk_to_id, NodeRecord, NodeRecordParseError}; +use thiserror::Error; use crate::ForkId; @@ -71,14 +71,26 @@ pub fn parse_nodes(nodes: impl IntoIterator>) -> Vec), +} + /// Tries to read the [`ForkId`] from given [`Enr`]. -pub fn get_fork_id(enr: &Enr) -> Result { +pub fn get_fork_id(enr: &Enr) -> Result { let Some(mut maybe_fork_id) = enr.get(b"eth") else { - return Err(NodeRecordParseError::EthForkIdMissing) + return Err(NetworkIdError::EthForkIdMissing) }; let Ok(fork_id) = ForkId::decode(&mut maybe_fork_id) else { - return Err(NodeRecordParseError::ForkIdDecodeError(maybe_fork_id.to_vec())) + return Err(NetworkIdError::ForkIdDecodeError(maybe_fork_id.to_vec())) }; Ok(fork_id) diff --git a/crates/rpc/rpc-types/src/net.rs b/crates/rpc/rpc-types/src/net.rs index de40e66c902d..c76e60a86b0e 100644 --- a/crates/rpc/rpc-types/src/net.rs +++ b/crates/rpc/rpc-types/src/net.rs @@ -128,12 +128,6 @@ pub enum NodeRecordParseError { /// Invalid discport #[error("Failed to discport query: {0}")] Discport(ParseIntError), - /// Missing key used to identify an execution layer enr on Ethereum network. - #[error("fork id missing on enr, 'eth' key missing")] - EthForkIdMissing, - /// Failed to decode fork ID rlp value. - #[error("failed to decode fork id, 'eth': {0:?}")] - ForkIdDecodeError(Vec), } impl FromStr for NodeRecord { From 49d8b68b08028e459bffd1be71c68dc6f1c1972c Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 20:37:34 +0100 Subject: [PATCH 16/17] Remove generalisation of conversion from enr to node record --- crates/primitives/src/lib.rs | 5 +-- crates/primitives/src/net.rs | 71 +-------------------------------- crates/rpc/rpc-types/src/net.rs | 31 ++------------ 3 files changed, 7 insertions(+), 100 deletions(-) diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 8284aaea1f17..8d9793a7896d 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -69,9 +69,8 @@ pub use header::{Header, HeaderValidationError, HeadersDirection, SealedHeader}; pub use integer_list::IntegerList; pub use log::{logs_bloom, Log}; pub use net::{ - get_fork_id, goerli_nodes, holesky_nodes, mainnet_nodes, parse_nodes, pk_to_id, sepolia_nodes, - NetworkIdError, NodeRecord, NodeRecordParseError, GOERLI_BOOTNODES, HOLESKY_BOOTNODES, - MAINNET_BOOTNODES, SEPOLIA_BOOTNODES, + goerli_nodes, holesky_nodes, mainnet_nodes, parse_nodes, sepolia_nodes, NodeRecord, + GOERLI_BOOTNODES, HOLESKY_BOOTNODES, MAINNET_BOOTNODES, SEPOLIA_BOOTNODES, }; pub use peer::{AnyNode, PeerId, WithPeerId}; pub use prune::{ diff --git a/crates/primitives/src/net.rs b/crates/primitives/src/net.rs index 29caa0c550c8..7d122f71a18f 100644 --- a/crates/primitives/src/net.rs +++ b/crates/primitives/src/net.rs @@ -1,9 +1,4 @@ -use alloy_rlp::Decodable; -use enr::Enr; -pub use reth_rpc_types::{pk_to_id, NodeRecord, NodeRecordParseError}; -use thiserror::Error; - -use crate::ForkId; +pub use reth_rpc_types::NodeRecord; // @@ -71,31 +66,6 @@ pub fn parse_nodes(nodes: impl IntoIterator>) -> Vec), -} - -/// Tries to read the [`ForkId`] from given [`Enr`]. -pub fn get_fork_id(enr: &Enr) -> Result { - let Some(mut maybe_fork_id) = enr.get(b"eth") else { - return Err(NetworkIdError::EthForkIdMissing) - }; - - let Ok(fork_id) = ForkId::decode(&mut maybe_fork_id) else { - return Err(NetworkIdError::ForkIdDecodeError(maybe_fork_id.to_vec())) - }; - - Ok(fork_id) -} - #[cfg(test)] mod tests { use std::{ @@ -103,14 +73,10 @@ mod tests { str::FromStr, }; - use crate::MAINNET; - use super::*; - use alloy_rlp::Encodable; + use alloy_rlp::Decodable; use rand::{thread_rng, Rng, RngCore}; - use reth_ethereum_forks::Hardfork; use reth_rpc_types::PeerId; - use secp256k1::SecretKey; #[test] fn test_mapped_ipv6() { @@ -231,37 +197,4 @@ mod tests { id: PeerId::from_str("6f8a80d14311c39f35f516fa664deaaaa13e85b2f7493f37f6144d86991ec012937307647bd3b9a82abe2974e1407241d54947bbb39763a4cac9f77166ad92a0").unwrap(), }) } - - #[test] - fn conversion_to_node_record_from_enr() { - const IP: &str = "::"; - const TCP_PORT: u16 = 30303; - const UDP_PORT: u16 = 9000; - - let mut rng = thread_rng(); - let key = SecretKey::new(&mut rng); - - let mut buf = Vec::new(); - let fork_id = MAINNET.hardfork_fork_id(Hardfork::Frontier); - fork_id.unwrap().encode(&mut buf); - - let enr = Enr::builder() - .ip6(IP.parse().unwrap()) - .udp6(UDP_PORT) - .tcp6(TCP_PORT) - .build(&key) - .unwrap(); - - let node_record = NodeRecord::try_from(&enr).unwrap(); - - assert_eq!( - NodeRecord { - address: IP.parse().unwrap(), - tcp_port: TCP_PORT, - udp_port: UDP_PORT, - id: pk_to_id(&enr.public_key()) - }, - node_record - ) - } } diff --git a/crates/rpc/rpc-types/src/net.rs b/crates/rpc/rpc-types/src/net.rs index c76e60a86b0e..885920162075 100644 --- a/crates/rpc/rpc-types/src/net.rs +++ b/crates/rpc/rpc-types/src/net.rs @@ -1,6 +1,5 @@ -use crate::{pk_to_id, PeerId}; +use crate::PeerId; use alloy_rlp::{RlpDecodable, RlpEncodable}; -use enr::Enr; use secp256k1::{SecretKey, SECP256K1}; use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::{ @@ -10,7 +9,6 @@ use std::{ num::ParseIntError, str::FromStr, }; -use thiserror::Error; use url::{Host, Url}; /// Represents a ENR in discovery. @@ -116,8 +114,8 @@ impl fmt::Display for NodeRecord { } } -/// Possible error types when parsing a [`NodeRecord`] -#[derive(Debug, Error)] +/// Possible error types when parsing a `NodeRecord` +#[derive(Debug, thiserror::Error)] pub enum NodeRecordParseError { /// Invalid url #[error("Failed to parse url: {0}")] @@ -167,29 +165,6 @@ impl FromStr for NodeRecord { } } -impl TryFrom<&Enr> for NodeRecord { - type Error = NodeRecordParseError; - - fn try_from(enr: &Enr) -> Result { - let Some(address) = enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from)) - else { - return Err(NodeRecordParseError::InvalidUrl("ip missing".to_string())) - }; - - let Some(udp_port) = enr.udp4().or_else(|| enr.udp6()) else { - return Err(NodeRecordParseError::InvalidUrl("udp port missing".to_string())) - }; - - let Some(tcp_port) = enr.tcp4().or_else(|| enr.tcp6()) else { - return Err(NodeRecordParseError::InvalidUrl("tcp port missing".to_string())) - }; - - let id = pk_to_id(&enr.public_key()); - - Ok(NodeRecord { address, tcp_port, udp_port, id }.into_ipv4_mapped()) - } -} - #[cfg(test)] mod tests { use super::*; From 0e4693c8deffc3716d88eb06f77c031acabe5069 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 27 Mar 2024 20:42:28 +0100 Subject: [PATCH 17/17] fixup! Remove generalisation of conversion from enr to node record --- crates/rpc/rpc-types/src/peer.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/crates/rpc/rpc-types/src/peer.rs b/crates/rpc/rpc-types/src/peer.rs index 44dbe5d71f24..a07e61d00285 100644 --- a/crates/rpc/rpc-types/src/peer.rs +++ b/crates/rpc/rpc-types/src/peer.rs @@ -2,8 +2,3 @@ use alloy_primitives::B512; /// Alias for a peer identifier pub type PeerId = B512; - -/// Converts a [`secp256k1::PublicKey`] to a [`PeerId`]. -pub fn pk_to_id(pk: &secp256k1::PublicKey) -> PeerId { - PeerId::from_slice(&pk.serialize_uncompressed()[1..]) -}