chore: Unit tests for metrics.rs (#2122)
Signed-off-by: Sreekanth <[email protected]>
BulkBeing authored Oct 4, 2024
1 parent 6b46875 commit 393ea54
Showing 3 changed files with 87 additions and 48 deletions.
8 changes: 7 additions & 1 deletion rust/Cargo.toml
@@ -1,4 +1,10 @@
workspace = { members = ["backoff", "numaflow-models", "servesink", "serving", "numaflow-core"] }
workspace = { members = [
"backoff",
"numaflow-models",
"servesink",
"serving",
"numaflow-core",
] }

[[bin]]
name = "numaflow"
115 changes: 74 additions & 41 deletions rust/numaflow-core/src/monovertex/metrics.rs
@@ -9,11 +9,18 @@ use axum::http::{Response, StatusCode};
use axum::response::IntoResponse;
use axum::{routing::get, Router};
use axum_server::tls_rustls::RustlsConfig;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
use rcgen::{generate_simple_self_signed, CertifiedKey};
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time;
use tonic::transport::Channel;
use tonic::Request;
use tracing::{debug, error, info};

use crate::config::config;
@@ -22,14 +29,6 @@ use crate::monovertex::sink_pb::sink_client::SinkClient;
use crate::monovertex::source_pb::source_client::SourceClient;
use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient;
use crate::reader;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
use tonic::transport::Channel;
use tonic::Request;

// Define the labels for the metrics
// Note: Please keep consistent with the definitions in MonoVertex daemon
@@ -263,31 +262,6 @@ pub async fn metrics_handler() -> impl IntoResponse {
.unwrap()
}

/// Collect and emit prometheus metrics.
/// Metrics router and server over HTTP endpoint.
// This is not used currently
#[allow(dead_code)]
pub(crate) async fn start_metrics_http_server<A>(
addr: A,
metrics_state: MetricsState,
) -> crate::Result<()>
where
A: ToSocketAddrs + std::fmt::Debug,
{
let metrics_app = metrics_router(metrics_state);

let listener = TcpListener::bind(&addr)
.await
.map_err(|e| Error::MetricsError(format!("Creating listener on {:?}: {}", addr, e)))?;

debug!("metrics server started at addr: {:?}", addr);

axum::serve(listener, metrics_app)
.await
.map_err(|e| Error::MetricsError(format!("Starting web server for metrics: {}", e)))?;
Ok(())
}

pub(crate) async fn start_metrics_https_server(
addr: SocketAddr,
metrics_state: MetricsState,
@@ -490,22 +464,24 @@ async fn fetch_pending<T: reader::LagReader>(lag_reader: &mut T) -> crate::error
Ok(response)
}

const LOOKBACK_SECONDS_MAP: [(&str, i64); 4] =
[("1m", 60), ("default", 120), ("5m", 300), ("15m", 900)];

// Periodically exposes the pending metrics by calculating the average pending messages over different intervals.
async fn expose_pending_metrics(
refresh_interval: Duration,
pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
) {
let mut ticker = time::interval(refresh_interval);
let lookback_seconds_map = vec![("1m", 60), ("default", 120), ("5m", 300), ("15m", 900)];

// store the pending info in a sorted way for deterministic display
// string concat is more efficient?
let mut pending_info: BTreeMap<&str, i64> = BTreeMap::new();

loop {
ticker.tick().await;
for (label, seconds) in &lookback_seconds_map {
let pending = calculate_pending(*seconds, &pending_stats).await;
for (label, seconds) in LOOKBACK_SECONDS_MAP {
let pending = calculate_pending(seconds, &pending_stats).await;
if pending != -1 {
let mut metric_labels = forward_metrics_labels().clone();
metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string()));
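The body of calculate_pending is not shown in this hunk; as a rough sketch only (not the committed implementation), the per-window averaging it performs could look like the following, reusing the TimestampedPending fields that appear in the test further down:

// Sketch only, not part of this commit: average the pending values recorded
// within the last `seconds`, returning -1 when no sample falls inside the
// window so expose_pending_metrics can skip publishing the gauge.
async fn calculate_pending_sketch(
    seconds: i64,
    pending_stats: &Arc<Mutex<Vec<TimestampedPending>>>,
) -> i64 {
    let now = std::time::Instant::now();
    let stats = pending_stats.lock().await;
    let (mut sum, mut count) = (0i64, 0i64);
    for stat in stats.iter() {
        if now.duration_since(stat.timestamp).as_secs() <= seconds as u64 {
            sum += stat.pending;
            count += 1;
        }
    }
    if count == 0 { -1 } else { sum / count }
}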
@@ -553,14 +529,17 @@ async fn calculate_pending(

#[cfg(test)]
mod tests {
use super::*;
use crate::monovertex::metrics::MetricsState;
use crate::shared::utils::create_rpc_channel;
use std::net::SocketAddr;
use std::time::Instant;

use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source, sourcetransform};
use std::net::SocketAddr;
use tokio::sync::mpsc::Sender;

use super::*;
use crate::monovertex::metrics::MetricsState;
use crate::shared::utils::create_rpc_channel;

struct SimpleSource;
#[tonic::async_trait]
impl source::Sourcer for SimpleSource {
@@ -709,4 +688,58 @@ mod tests {
fb_sink_server_handle.await.unwrap();
transformer_handle.await.unwrap();
}

#[tokio::test]
async fn test_expose_pending_metrics() {
let pending_stats = Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS)));
let refresh_interval = Duration::from_secs(1);

// Populate pending_stats with some values.
// The array will be sorted by the timestamp with the most recent last.
{
let mut pending_stats = pending_stats.lock().await;
pending_stats.push(TimestampedPending {
pending: 15,
timestamp: Instant::now() - Duration::from_secs(150),
});
pending_stats.push(TimestampedPending {
pending: 30,
timestamp: Instant::now() - Duration::from_secs(70),
});
pending_stats.push(TimestampedPending {
pending: 20,
timestamp: Instant::now() - Duration::from_secs(30),
});
pending_stats.push(TimestampedPending {
pending: 10,
timestamp: Instant::now(),
});
}

tokio::spawn({
let pending_stats = pending_stats.clone();
async move {
expose_pending_metrics(refresh_interval, pending_stats).await;
}
});
// We use tokio::time::interval() as the ticker in the expose_pending_metrics() function.
// The first tick happens immediately, so we don't need to wait for the refresh_interval for the first iteration to complete.
tokio::time::sleep(Duration::from_millis(50)).await;

// Get the stored values for all time intervals
// We will store the values corresponding to the labels (from LOOKBACK_SECONDS_MAP) "1m", "default", "5m", "15m" in the same order in this array
let mut stored_values: [i64; 4] = [0; 4];
{
for (i, (label, _)) in LOOKBACK_SECONDS_MAP.iter().enumerate() {
let mut metric_labels = forward_metrics_labels().clone();
metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string()));
let gauge = forward_metrics()
.source_pending
.get_or_create(&metric_labels)
.get();
stored_values[i] = gauge;
}
}
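// Expected values, given that the exposed metric is the average pending
// over each lookback window (integer division):
//   "1m"      (60s):  (20 + 10) / 2           = 15
//   "default" (120s): (30 + 20 + 10) / 3      = 20
//   "5m"      (300s): (15 + 30 + 20 + 10) / 4 = 18
//   "15m"     (900s): (15 + 30 + 20 + 10) / 4 = 18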
assert_eq!(stored_values, [15, 20, 18, 18]);
}
}
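With these additions, the metrics tests should be runnable on their own from the rust/ workspace with a filter such as `cargo test -p numaflow-core expose_pending_metrics` (the test-name filter here is just an example).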
12 changes: 6 additions & 6 deletions rust/numaflow-core/src/shared/server_info.rs
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;

@@ -190,12 +190,12 @@ fn human_readable(ver: &str) -> String {
return String::new();
}
// semver
if ver.ends_with("-z") {
return ver[..ver.len() - 2].to_string();
if let Some(version) = ver.strip_suffix("-z") {
return version.to_string();
}
// PEP 440
if ver.ends_with("rc100") {
return ver[..ver.len() - 5].to_string();
if let Some(version) = ver.strip_suffix("rc100") {
return version.to_string();
}
ver.to_string()
}
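strip_suffix returns Some only when the suffix is actually present, so the rewritten branches behave the same as the manual slicing they replace. Illustrative expectations (the version strings are made-up examples):

assert_eq!(human_readable("1.3.1-z"), "1.3.1");    // semver build suffix stripped
assert_eq!(human_readable("1.3.1rc100"), "1.3.1"); // PEP 440 suffix stripped
assert_eq!(human_readable("1.3.1"), "1.3.1");      // no recognized suffix, returned unchanged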
@@ -265,7 +265,7 @@ fn trim_after_dash(input: &str) -> &str {

/// Extracts the container type from the server info file.
/// The file name is in the format of <container_type>-server-info.
fn get_container_type(server_info_file: &PathBuf) -> Option<&str> {
fn get_container_type(server_info_file: &Path) -> Option<&str> {
let file_name = server_info_file.file_name()?;
let container_type = file_name.to_str()?.trim_end_matches("-server-info");
if container_type.is_empty() {
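Widening the parameter from &PathBuf to &Path lets callers pass either an owned PathBuf or a borrowed Path via deref coercion. A hypothetical call, using a made-up file path that follows the <container_type>-server-info convention and assuming the truncated tail of the function returns Some(container_type):

let owned = PathBuf::from("/var/run/numaflow/sourcer-server-info");
assert_eq!(get_container_type(&owned), Some("sourcer"));
assert_eq!(get_container_type(Path::new("/var/run/numaflow/sourcer-server-info")), Some("sourcer"));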
