Skip to content

Commit

Permalink
chore: refactor metric variable names (#2012)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored Aug 29, 2024
1 parent 2ba5411 commit ee8b83a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 39 deletions.
10 changes: 5 additions & 5 deletions rust/monovertex/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Forwarder {
}

forward_metrics()
.monovtx_processing_time
.e2e_processing_time
.get_or_create(&self.common_labels)
.observe(start_time.elapsed().as_micros() as f64);
}
Expand Down Expand Up @@ -135,7 +135,7 @@ impl Forwarder {

let msg_count = messages.len() as u64;
forward_metrics()
.monovtx_read_total
.read_total
.get_or_create(&self.common_labels)
.inc_by(msg_count);

Expand All @@ -149,7 +149,7 @@ impl Forwarder {
);

forward_metrics()
.monovtx_read_bytes_total
.read_bytes_total
.get_or_create(&self.common_labels)
.inc_by(bytes_count);

Expand Down Expand Up @@ -276,7 +276,7 @@ impl Forwarder {
}

forward_metrics()
.monovtx_sink_write_total
.sink_write_total
.get_or_create(&self.common_labels)
.inc_by(msg_count);
Ok(())
Expand Down Expand Up @@ -382,7 +382,7 @@ impl Forwarder {
self.source_client.ack_fn(offsets).await?;
debug!("Ack latency - {}ms", start_time.elapsed().as_millis());
forward_metrics()
.monovtx_ack_total
.ack_total
.get_or_create(&self.common_labels)
.inc_by(n as u64);
Ok(())
Expand Down
68 changes: 34 additions & 34 deletions rust/monovertex/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use prometheus_client::registry::Registry;

// Define the labels for the metrics
// Note: Please keep consistent with the definitions in MonoVertex daemon
pub const MONO_VERTEX_NAME_LABEL: &str = "mvtx_name";
pub const REPLICA_LABEL: &str = "mvtx_replica";
const VERTEX_NAME_LABEL: &str = "mvtx_name";
const REPLICA_LABEL: &str = "mvtx_replica";
const PENDING_PERIOD_LABEL: &str = "period";

// Define the metrics
Expand All @@ -39,12 +39,12 @@ const PENDING_PERIOD_LABEL: &str = "period";
// refer: https://github.com/prometheus/client_rust/blob/master/src/registry.rs#L102

// Note: Please keep consistent with the definitions in MonoVertex daemon
const MONOVTX_READ_TOTAL: &str = "monovtx_read";
const MONOVTX_READ_BYTES_TOTAL: &str = "monovtx_read_bytes";
const MONOVTX_ACK_TOTAL: &str = "monovtx_ack";
const MONOVTX_SINK_WRITE_TOTAL: &str = "monovtx_sink_write";
const MONOVTX_PROCESSING_TIME: &str = "monovtx_processing_time";
const MONOVTX_PENDING: &str = "monovtx_pending";
const READ_TOTAL: &str = "monovtx_read";
const READ_BYTES_TOTAL: &str = "monovtx_read_bytes";
const ACK_TOTAL: &str = "monovtx_ack";
const SINK_WRITE_TOTAL: &str = "monovtx_sink_write";
const E2E_PROCESSING_TIME: &str = "monovtx_processing_time";
const SOURCE_PENDING: &str = "monovtx_pending";

#[derive(Clone)]
pub(crate) struct MetricsState {
Expand Down Expand Up @@ -88,12 +88,12 @@ fn global_registry() -> &'static GlobalRegistry {
// The labels are provided in the form of Vec<(String, String)
// The second argument is the metric kind.
pub struct MonoVtxMetrics {
pub monovtx_read_total: Family<Vec<(String, String)>, Counter>,
pub monovtx_read_bytes_total: Family<Vec<(String, String)>, Counter>,
pub monovtx_ack_total: Family<Vec<(String, String)>, Counter>,
pub monovtx_sink_write_total: Family<Vec<(String, String)>, Counter>,
pub monovtx_processing_time: Family<Vec<(String, String)>, Histogram>,
pub monovtx_pending: Family<Vec<(String, String)>, Gauge>,
pub read_total: Family<Vec<(String, String)>, Counter>,
pub read_bytes_total: Family<Vec<(String, String)>, Counter>,
pub ack_total: Family<Vec<(String, String)>, Counter>,
pub sink_write_total: Family<Vec<(String, String)>, Counter>,
pub e2e_processing_time: Family<Vec<(String, String)>, Histogram>,
pub source_pending: Family<Vec<(String, String)>, Gauge>,
}

/// impl the MonoVtxMetrics struct and create a new object
Expand All @@ -111,45 +111,45 @@ impl MonoVtxMetrics {
let monovtx_pending = Family::<Vec<(String, String)>, Gauge>::default();

let metrics = Self {
monovtx_read_total,
monovtx_read_bytes_total,
monovtx_ack_total,
monovtx_sink_write_total,
monovtx_processing_time,
monovtx_pending,
read_total: monovtx_read_total,
read_bytes_total: monovtx_read_bytes_total,
ack_total: monovtx_ack_total,
sink_write_total: monovtx_sink_write_total,
e2e_processing_time: monovtx_processing_time,
source_pending: monovtx_pending,
};

let mut registry = global_registry().registry.lock();
// Register all the metrics to the global registry
registry.register(
MONOVTX_READ_TOTAL,
READ_TOTAL,
"A Counter to keep track of the total number of messages read from the source",
metrics.monovtx_read_total.clone(),
metrics.read_total.clone(),
);
registry.register(
MONOVTX_SINK_WRITE_TOTAL,
SINK_WRITE_TOTAL,
"A Counter to keep track of the total number of messages written to the sink",
metrics.monovtx_sink_write_total.clone(),
metrics.sink_write_total.clone(),
);
registry.register(
MONOVTX_ACK_TOTAL,
ACK_TOTAL,
"A Counter to keep track of the total number of messages acknowledged by the sink",
metrics.monovtx_ack_total.clone(),
metrics.ack_total.clone(),
);
registry.register(
MONOVTX_PROCESSING_TIME,
E2E_PROCESSING_TIME,
"A Histogram to keep track of the total time taken to forward a chunk, the time is in microseconds",
metrics.monovtx_processing_time.clone(),
metrics.e2e_processing_time.clone(),
);
registry.register(
MONOVTX_READ_BYTES_TOTAL,
READ_BYTES_TOTAL,
"A Counter to keep track of the total number of bytes read from the source",
metrics.monovtx_read_bytes_total.clone(),
metrics.read_bytes_total.clone(),
);
registry.register(
MONOVTX_PENDING,
SOURCE_PENDING,
"A Gauge to keep track of the total number of pending messages for the monovtx",
metrics.monovtx_pending.clone(),
metrics.source_pending.clone(),
);

metrics
Expand Down Expand Up @@ -177,7 +177,7 @@ pub(crate) fn forward_metrics_labels() -> &'static Vec<(String, String)> {
MONOVTX_METRICS_LABELS.get_or_init(|| {
let common_labels = vec![
(
MONO_VERTEX_NAME_LABEL.to_string(),
VERTEX_NAME_LABEL.to_string(),
config().mono_vertex_name.clone(),
),
(REPLICA_LABEL.to_string(), config().replica.to_string()),
Expand Down Expand Up @@ -435,7 +435,7 @@ async fn expose_pending_metrics(
let mut metric_labels = forward_metrics_labels().clone();
metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string()));
forward_metrics()
.monovtx_pending
.source_pending
.get_or_create(&metric_labels)
.set(pending);
info!("Pending messages ({}): {}", label, pending);
Expand Down

0 comments on commit ee8b83a

Please sign in to comment.