From 9eb4b55324a5d2acb2a54639565d9a59be50cb26 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Fri, 6 Dec 2024 23:52:50 +0530 Subject: [PATCH] update: added more metric --- .env.test | 3 +- .../orchestrator/src/database/mongodb/mod.rs | 55 +++++++++++++++++++ crates/orchestrator/src/jobs/mod.rs | 38 ++++++++----- crates/orchestrator/src/metrics.rs | 38 ++++++++++++- crates/orchestrator/src/routes/job_routes.rs | 20 ++++++- .../src/workers/data_submission_worker.rs | 15 ++++- crates/orchestrator/src/workers/proving.rs | 15 ++++- crates/orchestrator/src/workers/snos.rs | 12 +++- .../orchestrator/src/workers/update_state.rs | 10 +++- crates/utils/src/metrics/lib.rs | 11 +++- 10 files changed, 192 insertions(+), 25 deletions(-) diff --git a/.env.test b/.env.test index 62d74916..c154ebc0 100644 --- a/.env.test +++ b/.env.test @@ -13,6 +13,8 @@ AWS_DEFAULT_REGION=localhost # For EventBridge MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME=madara-orchestrator-worker-trigger +MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_ROLE_NAME=madara-orchestrator-worker-trigger-role +MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_POLICY_NAME=madara-orchestrator-worker-trigger-policy @@ -98,7 +100,6 @@ MADARA_ORCHESTRATOR_AWS_S3_BUCKET_NAME=madara-orchestrator-test-bucket MADARA_ORCHESTRATOR_OTEL_SERVICE_NAME=orchestrator - #### SERVER #### MADARA_ORCHESTRATOR_HOST=127.0.0.1 diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 4f18eae8..1c426017 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -1,3 +1,5 @@ +use std::time::Instant; + use async_std::stream::StreamExt; use async_trait::async_trait; use chrono::{SubsecRound, Utc}; @@ -10,6 +12,7 @@ use mongodb::options::{ UpdateOptions, }; use mongodb::{bson, Client, Collection}; +use opentelemetry::KeyValue; use url::Url; use utils::ToDocument; use uuid::Uuid; @@ -17,6 +20,7 @@ use uuid::Uuid; use 
crate::database::Database; use crate::jobs::types::{JobItem, JobItemUpdates, JobStatus, JobType}; use crate::jobs::JobError; +use crate::metrics::ORCHESTRATOR_METRICS; mod utils; @@ -67,6 +71,7 @@ impl MongoDb { impl Database for MongoDb { #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn create_job(&self, job: JobItem) -> Result { + let start = Instant::now(); let options = UpdateOptions::builder().upsert(true).build(); let updates = job.to_document().map_err(|e| JobError::Other(e.into()))?; @@ -93,6 +98,12 @@ impl Database for MongoDb { .map_err(|e| JobError::Other(e.to_string().into()))?; if result.matched_count == 0 { + let attributes = [ + KeyValue::new("db_operation_name", "create_job"), + KeyValue::new("db_operation_job", format!("{:?}", job)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(job) } else { Err(JobError::JobAlreadyExists { internal_id: job.internal_id, job_type: job.job_type }) @@ -101,25 +112,40 @@ impl Database for MongoDb { #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn get_job_by_id(&self, id: Uuid) -> Result> { + let start = Instant::now(); let filter = doc! { "id": id }; tracing::debug!(job_id = %id, category = "db_call", "Fetched job by ID"); + let attributes = [ + KeyValue::new("db_operation_name", "get_job_by_id"), + KeyValue::new("db_operation_id", format!("{:?}", id)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(self.get_job_collection().find_one(filter, None).await?) } #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result> { + let start = Instant::now(); let filter = doc! 
{ "internal_id": internal_id, "job_type": mongodb::bson::to_bson(&job_type)?, }; tracing::debug!(internal_id = %internal_id, job_type = ?job_type, category = "db_call", "Fetched job by internal ID and type"); + let attributes = [ + KeyValue::new("db_operation_name", "get_job_by_internal_id_and_type"), + KeyValue::new("db_operation_id", format!("{:?}", internal_id)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(self.get_job_collection().find_one(filter, None).await?) } #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result { + let start = Instant::now(); // Filters to search for the job let filter = doc! { "id": current_job.id, @@ -154,6 +180,12 @@ impl Database for MongoDb { match result { Some(job) => { tracing::debug!(job_id = %current_job.id, category = "db_call", "Job updated successfully"); + let attributes = [ + KeyValue::new("db_operation_name", "update_job"), + KeyValue::new("db_operation_id", format!("{:?}", current_job.id)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(job) } None => { @@ -165,6 +197,7 @@ impl Database for MongoDb { #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn get_latest_job_by_type(&self, job_type: JobType) -> Result> { + let start = Instant::now(); let pipeline = vec![ doc! { "$match": { @@ -199,6 +232,12 @@ impl Database for MongoDb { match cursor.try_next().await? 
{ Some(doc) => { let job: JobItem = mongodb::bson::from_document(doc)?; + let attributes = [ + KeyValue::new("db_operation_name", "get_latest_job_by_type"), + KeyValue::new("db_operation_job_type", format!("{:?}", job_type)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(Some(job)) } None => Ok(None), @@ -233,6 +272,7 @@ impl Database for MongoDb { job_a_status: JobStatus, job_b_type: JobType, ) -> Result> { + let start = Instant::now(); // Convert enums to Bson strings let job_a_type_bson = Bson::String(format!("{:?}", job_a_type)); let job_a_status_bson = Bson::String(format!("{:?}", job_a_status)); @@ -323,6 +363,9 @@ impl Database for MongoDb { } tracing::debug!(job_count = vec_jobs.len(), category = "db_call", "Retrieved jobs without successor"); + let attributes = [KeyValue::new("db_operation_name", "get_jobs_without_successor")]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(vec_jobs) } @@ -332,6 +375,7 @@ impl Database for MongoDb { job_type: JobType, job_status: JobStatus, ) -> Result> { + let start = Instant::now(); let filter = doc! { "job_type": bson::to_bson(&job_type)?, "status": bson::to_bson(&job_status)? @@ -339,6 +383,9 @@ impl Database for MongoDb { let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); tracing::debug!(job_type = ?job_type, job_status = ?job_status, category = "db_call", "Fetched latest job by type and status"); + let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type_and_status")]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(self.get_job_collection().find_one(filter, find_options).await?) 
} @@ -349,6 +396,7 @@ impl Database for MongoDb { job_status: JobStatus, internal_id: String, ) -> Result> { + let start = Instant::now(); let filter = doc! { "job_type": bson::to_bson(&job_type)?, "status": bson::to_bson(&job_status)?, @@ -361,11 +409,15 @@ impl Database for MongoDb { }; let jobs: Vec = self.get_job_collection().find(filter, None).await?.try_collect().await?; tracing::debug!(job_type = ?job_type, job_status = ?job_status, internal_id = internal_id, category = "db_call", "Fetched jobs after internal ID by job type"); + let attributes = [KeyValue::new("db_operation_name", "get_jobs_after_internal_id_by_job_type")]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(jobs) } #[tracing::instrument(skip(self, limit), fields(function_type = "db_call"), ret, err)] async fn get_jobs_by_statuses(&self, job_status: Vec, limit: Option) -> Result> { + let start = Instant::now(); let filter = doc! { "status": { // TODO: Check that the conversion leads to valid output! 
@@ -377,6 +429,9 @@ impl Database for MongoDb { let jobs: Vec = self.get_job_collection().find(filter, find_options).await?.try_collect().await?; tracing::debug!(job_count = jobs.len(), category = "db_call", "Retrieved jobs by statuses"); + let attributes = [KeyValue::new("db_operation_name", "get_jobs_by_statuses")]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(jobs) } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index e86aa984..91b6f1e5 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::fmt; use std::panic::AssertUnwindSafe; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use color_eyre::eyre::{eyre, Context}; @@ -151,6 +151,7 @@ pub async fn create_job( metadata: HashMap, config: Arc, ) -> Result<(), JobError> { + let start = Instant::now(); tracing::info!(log_type = "starting", category = "general", function_type = "create_job", job_type = ?job_type, block_no = %internal_id, "General create job started for block"); tracing::debug!( @@ -180,13 +181,16 @@ pub async fn create_job( .map_err(|e| JobError::Other(OtherError(e)))?; let attributes = [ - KeyValue::new("job_type", format!("{:?}", job_type)), - KeyValue::new("type", "create_job"), - KeyValue::new("job", format!("{:?}", job_item)), + KeyValue::new("operation_job_type", format!("{:?}", job_type)), + KeyValue::new("operation_type", "create_job"), + KeyValue::new("operation_job", format!("{:?}", job_item)), ]; - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes); tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block"); + let duration = start.elapsed(); + 
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes); + ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); Ok(()) } @@ -194,6 +198,7 @@ pub async fn create_job( /// DB. It then adds the job to the verification queue. #[tracing::instrument(skip(config), fields(category = "general", job, job_type, internal_id), ret, err)] pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> { + let start = Instant::now(); let job = get_job(id, config.clone()).await?; let internal_id = job.internal_id.clone(); tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block"); @@ -292,13 +297,16 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> })?; let attributes = [ - KeyValue::new("job_type", format!("{:?}", job.job_type)), - KeyValue::new("type", "process_job"), - KeyValue::new("job", format!("{:?}", job)), + KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), + KeyValue::new("operation_type", "process_job"), + KeyValue::new("operation_job", format!("{:?}", job)), ]; - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block"); + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); + ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); Ok(()) } @@ -315,6 +323,7 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> err )] pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { + let start = Instant::now(); let mut job = get_job(id, 
config.clone()).await?; let internal_id = job.internal_id.clone(); tracing::info!(log_type = "starting", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job started for block"); @@ -433,13 +442,16 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { }; let attributes = [ - KeyValue::new("job_type", format!("{:?}", job.job_type)), - KeyValue::new("type", "verify_job"), - KeyValue::new("job", format!("{:?}", job)), + KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), + KeyValue::new("operation_type", "verify_job"), + KeyValue::new("operation_job", format!("{:?}", job)), ]; - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block"); + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); + ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); Ok(()) } diff --git a/crates/orchestrator/src/metrics.rs b/crates/orchestrator/src/metrics.rs index 0eade9f6..67679d17 100644 --- a/crates/orchestrator/src/metrics.rs +++ b/crates/orchestrator/src/metrics.rs @@ -1,14 +1,18 @@ use once_cell; use once_cell::sync::Lazy; -use opentelemetry::metrics::Gauge; +use opentelemetry::metrics::{Counter, Gauge}; use opentelemetry::{global, KeyValue}; -use utils::metrics::lib::{register_gauge_metric_instrument, Metrics}; +use utils::metrics::lib::{register_counter_metric_instrument, register_gauge_metric_instrument, Metrics}; use utils::register_metric; register_metric!(ORCHESTRATOR_METRICS, OrchestratorMetrics); pub struct OrchestratorMetrics { pub block_gauge: Gauge, + pub successful_jobs: Counter, + pub failed_jobs: Counter, + pub jobs_response_time: Gauge, + pub 
db_calls_response_time: Gauge, } impl Metrics for OrchestratorMetrics { @@ -31,6 +35,34 @@ impl Metrics for OrchestratorMetrics { "block".to_string(), ); - Self { block_gauge } + let successful_jobs = register_counter_metric_instrument( + &orchestrator_meter, + "successful_jobs".to_string(), + "A counter to show count of successful jobs over time".to_string(), + "jobs".to_string(), + ); + + let failed_jobs = register_counter_metric_instrument( + &orchestrator_meter, + "failed_jobs".to_string(), + "A counter to show count of failed jobs over time".to_string(), + "jobs".to_string(), + ); + + let jobs_response_time = register_gauge_metric_instrument( + &orchestrator_meter, + "jobs_response_time".to_string(), + "A gauge to show response time of jobs over time".to_string(), + "Time".to_string(), + ); + + let db_calls_response_time = register_gauge_metric_instrument( + &orchestrator_meter, + "db_calls_response_time".to_string(), + "A gauge to show response time of db calls over time".to_string(), + "Time".to_string(), + ); + + Self { block_gauge, successful_jobs, failed_jobs, jobs_response_time, db_calls_response_time } } } diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index aeeb25c2..636aa17e 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -4,12 +4,14 @@ use axum::extract::{Path, State}; use axum::response::IntoResponse; use axum::routing::get; use axum::Router; +use opentelemetry::KeyValue; use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::ApiResponse; use crate::config::Config; use crate::jobs::{process_job, verify_job, JobError}; +use crate::metrics::ORCHESTRATOR_METRICS; #[derive(Deserialize)] struct JobId { @@ -40,7 +42,14 @@ async fn handle_process_job_request( let response = JobApiResponse { job_id: job_id.to_string(), status: "completed".to_string() }; ApiResponse::success(response).into_response() } - Err(e) => 
ApiResponse::::error(e.to_string()).into_response(), + Err(e) => { + let attributes = [ + KeyValue::new("operation_type", "process_job"), + KeyValue::new("operation_job_id", format!("{:?}", job_id)), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ApiResponse::::error(e.to_string()).into_response() + } } } @@ -62,7 +71,14 @@ async fn handle_verify_job_request( let response = JobApiResponse { job_id: job_id.to_string(), status: "verified".to_string() }; ApiResponse::success(response).into_response() } - Err(e) => ApiResponse::::error(e.to_string()).into_response(), + Err(e) => { + let attributes = [ + KeyValue::new("operation_type", "verify_job"), + KeyValue::new("operation_job_id", format!("{:?}", job_id)), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ApiResponse::::error(e.to_string()).into_response() + } } } pub fn job_router(config: Arc) -> Router { diff --git a/crates/orchestrator/src/workers/data_submission_worker.rs b/crates/orchestrator/src/workers/data_submission_worker.rs index 08c6068c..20e1955a 100644 --- a/crates/orchestrator/src/workers/data_submission_worker.rs +++ b/crates/orchestrator/src/workers/data_submission_worker.rs @@ -2,10 +2,12 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use opentelemetry::KeyValue; use crate::config::Config; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; +use crate::metrics::ORCHESTRATOR_METRICS; use crate::workers::Worker; pub struct DataSubmissionWorker; @@ -24,7 +26,18 @@ impl Worker for DataSubmissionWorker { .await?; for job in successful_proving_jobs { - create_job(JobType::DataSubmission, job.internal_id, HashMap::new(), config.clone()).await?; + match create_job(JobType::DataSubmission, job.internal_id.clone(), HashMap::new(), config.clone()).await { + Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new data submission job"), + Err(e) => { + tracing::warn!(block_id = %job.internal_id, 
error = %e, "Failed to create new data submission job"); + let attributes = [ + KeyValue::new("operation_job_type", format!("{:?}", JobType::DataSubmission)), + KeyValue::new("operation_type", format!("{:?}", "create_job")), + KeyValue::new("operation_internal_id", format!("{:?}", job.internal_id)), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + } + } } tracing::trace!(log_type = "completed", category = "DataSubmissionWorker", "DataSubmissionWorker completed."); diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs index 523f310d..145235ea 100644 --- a/crates/orchestrator/src/workers/proving.rs +++ b/crates/orchestrator/src/workers/proving.rs @@ -1,10 +1,12 @@ use std::sync::Arc; use async_trait::async_trait; +use opentelemetry::KeyValue; use crate::config::Config; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; +use crate::metrics::ORCHESTRATOR_METRICS; use crate::workers::Worker; pub struct ProvingWorker; @@ -25,7 +27,18 @@ impl Worker for ProvingWorker { for job in successful_snos_jobs { tracing::debug!(job_id = %job.internal_id, "Creating proof creation job for SNOS job"); - create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await?; + match create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await { + Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"), + Err(e) => { + tracing::warn!(job_id = %job.internal_id, error = %e, "Failed to create new proving job"); + let attributes = [ + KeyValue::new("operation_job_type", format!("{:?}", JobType::ProofCreation)), + KeyValue::new("operation_type", format!("{:?}", "create_job")), + KeyValue::new("operation_internal_id", format!("{:?}", job.internal_id)), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + } + } } tracing::trace!(log_type = "completed", category = "ProvingWorker", 
"ProvingWorker completed."); diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index a346dd37..fc322f16 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -3,11 +3,13 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use opentelemetry::KeyValue; use starknet::providers::Provider; use crate::config::Config; use crate::jobs::create_job; use crate::jobs::types::JobType; +use crate::metrics::ORCHESTRATOR_METRICS; use crate::workers::Worker; pub struct SnosWorker; @@ -47,9 +49,15 @@ impl Worker for SnosWorker { for block_num in block_start..latest_block_number + 1 { match create_job(JobType::SnosRun, block_num.to_string(), HashMap::new(), config.clone()).await { - Ok(_) => {} + Ok(_) => tracing::info!(block_id = %block_num, "Successfully created new Snos job"), Err(e) => { - log::warn!("Failed to create job: {:?}", e); + tracing::warn!(block_id = %block_num, error = %e, "Failed to create new Snos job"); + let attributes = [ + KeyValue::new("operation_job_type", format!("{:?}", JobType::SnosRun)), + KeyValue::new("operation_type", format!("{:?}", "create_job")), + KeyValue::new("operation_internal_id", format!("{:?}", block_num.to_string())), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); } } } diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index b6f28867..b2708c2d 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -2,11 +2,13 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use opentelemetry::KeyValue; use crate::config::Config; use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; +use crate::metrics::ORCHESTRATOR_METRICS; use crate::workers::Worker; pub struct 
UpdateStateWorker; @@ -116,9 +118,15 @@ impl Worker for UpdateStateWorker { // Creating a single job for all the pending blocks. let new_job_id = blocks_to_process[0].to_string(); match create_job(JobType::StateTransition, new_job_id.clone(), metadata, config.clone()).await { - Ok(_) => tracing::info!(job_id = %new_job_id, "Successfully created new state transition job"), + Ok(_) => tracing::info!(block_id = %new_job_id, "Successfully created new state transition job"), Err(e) => { tracing::error!(job_id = %new_job_id, error = %e, "Failed to create new state transition job"); + let attributes = [ + KeyValue::new("operation_job_type", format!("{:?}", JobType::StateTransition)), + KeyValue::new("operation_type", format!("{:?}", "create_job")), + KeyValue::new("operation_internal_id", format!("{:?}", new_job_id.to_string())), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + return Err(e.into()); } } diff --git a/crates/utils/src/metrics/lib.rs b/crates/utils/src/metrics/lib.rs index d31b6d15..07f69c2d 100644 --- a/crates/utils/src/metrics/lib.rs +++ b/crates/utils/src/metrics/lib.rs @@ -1,4 +1,4 @@ -use opentelemetry::metrics::{Gauge, Meter}; +use opentelemetry::metrics::{Counter, Gauge, Meter}; pub trait Metrics { fn register() -> Self; @@ -19,3 +19,12 @@ pub fn register_gauge_metric_instrument( ) -> Gauge { crate_meter.f64_gauge(instrument_name).with_description(desc).with_unit(unit).init() } + +pub fn register_counter_metric_instrument( + crate_meter: &Meter, + instrument_name: String, + desc: String, + unit: String, +) -> Counter { + crate_meter.f64_counter(instrument_name).with_description(desc).with_unit(unit).init() +}