diff --git a/nomos-libp2p/Cargo.toml b/nomos-libp2p/Cargo.toml index 803b2b686..28fc66d5e 100644 --- a/nomos-libp2p/Cargo.toml +++ b/nomos-libp2p/Cargo.toml @@ -24,8 +24,12 @@ hex = "0.4.3" log = "0.4.19" thiserror = "1.0.40" tracing = "0.1" +prometheus-client = { version = "0.21.0", optional = true } [dev-dependencies] env_logger = "0.10.0" serde_json = "1.0.99" tokio = { version = "1", features = ["time"] } + +[features] +metrics = ["libp2p/metrics", "prometheus-client"] diff --git a/nomos-libp2p/src/lib.rs b/nomos-libp2p/src/lib.rs index fe0d04325..a4353337c 100644 --- a/nomos-libp2p/src/lib.rs +++ b/nomos-libp2p/src/lib.rs @@ -10,6 +10,8 @@ use blake2::digest::{consts::U32, Digest}; use blake2::Blake2b; use libp2p::gossipsub::{Message, MessageId, TopicHash}; +#[cfg(feature = "metrics")] +use libp2p::metrics::Metrics; pub use libp2p::{ core::upgrade, dns, @@ -23,12 +25,16 @@ pub use libp2p::{ }; use libp2p::{swarm::ConnectionId, tcp::tokio::Tcp}; pub use multiaddr::{multiaddr, Multiaddr, Protocol}; +#[cfg(feature = "metrics")] +use prometheus_client::registry::Registry; use serde::{Deserialize, Serialize}; /// Wraps [`libp2p::Swarm`], and config it for use within Nomos. pub struct Swarm { // A core libp2p swarm swarm: libp2p::Swarm, + #[cfg(feature = "metrics")] + metrics: Metrics, } #[derive(NetworkBehaviour)] @@ -108,7 +114,19 @@ impl Swarm { swarm.listen_on(multiaddr!(Ip4(config.host), Tcp(config.port)))?; - Ok(Swarm { swarm }) + #[cfg(feature = "metrics")] + let metrics = { + let mut metric_registry = Registry::default(); + Metrics::new(&mut metric_registry) + //TODO: use `metric_registry` to expose metrics to external + // https://github.com/libp2p/rust-libp2p/blob/c1551d71b6a7ad673706d7106c797fe464cb2820/examples/metrics/src/main.rs#L67-L67 + }; + + Ok(Swarm { + swarm, + #[cfg(feature = "metrics")] + metrics, + }) } /// Initiates a connection attempt to a peer @@ -157,6 +175,11 @@ impl Swarm { &self.swarm } + #[cfg(feature = "metrics")] + pub fn metrics(&self) -> &Metrics { + &self.metrics + } + pub fn is_subscribed(&mut self, topic: &str) -> bool { let topic_hash = Self::topic_hash(topic); diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index 441881568..6a872e090 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -37,4 +37,5 @@ tokio = { version = "1", features = ["full"] } default = [] waku = ["waku-bindings"] libp2p = ["nomos-libp2p", "rand", "humantime-serde"] +metrics = ["nomos-libp2p/metrics"] mock = ["rand", "chrono"] diff --git a/nomos-services/network/src/backends/libp2p/swarm.rs b/nomos-services/network/src/backends/libp2p/swarm.rs index f82ec4902..d77dd3d3a 100644 --- a/nomos-services/network/src/backends/libp2p/swarm.rs +++ b/nomos-services/network/src/backends/libp2p/swarm.rs @@ -1,6 +1,8 @@ use std::{collections::HashMap, ops::Range, time::Duration}; use mixnet_client::MixnetClient; +#[cfg(feature = "metrics")] +use nomos_libp2p::libp2p::metrics::Recorder; use nomos_libp2p::{ gossipsub::{self, Message}, libp2p::swarm::ConnectionId, @@ -90,14 +92,41 @@ impl SwarmHandler { fn handle_event(&mut self, event: SwarmEvent>) { match event { - SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Message { - propagation_source: peer_id, - message_id: id, + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(event)) => { + self.handle_gossipsub_event(event); + } + swarm_event => { + self.handle_swarm_event(swarm_event); + } + } + } + + fn handle_gossipsub_event(&mut self, event: gossipsub::Event) { + #[cfg(feature = "metrics")] + self.swarm.metrics().record(&event); + + match event { + gossipsub::Event::Message { + propagation_source, + message_id, message, - })) => { - tracing::debug!("Got message with id: {id} from peer: {peer_id}"); + } => { + tracing::debug!( + "Got message with id: {message_id} propagated from {propagation_source}" + ); log_error!(self.events_tx.send(Event::Message(message))); } + event => { + tracing::debug!("gossipsub event: {event:?}"); + } + } + } + + fn handle_swarm_event(&mut self, event: SwarmEvent>) { + #[cfg(feature = "metrics")] + self.swarm.metrics().record(&event); + + match event { SwarmEvent::ConnectionEstablished { peer_id, connection_id, @@ -130,7 +159,9 @@ impl SwarmHandler { ); self.retry_connect(connection_id); } - _ => {} + event => { + tracing::debug!("swarm event: {event:?}"); + } } }