Skip to content

Commit

Permalink
Fix/instrumentations (#192)
Browse files Browse the repository at this point in the history
* update: added verification status to verify job

* update: added verification time

* update: JOB_METADATA_PROCESSING_COMPLETED_AT
  • Loading branch information
heemankv authored Dec 15, 2024
1 parent c2e00d1 commit 9e67649
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ MADARA_ORCHESTRATOR_AWS_S3_BUCKET_NAME= # S3 bucket name
#### INSTRUMENTATION ####
## OTEL ##
MADARA_ORCHESTRATOR_OTEL_SERVICE_NAME= # OpenTelemetry service name
MADARA_ORCHESTRATOR_OTES_COLLECTOR_ENDPOINT= # OpenTelemetry collector endpoint
MADARA_ORCHESTRATOR_OTEL_COLLECTOR_ENDPOINT= # OpenTelemetry collector endpoint

#### SERVER ####
MADARA_ORCHESTRATOR_HOST= # Server host
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

- refactor: instrumentations
- `is_worker_enabled` status check moved from `VerificationFailed` to `Failed`
- refactor: static attributes for telemetry
- refactor: aws setup for Event Bridge
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/jobs/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub const JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO: &str = "last_failed_bl
pub const JOB_METADATA_SNOS_BLOCK: &str = "block_number_to_run";
pub const JOB_METADATA_SNOS_FACT: &str = "snos_fact";
pub const JOB_METADATA_FAILURE_REASON: &str = "failure_reason";
pub const JOB_METADATA_ERROR: &str = "error";
pub const JOB_METADATA_PROCESSING_COMPLETED_AT: &str = "processing_completed_at";
50 changes: 37 additions & 13 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use chrono::Utc;
use color_eyre::eyre::{eyre, Context};
use constants::JOB_METADATA_FAILURE_REASON;
use constants::{JOB_METADATA_ERROR, JOB_METADATA_FAILURE_REASON, JOB_METADATA_PROCESSING_COMPLETED_AT};
use conversion::parse_string;
use da_job::DaError;
use futures::FutureExt;
Expand Down Expand Up @@ -186,7 +187,7 @@ pub async fn create_job(
tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes);
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
Ok(())
}
Expand All @@ -201,7 +202,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block");

tracing::Span::current().record("job", format!("{:?}", job.clone()));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type.clone()));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type));
tracing::Span::current().record("internal_id", job.internal_id.clone());

tracing::debug!(job_id = ?id, status = ?job.status, "Current job status");
Expand Down Expand Up @@ -234,6 +235,9 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
let external_id = match AssertUnwindSafe(job_handler.process_job(config.clone(), &mut job)).catch_unwind().await {
Ok(Ok(external_id)) => {
tracing::debug!(job_id = ?id, "Successfully processed job");
// Add the time of processing to the metadata.
job.metadata
.insert(JOB_METADATA_PROCESSING_COMPLETED_AT.to_string(), Utc::now().timestamp_millis().to_string());
external_id
}
Ok(Err(e)) => {
Expand Down Expand Up @@ -300,7 +304,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>

tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
Ok(())
Expand Down Expand Up @@ -341,14 +345,36 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let job_handler = factory::get_job_handler(&job.job_type).await;
tracing::debug!(job_id = ?id, "Verifying job with handler");
let verification_status = job_handler.verify_job(config.clone(), &mut job).await?;
tracing::Span::current().record("verification_status", format!("{:?}", verification_status.clone()));
tracing::Span::current().record("verification_status", format!("{:?}", &verification_status));

let attributes = [
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "verify_job"),
KeyValue::new("operation_verification_status", format!("{:?}", &verification_status)),
];

match verification_status {
JobVerificationStatus::Verified => {
tracing::info!(job_id = ?id, "Job verified successfully");
match job
.metadata
.get(JOB_METADATA_PROCESSING_COMPLETED_AT)
.and_then(|time| time.parse::<i64>().ok())
.map(|start| Utc::now().timestamp_millis() - start)
{
Some(time_taken) => ORCHESTRATOR_METRICS
.verification_time
.record(time_taken as f64, &[KeyValue::new("operation_job_type", format!("{:?}", job.job_type))]),
None => tracing::warn!("Failed to calculate verification time: Invalid or missing processing time"),
}
let mut metadata = job.metadata.clone();
metadata.remove("processing_completed_at");
config
.database()
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::Completed).build())
.update_job(
&job,
JobItemUpdates::new().update_metadata(metadata).update_status(JobStatus::Completed).build(),
)
.await
.map_err(|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to Completed");
Expand All @@ -358,7 +384,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
JobVerificationStatus::Rejected(e) => {
tracing::warn!(job_id = ?id, error = ?e, "Job verification rejected");
let mut new_job = job.clone();
new_job.metadata.insert("error".to_string(), e);
new_job.metadata.insert(JOB_METADATA_ERROR.to_string(), e);
new_job.status = JobStatus::VerificationFailed;

let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)
Expand Down Expand Up @@ -437,14 +463,9 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
}
};

let attributes = [
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "verify_job"),
];

tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
Ok(())
Expand Down Expand Up @@ -491,6 +512,9 @@ async fn move_job_to_failed(job: &JobItem, config: Arc<Config>, reason: String)
{
Ok(_) => {
tracing::info!(log_type = "completed", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure completed for block");
ORCHESTRATOR_METRICS
.failed_jobs
.add(1.0, &[KeyValue::new("operation_job_type", format!("{:?}", job.job_type))]);
Ok(())
}
Err(e) => {
Expand Down
38 changes: 31 additions & 7 deletions crates/orchestrator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ register_metric!(ORCHESTRATOR_METRICS, OrchestratorMetrics);

pub struct OrchestratorMetrics {
pub block_gauge: Gauge<f64>,
pub successful_jobs: Counter<f64>,
pub successful_job_operations: Counter<f64>,
pub failed_job_operations: Counter<f64>,
pub failed_jobs: Counter<f64>,
pub verification_time: Gauge<f64>,
pub jobs_response_time: Gauge<f64>,
pub db_calls_response_time: Gauge<f64>,
}
Expand All @@ -35,10 +37,17 @@ impl Metrics for OrchestratorMetrics {
"block".to_string(),
);

let successful_jobs = register_counter_metric_instrument(
let successful_job_operations = register_counter_metric_instrument(
&orchestrator_meter,
"successful_jobs".to_string(),
"A counter to show count of successful jobs over time".to_string(),
"successful_job_operations".to_string(),
"A counter to show count of successful job operations over time".to_string(),
"jobs".to_string(),
);

let failed_job_operations = register_counter_metric_instrument(
&orchestrator_meter,
"failed_job_operations".to_string(),
"A counter to show count of failed job operations over time".to_string(),
"jobs".to_string(),
);

Expand All @@ -49,20 +58,35 @@ impl Metrics for OrchestratorMetrics {
"jobs".to_string(),
);

let verification_time = register_gauge_metric_instrument(
&orchestrator_meter,
"verification_time".to_string(),
"A gauge to show the time taken for verification of tasks".to_string(),
"ms".to_string(),
);

let jobs_response_time = register_gauge_metric_instrument(
&orchestrator_meter,
"jobs_response_time".to_string(),
"A gauge to show response time of jobs over time".to_string(),
"Time".to_string(),
"s".to_string(),
);

let db_calls_response_time = register_gauge_metric_instrument(
&orchestrator_meter,
"db_calls_response_time".to_string(),
"A gauge to show response time of jobs over time".to_string(),
"Time".to_string(),
"s".to_string(),
);

Self { block_gauge, successful_jobs, failed_jobs, jobs_response_time, db_calls_response_time }
Self {
block_gauge,
successful_job_operations,
failed_job_operations,
failed_jobs,
verification_time,
jobs_response_time,
db_calls_response_time,
}
}
}
6 changes: 2 additions & 4 deletions crates/orchestrator/src/routes/job_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ async fn handle_process_job_request(
ApiResponse::success(response).into_response()
}
Err(e) => {
let attributes = [KeyValue::new("operation_type", "process_job")];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "process_job")]);
ApiResponse::<JobApiResponse>::error(e.to_string()).into_response()
}
}
Expand All @@ -69,8 +68,7 @@ async fn handle_verify_job_request(
ApiResponse::success(response).into_response()
}
Err(e) => {
let attributes = [KeyValue::new("operation_type", "verify_job")];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "verify_job")]);
ApiResponse::<JobApiResponse>::error(e.to_string()).into_response()
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/data_submission_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Worker for DataSubmissionWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::DataSubmission)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Worker for ProvingWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::ProofCreation)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Worker for SnosWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::SnosRun)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/update_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Worker for UpdateStateWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::StateTransition)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
return Err(e.into());
}
}
Expand Down

0 comments on commit 9e67649

Please sign in to comment.