diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 518f90510..54f4a9117 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,4 +1,10 @@ -workspace = { members = ["backoff", "numaflow-models", "servesink", "serving", "numaflow-core"] } +workspace = { members = [ + "backoff", + "numaflow-models", + "servesink", + "serving", + "numaflow-core", +] } [[bin]] name = "numaflow" diff --git a/rust/numaflow-core/src/monovertex/metrics.rs b/rust/numaflow-core/src/monovertex/metrics.rs index adbbdde3b..bf6ee4d30 100644 --- a/rust/numaflow-core/src/monovertex/metrics.rs +++ b/rust/numaflow-core/src/monovertex/metrics.rs @@ -9,11 +9,18 @@ use axum::http::{Response, StatusCode}; use axum::response::IntoResponse; use axum::{routing::get, Router}; use axum_server::tls_rustls::RustlsConfig; +use prometheus_client::encoding::text::encode; +use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::family::Family; +use prometheus_client::metrics::gauge::Gauge; +use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; +use prometheus_client::registry::Registry; use rcgen::{generate_simple_self_signed, CertifiedKey}; -use tokio::net::{TcpListener, ToSocketAddrs}; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio::time; +use tonic::transport::Channel; +use tonic::Request; use tracing::{debug, error, info}; use crate::config::config; @@ -22,14 +29,6 @@ use crate::monovertex::sink_pb::sink_client::SinkClient; use crate::monovertex::source_pb::source_client::SourceClient; use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient; use crate::reader; -use prometheus_client::encoding::text::encode; -use prometheus_client::metrics::counter::Counter; -use prometheus_client::metrics::family::Family; -use prometheus_client::metrics::gauge::Gauge; -use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; -use prometheus_client::registry::Registry; -use tonic::transport::Channel; -use 
tonic::Request; // Define the labels for the metrics // Note: Please keep consistent with the definitions in MonoVertex daemon @@ -263,31 +262,6 @@ pub async fn metrics_handler() -> impl IntoResponse { .unwrap() } -/// Collect and emit prometheus metrics. -/// Metrics router and server over HTTP endpoint. -// This is not used currently -#[allow(dead_code)] -pub(crate) async fn start_metrics_http_server<A>( - addr: A, - metrics_state: MetricsState, -) -> crate::Result<()> -where - A: ToSocketAddrs + std::fmt::Debug, -{ - let metrics_app = metrics_router(metrics_state); - - let listener = TcpListener::bind(&addr) - .await - .map_err(|e| Error::MetricsError(format!("Creating listener on {:?}: {}", addr, e)))?; - - debug!("metrics server started at addr: {:?}", addr); - - axum::serve(listener, metrics_app) - .await - .map_err(|e| Error::MetricsError(format!("Starting web server for metrics: {}", e)))?; - Ok(()) -} - pub(crate) async fn start_metrics_https_server( addr: SocketAddr, metrics_state: MetricsState, @@ -490,13 +464,15 @@ async fn fetch_pending(lag_reader: &mut T) -> crate::error Ok(response) } +const LOOKBACK_SECONDS_MAP: [(&str, i64); 4] = + [("1m", 60), ("default", 120), ("5m", 300), ("15m", 900)]; + // Periodically exposes the pending metrics by calculating the average pending messages over different intervals. async fn expose_pending_metrics( refresh_interval: Duration, pending_stats: Arc<Mutex<Vec<TimestampedPending>>>, ) { let mut ticker = time::interval(refresh_interval); - let lookback_seconds_map = vec![("1m", 60), ("default", 120), ("5m", 300), ("15m", 900)]; // store the pending info in a sorted way for deterministic display // string concat is more efficient? 
@@ -504,8 +480,8 @@ async fn expose_pending_metrics( loop { ticker.tick().await; - for (label, seconds) in &lookback_seconds_map { - let pending = calculate_pending(*seconds, &pending_stats).await; + for (label, seconds) in LOOKBACK_SECONDS_MAP { + let pending = calculate_pending(seconds, &pending_stats).await; if pending != -1 { let mut metric_labels = forward_metrics_labels().clone(); metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); @@ -553,14 +529,17 @@ async fn calculate_pending( #[cfg(test)] mod tests { - use super::*; - use crate::monovertex::metrics::MetricsState; - use crate::shared::utils::create_rpc_channel; + use std::net::SocketAddr; + use std::time::Instant; + use numaflow::source::{Message, Offset, SourceReadRequest}; use numaflow::{sink, source, sourcetransform}; - use std::net::SocketAddr; use tokio::sync::mpsc::Sender; + use super::*; + use crate::monovertex::metrics::MetricsState; + use crate::shared::utils::create_rpc_channel; + struct SimpleSource; #[tonic::async_trait] impl source::Sourcer for SimpleSource { @@ -709,4 +688,58 @@ mod tests { fb_sink_server_handle.await.unwrap(); transformer_handle.await.unwrap(); } + + #[tokio::test] + async fn test_expose_pending_metrics() { + let pending_stats = Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))); + let refresh_interval = Duration::from_secs(1); + + // Populate pending_stats with some values. + // The array will be sorted by the timestamp with the most recent last. 
+ { + let mut pending_stats = pending_stats.lock().await; + pending_stats.push(TimestampedPending { + pending: 15, + timestamp: Instant::now() - Duration::from_secs(150), + }); + pending_stats.push(TimestampedPending { + pending: 30, + timestamp: Instant::now() - Duration::from_secs(70), + }); + pending_stats.push(TimestampedPending { + pending: 20, + timestamp: Instant::now() - Duration::from_secs(30), + }); + pending_stats.push(TimestampedPending { + pending: 10, + timestamp: Instant::now(), + }); + } + + tokio::spawn({ + let pending_stats = pending_stats.clone(); + async move { + expose_pending_metrics(refresh_interval, pending_stats).await; + } + }); + // We use tokio::time::interval() as the ticker in the expose_pending_metrics() function. + // The first tick happens immediately, so we don't need to wait for the refresh_interval for the first iteration to complete. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Get the stored values for all time intervals + // We will store the values corresponding to the labels (from LOOKBACK_SECONDS_MAP) "1m", "default", "5m", "15m" in the same order in this array + let mut stored_values: [i64; 4] = [0; 4]; + { + for (i, (label, _)) in LOOKBACK_SECONDS_MAP.iter().enumerate() { + let mut metric_labels = forward_metrics_labels().clone(); + metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); + let guage = forward_metrics() + .source_pending + .get_or_create(&metric_labels) + .get(); + stored_values[i] = guage; + } + } + assert_eq!(stored_values, [15, 20, 18, 18]); + } } diff --git a/rust/numaflow-core/src/shared/server_info.rs b/rust/numaflow-core/src/shared/server_info.rs index f058b7a31..b3f8b5184 100644 --- a/rust/numaflow-core/src/shared/server_info.rs +++ b/rust/numaflow-core/src/shared/server_info.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::fs; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::str::FromStr; use std::time::Duration; @@ -190,12 +190,12 @@ 
fn human_readable(ver: &str) -> String { return String::new(); } // semver - if ver.ends_with("-z") { - return ver[..ver.len() - 2].to_string(); + if let Some(version) = ver.strip_suffix("-z") { + return version.to_string(); } // PEP 440 - if ver.ends_with("rc100") { - return ver[..ver.len() - 5].to_string(); + if let Some(version) = ver.strip_suffix("rc100") { + return version.to_string(); } ver.to_string() } @@ -265,7 +265,7 @@ fn trim_after_dash(input: &str) -> &str { /// Extracts the container type from the server info file. /// The file name is in the format of <container_type>-server-info. -fn get_container_type(server_info_file: &PathBuf) -> Option<&str> { +fn get_container_type(server_info_file: &Path) -> Option<&str> { let file_name = server_info_file.file_name()?; let container_type = file_name.to_str()?.trim_end_matches("-server-info"); if container_type.is_empty() {