Skip to content

Commit

Permalink
feat: add ipfs metrics
Browse files Browse the repository at this point in the history
Now we can separately see calls counts and durations to Ipfs operations
directly. Instead of only the toplevel Kubo RPC API requests.
  • Loading branch information
nathanielc committed Nov 2, 2023
1 parent 8fa6838 commit 8e96770
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 10 deletions.
8 changes: 7 additions & 1 deletion kubo-rpc/src/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ struct RequestLabels {
path: &'static str,
}

impl From<&Event> for RequestLabels {
fn from(value: &Event) -> Self {
Self { path: value.path }
}
}

/// Metrics for Kubo RPC API
#[derive(Clone)]
pub struct Metrics {
Expand Down Expand Up @@ -70,7 +76,7 @@ pub struct Event {

impl Recorder<Event> for Metrics {
fn record(&self, event: &Event) {
let labels = RequestLabels { path: event.path };
let labels: RequestLabels = event.into();
self.requests.get_or_create(&labels).inc();
self.request_durations
.get_or_create(&labels)
Expand Down
161 changes: 161 additions & 0 deletions kubo-rpc/src/ipfs_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use std::{collections::HashMap, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use ceramic_metadata::Version;
use ceramic_metrics::Recorder;
use cid::Cid;
use futures_util::{stream::BoxStream, Future};
use iroh_rpc_types::GossipsubEvent;
use libipld::Ipld;
use libp2p_identity::PeerId;
use multiaddr::Multiaddr;
use prometheus_client::{
encoding::EncodeLabelSet,
metrics::{
counter::Counter,
family::Family,
histogram::{exponential_buckets, Histogram},
},
registry::Registry,
};
use tokio::time::Instant;

use crate::{error::Error, IpfsDep, IpfsPath, PeerInfo};

#[derive(Clone)]
pub struct IpfsMetrics {
calls: Family<IpfsCallLabels, Counter>,
call_durations: Family<IpfsCallLabels, Histogram>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct IpfsCallLabels {
method: &'static str,
}
impl From<&IpfsCallEvent> for IpfsCallLabels {
fn from(value: &IpfsCallEvent) -> Self {
Self {
method: value.method,
}
}
}

impl IpfsMetrics {
/// Register and construct Metrics
pub fn register(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("ipfs");

let calls = Family::<IpfsCallLabels, Counter>::default();
sub_registry.register("calls", "Number of API calls", calls.clone());

let call_durations = Family::<IpfsCallLabels, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(0.005, 2.0, 20))
});
sub_registry.register(
"call_durations",
"Duration of API calls",
call_durations.clone(),
);

Self {
calls,
call_durations,
}
}
}

struct IpfsCallEvent {
method: &'static str,
duration: Duration,
}

impl Recorder<IpfsCallEvent> for IpfsMetrics {
fn record(&self, event: &IpfsCallEvent) {
let labels: IpfsCallLabels = event.into();
self.calls.get_or_create(&labels).inc();
self.call_durations
.get_or_create(&labels)
.observe(event.duration.as_secs_f64());
}
}

/// Implements IpfsDep and records metrics about each call.
#[derive(Clone)]
pub struct IpfsMetricsMiddleware<I: Clone> {
ipfs: I,
metrics: IpfsMetrics,
}

impl<I: Clone> IpfsMetricsMiddleware<I> {
/// Construct a new IpfsMetricsMiddleware.
/// The metrics should have already be registered.
pub fn new(ipfs: I, metrics: IpfsMetrics) -> Self {
Self { ipfs, metrics }
}
// Record metrics for a given API endpoint
async fn record<T>(&self, method: &'static str, fut: impl Future<Output = T>) -> T {
let start = Instant::now();
let ret = fut.await;
let duration = start.elapsed();
let event = IpfsCallEvent { method, duration };
self.metrics.record(&event);
ret
}
}

#[async_trait]
impl<I> IpfsDep for IpfsMetricsMiddleware<I>
where
I: IpfsDep,
I: Clone + Send + Sync,
{
async fn lookup_local(&self) -> Result<PeerInfo, Error> {
self.record("lookup_local", self.ipfs.lookup_local()).await
}
async fn lookup(&self, peer_id: PeerId) -> Result<PeerInfo, Error> {
self.record("lookup", self.ipfs.lookup(peer_id)).await
}
async fn block_size(&self, cid: Cid) -> Result<u64, Error> {
self.record("block_size", self.ipfs.block_size(cid)).await
}
async fn block_get(&self, cid: Cid, offline: bool) -> Result<Bytes, Error> {
self.record("block_get", self.ipfs.block_get(cid, offline))
.await
}
async fn get(&self, ipfs_path: &IpfsPath) -> Result<(Cid, Ipld), Error> {
self.record("get", self.ipfs.get(ipfs_path)).await
}
async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<(), Error> {
self.record("put", self.ipfs.put(cid, blob, links)).await
}
async fn resolve(&self, ipfs_path: &IpfsPath) -> Result<(Cid, String), Error> {
self.record("resolve", self.ipfs.resolve(ipfs_path)).await
}
async fn peers(&self) -> Result<HashMap<PeerId, Vec<Multiaddr>>, Error> {
self.record("peers", self.ipfs.peers()).await
}
async fn connect(&self, peer_id: PeerId, addrs: Vec<Multiaddr>) -> Result<(), Error> {
self.record("connect", self.ipfs.connect(peer_id, addrs))
.await
}
async fn publish(&self, topic: String, data: Bytes) -> Result<(), Error> {
self.record("publish", self.ipfs.publish(topic, data)).await
}
async fn subscribe(
&self,
topic: String,
) -> Result<BoxStream<'static, anyhow::Result<GossipsubEvent>>, Error> {
self.record("subscribe", self.ipfs.subscribe(topic)).await
}
async fn unsubscribe(&self, topic: String) -> Result<(), Error> {
self.record("unsubscribe", self.ipfs.unsubscribe(topic))
.await
}
async fn topics(&self) -> Result<Vec<String>, Error> {
self.record("topics", self.ipfs.topics()).await
}
async fn version(&self) -> Result<Version, Error> {
self.record("version", self.ipfs.version()).await
}
}
4 changes: 4 additions & 0 deletions kubo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ pub use libipld::Ipld;
pub use libp2p::Multiaddr;
pub use libp2p_identity::PeerId;

// TODO(WS1-1310): Refactor Ipfs out of KuboRpc so we do not have these prefixed types.
pub use ipfs_metrics::{IpfsMetrics, IpfsMetricsMiddleware};

pub mod block;
pub mod dag;
pub mod error;
#[cfg(feature = "http")]
pub mod http;
pub mod id;
mod ipfs_metrics;
pub mod pin;
pub mod pubsub;
pub mod swarm;
Expand Down
4 changes: 3 additions & 1 deletion one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,12 @@ impl Daemon {
} else {
None
};
let ipfs_metrics =
ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::IpfsMetrics::register);
let ipfs = Ipfs::builder()
.with_p2p(p2p_config, keypair, recons, sql_pool.clone())
.await?
.build(sql_pool.clone())
.build(sql_pool.clone(), ipfs_metrics)
.await?;

Ok(Daemon {
Expand Down
18 changes: 10 additions & 8 deletions one/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use anyhow::Result;
use ceramic_core::{EventId, Interest};
use ceramic_kubo_rpc::IpfsService;
use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService};
use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, SQLiteBlockStore};
use iroh_rpc_client::P2pClient;
use iroh_rpc_types::{p2p::P2pAddr, Addr};
Expand Down Expand Up @@ -69,28 +69,30 @@ impl Builder<Init> {

/// Finish the build
impl Builder<WithP2p> {
pub async fn build(self, sql_pool: SqlitePool) -> Result<Ipfs> {
pub async fn build(self, sql_pool: SqlitePool, ipfs_metrics: IpfsMetrics) -> Result<Ipfs> {
let ipfs_service = Arc::new(IpfsService::new(
P2pClient::new(self.state.p2p.addr.clone()).await?,
SQLiteBlockStore::new(sql_pool).await?,
));
let ipfs_service = IpfsMetricsMiddleware::new(ipfs_service, ipfs_metrics);
Ok(Ipfs {
api: Arc::new(IpfsService::new(
P2pClient::new(self.state.p2p.addr.clone()).await?,
SQLiteBlockStore::new(sql_pool).await?,
)),
api: ipfs_service,
p2p: self.state.p2p,
})
}
}

// Provides Ipfs node implementation
pub struct Ipfs {
api: Arc<IpfsService>,
api: IpfsMetricsMiddleware<Arc<IpfsService>>,
p2p: Service<P2pAddr>,
}

impl Ipfs {
pub fn builder() -> Builder<Init> {
Builder { state: Init {} }
}
pub fn api(&self) -> Arc<IpfsService> {
pub fn api(&self) -> IpfsMetricsMiddleware<Arc<IpfsService>> {
self.api.clone()
}
pub async fn stop(self) -> Result<()> {
Expand Down

0 comments on commit 8e96770

Please sign in to comment.