Skip to content

Commit

Permalink
api: add metrics of used resources (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Nov 17, 2024
1 parent f0ef8df commit f5bc1d3
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ The minor version will be incremented upon a breaking change and the patch versi
- geyser: use process_compute_budget_instructions ([#15](https://github.com/solana-stream-solutions/solfees/pull/15))
- api: improve parallelism ([#16](https://github.com/solana-stream-solutions/solfees/pull/16))
- geyser: do not stream outdated data ([#17](https://github.com/solana-stream-solutions/solfees/pull/17))
- api: add metrics of used resources ([#18](https://github.com/solana-stream-solutions/solfees/pull/18))

### Breaking
7 changes: 7 additions & 0 deletions solfees-be/config-be.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,10 @@ listen_rpc:
calls_queue_max: 16384 # Maximum number of requests in the queue (each request can contain max `request_calls_max` calls)
streams_channel_capacity: 512 # Maximum number of messages in WebSocket channel before disconnect
pool_size: 2 # Number of workers processing requests (WebSocket streams processed by separate task)

metrics:
# `null` means empty label will be used (default value)
usage_client_id: null
usage_subscription_id: null
# usage_client_id: "x-client-id"
# usage_subscription_id: "x-subscription-id"
3 changes: 1 addition & 2 deletions solfees-be/config-grpc2redis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ redis:
endpoint: redis://127.0.0.1:6379/
slot_finalized: solfees:finalized
stream_key: solfees:events
# increate for production
# 6_000 / 2 / 4 / 2.5 / 60 = 5min (2 producers, 4 events per slot, 2.5 slots per sec)
# increate for production: 3_000 / 4 / 2.5 / 60 = 5min
stream_maxlen: 600
stream_field_key: message
epochs_key: solfees:epochs
Expand Down
3 changes: 2 additions & 1 deletion solfees-be/src/bin/solfees-be.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
futures::future::{try_join_all, FutureExt, TryFutureExt},
solfees_be::{
cli, config::ConfigSolfees as Config, metrics::solfees_be as metrics, redis, rpc_server,
cli, config::ConfigBe as Config, metrics::solfees_be as metrics, redis, rpc_server,
rpc_solana::SolanaRpc,
},
std::sync::Arc,
Expand Down Expand Up @@ -46,6 +46,7 @@ async fn main2(config: Config) -> anyhow::Result<()> {
config.listen_rpc.bind,
config.listen_rpc.body_limit,
solana_rpc.clone(),
Arc::new(config.metrics),
Arc::clone(&rpc_solfees_shutdown),
))
.map(|result| result?)
Expand Down
12 changes: 10 additions & 2 deletions solfees-be/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,15 @@ impl Default for ConfigListenAdmin {

#[derive(Debug, Default, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct ConfigSolfees {
pub struct ConfigBe {
pub tracing: ConfigTracing,
pub redis: ConfigRedisConsumer,
pub listen_admin: ConfigListenAdmin,
pub listen_rpc: ConfigListenRpc,
pub metrics: ConfigMetrics,
}

impl WithConfigTracing for ConfigSolfees {
impl WithConfigTracing for ConfigBe {
fn get_tracing(&self) -> &ConfigTracing {
&self.tracing
}
Expand Down Expand Up @@ -180,6 +181,13 @@ impl Default for ConfigListenRpc {
}
}

#[derive(Debug, Default, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct ConfigMetrics {
pub usage_client_id: Option<String>,
pub usage_subscription_id: Option<String>,
}

fn deserialize_maybe_env<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: serde::de::DeserializeOwned + FromStr,
Expand Down
122 changes: 116 additions & 6 deletions solfees-be/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,18 @@ pub mod solfees_be {
use {
super::{init2, REGISTRY},
crate::{
config::ConfigMetrics,
grpc_geyser::CommitmentLevel,
rpc_solana::{RpcRequestsStats, SolanaRpcMode},
},
http::StatusCode,
http::{HeaderMap, StatusCode},
prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Opts},
solana_sdk::clock::Slot,
std::{borrow::Cow, time::Duration},
std::{
borrow::Cow,
sync::Arc,
time::{Duration, Instant},
},
};

lazy_static::lazy_static! {
Expand Down Expand Up @@ -137,6 +142,16 @@ pub mod solfees_be {
Opts::new("websockets_alive_total", "Total number of alive WebSocket connections"),
&["api"]
).unwrap();

static ref CLIENT_USAGE_CPU_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("client_usage_cpu_total", "Total number of CPU usage in nanoseconds"),
&["client_id", "subscription_id"]
).unwrap();

static ref CLIENT_USAGE_EGRESS_WS_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("client_usage_egress_total", "Total number of bytes sent over WebSocket"),
&["client_id", "subscription_id"]
).unwrap();
}

pub fn init() {
Expand All @@ -147,6 +162,8 @@ pub mod solfees_be {
register!(REQUESTS_CALLS_TOTAL);
register!(REQUESTS_QUEUE_SIZE);
register!(WEBSOCKETS_ALIVE_TOTAL);
register!(CLIENT_USAGE_CPU_TOTAL);
register!(CLIENT_USAGE_EGRESS_WS_TOTAL);
}

pub fn set_slot(commitment: CommitmentLevel, slot: Slot) {
Expand Down Expand Up @@ -179,16 +196,16 @@ pub mod solfees_be {
.inc_by(stats.latest_blockhash);
REQUESTS_CALLS_TOTAL
.with_label_values(&[api.as_str(), "get_leader_schedule"])
.inc_by(stats.latest_blockhash);
.inc_by(stats.leader_schedule);
REQUESTS_CALLS_TOTAL
.with_label_values(&[api.as_str(), "get_recent_prioritization_fees"])
.inc_by(stats.latest_blockhash);
.inc_by(stats.recent_prioritization_fees);
REQUESTS_CALLS_TOTAL
.with_label_values(&[api.as_str(), "get_slot"])
.inc_by(stats.latest_blockhash);
.inc_by(stats.slot);
REQUESTS_CALLS_TOTAL
.with_label_values(&[api.as_str(), "get_version"])
.inc_by(stats.latest_blockhash);
.inc_by(stats.version);
}

pub fn requests_queue_size_inc() {
Expand Down Expand Up @@ -216,4 +233,97 @@ pub mod solfees_be {
};
WEBSOCKETS_ALIVE_TOTAL.with_label_values(&[api]).dec()
}

#[derive(Debug)]
struct ClientIdInner {
client_id: String,
subsription_id: String,
}

#[derive(Debug, Clone)]
pub struct ClientId {
inner: Arc<ClientIdInner>,
}

impl ClientId {
pub fn new(headers: &HeaderMap, config_metrics: &ConfigMetrics) -> Self {
Self {
inner: Arc::new(ClientIdInner {
client_id: Self::get(headers, config_metrics.usage_client_id.as_deref()),
subsription_id: Self::get(
headers,
config_metrics.usage_subscription_id.as_deref(),
),
}),
}
}

fn get(headers: &HeaderMap, key: Option<&str>) -> String {
key.map_or_else(String::new, |key| {
headers
.get(key)
.and_then(|id| id.to_str().ok())
.map(String::from)
.unwrap_or_default()
})
}

pub fn start_timer_cpu(&self) -> ClientIdTimerCpu {
ClientIdTimerCpu::new(self)
}

pub fn observe_cpu(&self, nanos: u64) {
CLIENT_USAGE_CPU_TOTAL
.with_label_values(&[&self.inner.client_id, &self.inner.subsription_id])
.inc_by(nanos);
}

pub fn observe_egress_ws(&self, bytes: u64) {
CLIENT_USAGE_EGRESS_WS_TOTAL
.with_label_values(&[&self.inner.client_id, &self.inner.subsription_id])
.inc_by(bytes);
}
}

#[derive(Debug)]
pub struct ClientIdTimerCpu<'a> {
client_id: &'a ClientId,
start: Instant,
observed: bool,
}

impl<'a> Drop for ClientIdTimerCpu<'a> {
fn drop(&mut self) {
if !self.observed {
self.observe(true);
}
}
}

impl<'a> ClientIdTimerCpu<'a> {
fn new(client_id: &'a ClientId) -> Self {
Self {
client_id,
start: Instant::now(),
observed: false,
}
}

fn observe(&mut self, record: bool) {
self.observed = true;
if record {
let elapsed = Instant::now().saturating_duration_since(self.start);
let nanos = elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64;
self.client_id.observe_cpu(nanos);
}
}

pub fn stop_and_record(mut self) {
self.observe(true);
}

pub fn stop_and_discard(mut self) {
self.observe(false);
}
}
}
14 changes: 11 additions & 3 deletions solfees-be/src/rpc_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{
config::ConfigMetrics,
metrics::{self, solfees_be as metrics_be},
rpc_solana::{SolanaRpc, SolanaRpcMode},
},
Expand Down Expand Up @@ -66,6 +67,7 @@ pub async fn run_solfees(
addr: SocketAddr,
body_limit: usize,
solana_rpc: SolanaRpc,
config_metrics: Arc<ConfigMetrics>,
shutdown: Arc<Notify>,
) -> anyhow::Result<()> {
let listener = TcpListener::bind(addr).await?;
Expand All @@ -80,11 +82,13 @@ pub async fn run_solfees(
};

let solana_rpc = solana_rpc.clone();
let config_metrics = Arc::clone(&config_metrics);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let connection = http.serve_connection_with_upgrades(
TokioIo::new(Box::pin(stream)),
service_fn(move |mut req: Request<BodyIncoming>| {
let solana_rpc = solana_rpc.clone();
let config_metrics = Arc::clone(&config_metrics);
let shutdown_rx = shutdown_rx.resubscribe();
async move {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -113,6 +117,7 @@ pub async fn run_solfees(
}
};

let client_id = metrics_be::ClientId::new(req.headers(), &config_metrics);
match req_type {
ReqType::Rpc => {
let ts = Instant::now();
Expand All @@ -121,6 +126,7 @@ pub async fn run_solfees(
.map_err(|error| anyhow::anyhow!(error))
.and_then(|body| {
solana_rpc.on_request(
client_id,
solana_rpc_mode,
body.aggregate(),
shutdown_rx,
Expand Down Expand Up @@ -156,9 +162,11 @@ pub async fn run_solfees(
}),
) {
Ok((response, websocket)) => {
tokio::spawn(
solana_rpc.on_websocket(solana_rpc_mode, websocket),
);
tokio::spawn(solana_rpc.on_websocket(
client_id,
solana_rpc_mode,
websocket,
));
let (parts, body) = response.into_parts();
Ok(Response::from_parts(parts, body.boxed()))
}
Expand Down
Loading

0 comments on commit f5bc1d3

Please sign in to comment.