From ee8b83ac649ab0ec4860cf8b03720ac6016aadaa Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Thu, 29 Aug 2024 16:32:21 -0700 Subject: [PATCH] chore: refactor metric variable names (#2012) Signed-off-by: Vigith Maurice --- rust/monovertex/src/forwarder.rs | 10 ++--- rust/monovertex/src/metrics.rs | 68 ++++++++++++++++---------------- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/rust/monovertex/src/forwarder.rs b/rust/monovertex/src/forwarder.rs index d9f68fc608..8f61b68ff1 100644 --- a/rust/monovertex/src/forwarder.rs +++ b/rust/monovertex/src/forwarder.rs @@ -106,7 +106,7 @@ impl Forwarder { } forward_metrics() - .monovtx_processing_time + .e2e_processing_time .get_or_create(&self.common_labels) .observe(start_time.elapsed().as_micros() as f64); } @@ -135,7 +135,7 @@ impl Forwarder { let msg_count = messages.len() as u64; forward_metrics() - .monovtx_read_total + .read_total .get_or_create(&self.common_labels) .inc_by(msg_count); @@ -149,7 +149,7 @@ impl Forwarder { ); forward_metrics() - .monovtx_read_bytes_total + .read_bytes_total .get_or_create(&self.common_labels) .inc_by(bytes_count); @@ -276,7 +276,7 @@ impl Forwarder { } forward_metrics() - .monovtx_sink_write_total + .sink_write_total .get_or_create(&self.common_labels) .inc_by(msg_count); Ok(()) @@ -382,7 +382,7 @@ impl Forwarder { self.source_client.ack_fn(offsets).await?; debug!("Ack latency - {}ms", start_time.elapsed().as_millis()); forward_metrics() - .monovtx_ack_total + .ack_total .get_or_create(&self.common_labels) .inc_by(n as u64); Ok(()) diff --git a/rust/monovertex/src/metrics.rs b/rust/monovertex/src/metrics.rs index 7a87b508a4..fd612ba12a 100644 --- a/rust/monovertex/src/metrics.rs +++ b/rust/monovertex/src/metrics.rs @@ -29,8 +29,8 @@ use prometheus_client::registry::Registry; // Define the labels for the metrics // Note: Please keep consistent with the definitions in MonoVertex daemon -pub const MONO_VERTEX_NAME_LABEL: &str = "mvtx_name"; -pub const REPLICA_LABEL: &str = "mvtx_replica"; +const VERTEX_NAME_LABEL: &str = "mvtx_name"; +const REPLICA_LABEL: &str = "mvtx_replica"; const PENDING_PERIOD_LABEL: &str = "period"; // Define the metrics @@ -39,12 +39,12 @@ const PENDING_PERIOD_LABEL: &str = "period"; // refer: https://github.com/prometheus/client_rust/blob/master/src/registry.rs#L102 // Note: Please keep consistent with the definitions in MonoVertex daemon -const MONOVTX_READ_TOTAL: &str = "monovtx_read"; -const MONOVTX_READ_BYTES_TOTAL: &str = "monovtx_read_bytes"; -const MONOVTX_ACK_TOTAL: &str = "monovtx_ack"; -const MONOVTX_SINK_WRITE_TOTAL: &str = "monovtx_sink_write"; -const MONOVTX_PROCESSING_TIME: &str = "monovtx_processing_time"; -const MONOVTX_PENDING: &str = "monovtx_pending"; +const READ_TOTAL: &str = "monovtx_read"; +const READ_BYTES_TOTAL: &str = "monovtx_read_bytes"; +const ACK_TOTAL: &str = "monovtx_ack"; +const SINK_WRITE_TOTAL: &str = "monovtx_sink_write"; +const E2E_PROCESSING_TIME: &str = "monovtx_processing_time"; +const SOURCE_PENDING: &str = "monovtx_pending"; #[derive(Clone)] pub(crate) struct MetricsState { @@ -88,12 +88,12 @@ fn global_registry() -> &'static GlobalRegistry { // The labels are provided in the form of Vec<(String, String) // The second argument is the metric kind. pub struct MonoVtxMetrics { - pub monovtx_read_total: Family, Counter>, - pub monovtx_read_bytes_total: Family, Counter>, - pub monovtx_ack_total: Family, Counter>, - pub monovtx_sink_write_total: Family, Counter>, - pub monovtx_processing_time: Family, Histogram>, - pub monovtx_pending: Family, Gauge>, + pub read_total: Family, Counter>, + pub read_bytes_total: Family, Counter>, + pub ack_total: Family, Counter>, + pub sink_write_total: Family, Counter>, + pub e2e_processing_time: Family, Histogram>, + pub source_pending: Family, Gauge>, } /// impl the MonoVtxMetrics struct and create a new object @@ -111,45 +111,45 @@ impl MonoVtxMetrics { let monovtx_pending = Family::, Gauge>::default(); let metrics = Self { - monovtx_read_total, - monovtx_read_bytes_total, - monovtx_ack_total, - monovtx_sink_write_total, - monovtx_processing_time, - monovtx_pending, + read_total: monovtx_read_total, + read_bytes_total: monovtx_read_bytes_total, + ack_total: monovtx_ack_total, + sink_write_total: monovtx_sink_write_total, + e2e_processing_time: monovtx_processing_time, + source_pending: monovtx_pending, }; let mut registry = global_registry().registry.lock(); // Register all the metrics to the global registry registry.register( - MONOVTX_READ_TOTAL, + READ_TOTAL, "A Counter to keep track of the total number of messages read from the source", - metrics.monovtx_read_total.clone(), + metrics.read_total.clone(), ); registry.register( - MONOVTX_SINK_WRITE_TOTAL, + SINK_WRITE_TOTAL, "A Counter to keep track of the total number of messages written to the sink", - metrics.monovtx_sink_write_total.clone(), + metrics.sink_write_total.clone(), ); registry.register( - MONOVTX_ACK_TOTAL, + ACK_TOTAL, "A Counter to keep track of the total number of messages acknowledged by the sink", - metrics.monovtx_ack_total.clone(), + metrics.ack_total.clone(), ); registry.register( - MONOVTX_PROCESSING_TIME, + E2E_PROCESSING_TIME, "A Histogram to keep track of the total time taken to forward a chunk, the time is in microseconds", - metrics.monovtx_processing_time.clone(), + metrics.e2e_processing_time.clone(), ); registry.register( - MONOVTX_READ_BYTES_TOTAL, + READ_BYTES_TOTAL, "A Counter to keep track of the total number of bytes read from the source", - metrics.monovtx_read_bytes_total.clone(), + metrics.read_bytes_total.clone(), ); registry.register( - MONOVTX_PENDING, + SOURCE_PENDING, "A Gauge to keep track of the total number of pending messages for the monovtx", - metrics.monovtx_pending.clone(), + metrics.source_pending.clone(), ); metrics @@ -177,7 +177,7 @@ pub(crate) fn forward_metrics_labels() -> &'static Vec<(String, String)> { MONOVTX_METRICS_LABELS.get_or_init(|| { let common_labels = vec![ ( - MONO_VERTEX_NAME_LABEL.to_string(), + VERTEX_NAME_LABEL.to_string(), config().mono_vertex_name.clone(), ), (REPLICA_LABEL.to_string(), config().replica.to_string()), @@ -435,7 +435,7 @@ async fn expose_pending_metrics( let mut metric_labels = forward_metrics_labels().clone(); metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); forward_metrics() - .monovtx_pending + .source_pending .get_or_create(&metric_labels) .set(pending); info!("Pending messages ({}): {}", label, pending);