diff --git a/.env.example b/.env.example index 7c69f686..43b3dc0f 100644 --- a/.env.example +++ b/.env.example @@ -73,7 +73,7 @@ MADARA_ORCHESTRATOR_AWS_S3_BUCKET_NAME= # S3 bucket name #### INSTRUMENTATION #### ## OTEL ## MADARA_ORCHESTRATOR_OTEL_SERVICE_NAME= # OpenTelemetry service name -MADARA_ORCHESTRATOR_OTES_COLLECTOR_ENDPOINT= # OpenTelemetry collector endpoint +MADARA_ORCHESTRATOR_OTEL_COLLECTOR_ENDPOINT= # OpenTelemetry collector endpoint #### SERVER #### MADARA_ORCHESTRATOR_HOST= # Server host diff --git a/CHANGELOG.md b/CHANGELOG.md index c434fd1f..b89ef854 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Fixed +- refactor: instrumentations - `is_worker_enabled` status check moved from `VerificationFailed` to `Failed` - refactor: static attributes for telemetry - refactor: aws setup for Event Bridge diff --git a/crates/orchestrator/src/jobs/constants.rs b/crates/orchestrator/src/jobs/constants.rs index 596017a2..07992401 100644 --- a/crates/orchestrator/src/jobs/constants.rs +++ b/crates/orchestrator/src/jobs/constants.rs @@ -7,3 +7,5 @@ pub const JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO: &str = "last_failed_bl pub const JOB_METADATA_SNOS_BLOCK: &str = "block_number_to_run"; pub const JOB_METADATA_SNOS_FACT: &str = "snos_fact"; pub const JOB_METADATA_FAILURE_REASON: &str = "failure_reason"; +pub const JOB_METADATA_ERROR: &str = "error"; +pub const JOB_METADATA_PROCESSING_COMPLETED_AT: &str = "processing_completed_at"; diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 2263090a..2c791a27 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -5,8 +5,9 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; +use chrono::Utc; use color_eyre::eyre::{eyre, Context}; -use constants::JOB_METADATA_FAILURE_REASON; +use constants::{JOB_METADATA_ERROR, JOB_METADATA_FAILURE_REASON, JOB_METADATA_PROCESSING_COMPLETED_AT}; use conversion::parse_string; use da_job::DaError; use futures::FutureExt; @@ -186,7 +187,7 @@ pub async fn create_job( tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block"); let duration = start.elapsed(); ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes); - ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes); ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); Ok(()) } @@ -201,7 +202,7 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block"); tracing::Span::current().record("job", format!("{:?}", job.clone())); - tracing::Span::current().record("job_type", format!("{:?}", job.job_type.clone())); + tracing::Span::current().record("job_type", format!("{:?}", job.job_type)); tracing::Span::current().record("internal_id", job.internal_id.clone()); tracing::debug!(job_id = ?id, status = ?job.status, "Current job status"); @@ -234,6 +235,9 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> let external_id = match AssertUnwindSafe(job_handler.process_job(config.clone(), &mut job)).catch_unwind().await { Ok(Ok(external_id)) => { tracing::debug!(job_id = ?id, "Successfully processed job"); + // Add the time of processing to the metadata. + job.metadata + .insert(JOB_METADATA_PROCESSING_COMPLETED_AT.to_string(), Utc::now().timestamp_millis().to_string()); external_id } Ok(Err(e)) => { @@ -300,7 +304,7 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block"); let duration = start.elapsed(); - ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes); ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); Ok(()) @@ -341,14 +345,36 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { let job_handler = factory::get_job_handler(&job.job_type).await; tracing::debug!(job_id = ?id, "Verifying job with handler"); let verification_status = job_handler.verify_job(config.clone(), &mut job).await?; - tracing::Span::current().record("verification_status", format!("{:?}", verification_status.clone())); + tracing::Span::current().record("verification_status", format!("{:?}", &verification_status)); + + let attributes = [ + KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), + KeyValue::new("operation_type", "verify_job"), + KeyValue::new("operation_verification_status", format!("{:?}", &verification_status)), + ]; match verification_status { JobVerificationStatus::Verified => { tracing::info!(job_id = ?id, "Job verified successfully"); + match job + .metadata + .get(JOB_METADATA_PROCESSING_COMPLETED_AT) + .and_then(|time| time.parse::().ok()) + .map(|start| Utc::now().timestamp_millis() - start) + { + Some(time_taken) => ORCHESTRATOR_METRICS + .verification_time + .record(time_taken as f64, &[KeyValue::new("operation_job_type", format!("{:?}", job.job_type))]), + None => tracing::warn!("Failed to calculate verification time: Invalid or missing processing time"), + } + let mut metadata = job.metadata.clone(); + metadata.remove("processing_completed_at"); config .database() - .update_job(&job, JobItemUpdates::new().update_status(JobStatus::Completed).build()) + .update_job( + &job, + JobItemUpdates::new().update_metadata(metadata).update_status(JobStatus::Completed).build(), + ) .await .map_err(|e| { tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to Completed"); @@ -358,7 +384,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { JobVerificationStatus::Rejected(e) => { tracing::warn!(job_id = ?id, error = ?e, "Job verification rejected"); let mut new_job = job.clone(); - new_job.metadata.insert("error".to_string(), e); + new_job.metadata.insert(JOB_METADATA_ERROR.to_string(), e); new_job.status = JobStatus::VerificationFailed; let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY) @@ -437,14 +463,9 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { } }; - let attributes = [ - KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), - KeyValue::new("operation_type", "verify_job"), - ]; - tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block"); let duration = start.elapsed(); - ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes); ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); Ok(()) @@ -491,6 +512,9 @@ async fn move_job_to_failed(job: &JobItem, config: Arc, reason: String) { Ok(_) => { tracing::info!(log_type = "completed", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure completed for block"); + ORCHESTRATOR_METRICS + .failed_jobs + .add(1.0, &[KeyValue::new("operation_job_type", format!("{:?}", job.job_type))]); Ok(()) } Err(e) => { diff --git a/crates/orchestrator/src/metrics.rs b/crates/orchestrator/src/metrics.rs index 67679d17..9fddfbe7 100644 --- a/crates/orchestrator/src/metrics.rs +++ b/crates/orchestrator/src/metrics.rs @@ -9,8 +9,10 @@ register_metric!(ORCHESTRATOR_METRICS, OrchestratorMetrics); pub struct OrchestratorMetrics { pub block_gauge: Gauge, - pub successful_jobs: Counter, + pub successful_job_operations: Counter, + pub failed_job_operations: Counter, pub failed_jobs: Counter, + pub verification_time: Gauge, pub jobs_response_time: Gauge, pub db_calls_response_time: Gauge, } @@ -35,10 +37,17 @@ impl Metrics for OrchestratorMetrics { "block".to_string(), ); - let successful_jobs = register_counter_metric_instrument( + let successful_job_operations = register_counter_metric_instrument( &orchestrator_meter, - "successful_jobs".to_string(), - "A counter to show count of successful jobs over time".to_string(), + "successful_job_operations".to_string(), + "A counter to show count of successful job operations over time".to_string(), + "jobs".to_string(), + ); + + let failed_job_operations = register_counter_metric_instrument( + &orchestrator_meter, + "failed_job_operations".to_string(), + "A counter to show count of failed job operations over time".to_string(), "jobs".to_string(), ); @@ -49,20 +58,35 @@ impl Metrics for OrchestratorMetrics { "jobs".to_string(), ); + let verification_time = register_gauge_metric_instrument( + &orchestrator_meter, + "verification_time".to_string(), + "A gauge to show the time taken for verification of tasks".to_string(), + "ms".to_string(), + ); + let jobs_response_time = register_gauge_metric_instrument( &orchestrator_meter, "jobs_response_time".to_string(), "A gauge to show response time of jobs over time".to_string(), - "Time".to_string(), + "s".to_string(), ); let db_calls_response_time = register_gauge_metric_instrument( &orchestrator_meter, "db_calls_response_time".to_string(), "A gauge to show response time of jobs over time".to_string(), - "Time".to_string(), + "s".to_string(), ); - Self { block_gauge, successful_jobs, failed_jobs, jobs_response_time, db_calls_response_time } + Self { + block_gauge, + successful_job_operations, + failed_job_operations, + failed_jobs, + verification_time, + jobs_response_time, + db_calls_response_time, + } } } diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index 7f3cd489..a10730af 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -43,8 +43,7 @@ async fn handle_process_job_request( ApiResponse::success(response).into_response() } Err(e) => { - let attributes = [KeyValue::new("operation_type", "process_job")]; - ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "process_job")]); ApiResponse::::error(e.to_string()).into_response() } } @@ -69,8 +68,7 @@ async fn handle_verify_job_request( ApiResponse::success(response).into_response() } Err(e) => { - let attributes = [KeyValue::new("operation_type", "verify_job")]; - ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "verify_job")]); ApiResponse::::error(e.to_string()).into_response() } } diff --git a/crates/orchestrator/src/workers/data_submission_worker.rs b/crates/orchestrator/src/workers/data_submission_worker.rs index bef5cc3f..24585435 100644 --- a/crates/orchestrator/src/workers/data_submission_worker.rs +++ b/crates/orchestrator/src/workers/data_submission_worker.rs @@ -34,7 +34,7 @@ impl Worker for DataSubmissionWorker { KeyValue::new("operation_job_type", format!("{:?}", JobType::DataSubmission)), KeyValue::new("operation_type", format!("{:?}", "create_job")), ]; - ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes); } } } diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs index 20b0df9d..66c52dfe 100644 --- a/crates/orchestrator/src/workers/proving.rs +++ b/crates/orchestrator/src/workers/proving.rs @@ -35,7 +35,7 @@ impl Worker for ProvingWorker { KeyValue::new("operation_job_type", format!("{:?}", JobType::ProofCreation)), KeyValue::new("operation_type", format!("{:?}", "create_job")), ]; - ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes); } } } diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index 552da715..5af863c7 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -56,7 +56,7 @@ impl Worker for SnosWorker { KeyValue::new("operation_job_type", format!("{:?}", JobType::SnosRun)), KeyValue::new("operation_type", format!("{:?}", "create_job")), ]; - ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes); } } } diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index dd989aba..a6b7da2d 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -125,7 +125,7 @@ impl Worker for UpdateStateWorker { KeyValue::new("operation_job_type", format!("{:?}", JobType::StateTransition)), KeyValue::new("operation_type", format!("{:?}", "create_job")), ]; - ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes); return Err(e.into()); } }