Skip to content

Commit

Permalink
Metrics for prometheus (#105)
Browse files Browse the repository at this point in the history
* add metrics for prometheus

* metrics: rename metrics

* metrics: add lumen_queried_total
  • Loading branch information
naim94a authored Aug 19, 2023
1 parent 2d41a92 commit 8f6fd97
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 12 deletions.
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tokio-postgres = {version = "0.7", default-features = false, optional = true}
diesel = {version = "2.1", optional = true, default-features = false, features = ["postgres_backend", "time"]}
diesel-async = {version = "0.3", optional = true, features = ["postgres", "bb8"]}
anyhow = "1.0"
prometheus-client = "0.21.2"

[features]
default = ["web", "db"]
Expand Down
2 changes: 2 additions & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ pub mod md;
pub mod rpc;
pub mod web;
pub mod async_drop;
pub mod metrics;

pub struct SharedState_ {
pub db: db::Database,
pub config: std::sync::Arc<config::Config>,
pub server_name: String,
pub metrics: metrics::Metrics,
}

pub type SharedState = std::sync::Arc<SharedState_>;
Expand Down
64 changes: 64 additions & 0 deletions common/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::sync::atomic::AtomicI64;

use prometheus_client::{registry::Registry, metrics::{gauge::Gauge, family::Family, counter::Counter}, encoding::EncodeLabelSet};

pub struct Metrics {
pub registry: Registry,

/// Count active lumina connections
pub active_connections: Gauge<i64, AtomicI64>,

/// Record connected client versions
pub lumina_version: Family<LuminaVersion, Gauge>,

/// Count new functions pushes
pub new_funcs: Counter<u64>,

/// Count pushed functions
pub pushes: Counter<u64>,

/// Count pulled functions (only found)
pub pulls: Counter<u64>,

/// Queried functions
pub queried_funcs: Counter<u64>,
}

#[derive(EncodeLabelSet, Debug, Hash, Eq, PartialEq, Clone)]
pub struct LuminaVersion {
pub protocol_version: u32,
}

impl Default for Metrics {
fn default() -> Self {
let mut registry = Registry::default();

let active_connections = Gauge::default();
registry.register("lumen_active_connections", "Active Lumina connections", active_connections.clone());

let lumina_version = Family::<LuminaVersion, Gauge>::default();
registry.register("lumen_protocol_version", "Version of Lumina protocol being used", lumina_version.clone());

let new_funcs = Counter::default();
registry.register("lumen_new_funcs", "Pushes previously unknown functions", new_funcs.clone());

let pushes = Counter::default();
registry.register("lumen_pushes_total", "Total pushes functions", pushes.clone());

let pulls = Counter::default();
registry.register("lumen_pulls_total", "Total pulled functions", pulls.clone());

let queried_funcs = Counter::default();
registry.register("lumen_queried_total", "Total Queried functions", queried_funcs.clone());

Metrics {
registry,
active_connections,
lumina_version,
new_funcs,
pushes,
pulls,
queried_funcs,
}
}
}
1 change: 1 addition & 0 deletions lumen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ clap = "4.3"
tokio-native-tls = "0.3"
native-tls = {version = "0.2"}
warp = "0.3"
prometheus-client = "0.21.2"
28 changes: 20 additions & 8 deletions lumen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
#![deny(clippy::all)]

use common::async_drop::AsyncDropper;
use common::metrics::LuminaVersion;
use common::rpc::{RpcHello, RpcFail, HelloResult};
use native_tls::Identity;
use clap::Arg;
use log::*;
use tokio::time::timeout;
use std::collections::HashMap;
use std::mem::discriminant;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
use std::{borrow::Cow, sync::Arc};
use tokio::{net::TcpListener, io::AsyncWrite, io::AsyncRead};
Expand Down Expand Up @@ -87,7 +87,10 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>(state: &Share
return Err(Error::Timeout);
}
};
debug!("pull {}/{} funcs ended after {:?}", funcs.iter().filter(|v| v.is_some()).count(), md.funcs.len(), start.elapsed());
let pulled_funcs = funcs.iter().filter(|v| v.is_some()).count();
state.metrics.pulls.inc_by(pulled_funcs as _);
state.metrics.queried_funcs.inc_by(md.funcs.len() as _);
debug!("pull {pulled_funcs}/{} funcs ended after {:?}", md.funcs.len(), start.elapsed());

let statuses: Vec<u32> = funcs.iter().map(|v| u32::from(v.is_none())).collect();
let found = funcs
Expand Down Expand Up @@ -127,7 +130,12 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>(state: &Share
return Ok(());
}
};
debug!("push {} funcs ended after {:?}", status.len(), start.elapsed());
state.metrics.pushes.inc_by(status.len() as _);
let new_funcs = status
.iter()
.fold(0u64, |counter, &v| if v > 0 { counter + 1 } else {counter});
state.metrics.new_funcs.inc_by(new_funcs);
debug!("push {} funcs ended after {:?} ({new_funcs} new)", status.len(), start.elapsed());

RpcMessage::PushMetadataResult(rpc::PushMetadataResult {
status: Cow::Owned(status),
Expand Down Expand Up @@ -187,6 +195,9 @@ async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(state: &SharedState, m
return Ok(());
}
};
state.metrics.lumina_version.get_or_create(&LuminaVersion {
protocol_version: hello.protocol_version,
}).inc();

if let Some(ref creds) = creds {
if creds.username != "guest" {
Expand Down Expand Up @@ -229,7 +240,6 @@ async fn handle_connection<S: AsyncRead + AsyncWrite + Unpin>(state: &SharedStat
}

async fn serve(listener: TcpListener, accpt: Option<tokio_native_tls::TlsAcceptor>, state: SharedState, mut shutdown_signal: tokio::sync::oneshot::Receiver<()>) {
static COUNTER: AtomicU32 = AtomicU32::new(0);
let accpt = accpt.map(Arc::new);

let (async_drop, worker) = AsyncDropper::new();
Expand Down Expand Up @@ -264,21 +274,22 @@ async fn serve(listener: TcpListener, accpt: Option<tokio_native_tls::TlsAccepto
let accpt = accpt.clone();

let conns2 = connections.clone();
let counter = state.metrics.active_connections.clone();
let guard = async_drop.defer(async move {
let count = {
COUNTER.fetch_sub(1, Ordering::Relaxed) - 1
};
let count = counter.dec() - 1;
debug!("connection with {:?} ended after {:?}; {} active connections", addr, start.elapsed(), count);

let mut guard = conns2.lock().await;
if guard.remove(&addr).is_none() {
error!("Couldn't remove connection from set {addr}");
}
});

let counter = state.metrics.active_connections.clone();
let handle = tokio::spawn(async move {
let _guard = guard;
let count = {
COUNTER.fetch_add(1, Ordering::Relaxed) + 1
counter.inc() + 1
};
let protocol = if accpt.is_some() {" [TLS]"} else {""};
debug!("Connection from {:?}{}: {} active connections", &addr, protocol, count);
Expand Down Expand Up @@ -353,6 +364,7 @@ fn main() {
db,
config,
server_name,
metrics: common::metrics::Metrics::default(),
});

let tls_acceptor;
Expand Down
24 changes: 20 additions & 4 deletions lumen/src/web.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
use std::net::SocketAddr;

use warp::{self, Filter};
use log::error;
use warp::{Filter, hyper::StatusCode, reply::Response};
use common::{SharedState, web::api::api_root};

pub async fn start_webserver<A: Into<SocketAddr> + 'static>(bind_addr: A, shared_state: SharedState) {
let root = warp::get()
.and(warp::path::end())
.map(|| warp::reply::html(include_str!("home.html")));


let shared_state1 = shared_state.clone();
let api = warp::path("api")
.and(api_root(shared_state));
.and(api_root(shared_state1));

let metrics = warp::get().and(warp::path("metrics")).and(warp::path::end())
.map(move || {
let mut res = String::new();
if let Err(err) = prometheus_client::encoding::text::encode(&mut res, &shared_state.metrics.registry) {
error!("failed to encode metrics: {err}");
let mut r = Response::default();
*r.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
r
} else {
warp::reply::Response::new(res.into())
}
});

let routes = root
.or(api);
.or(api)
.or(metrics);

warp::serve(routes)
.run(bind_addr).await;
Expand Down

0 comments on commit 8f6fd97

Please sign in to comment.