Skip to content

Commit

Permalink
update: added verification status to verify job
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Dec 15, 2024
1 parent c2e00d1 commit fad192d
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

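- refactor: instrumentations (split job counters into `successful_job_operations` and `failed_job_operations`, and added the verification status as an attribute on `verify_job` metrics)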
- `is_worker_enabled` status check moved from `VerificationFailed` to `Failed`
- refactor: static attributes for telemetry
- refactor: aws setup for Event Bridge
Expand Down
24 changes: 14 additions & 10 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ pub async fn create_job(
tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes);
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
Ok(())
}
Expand All @@ -201,7 +201,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block");

tracing::Span::current().record("job", format!("{:?}", job.clone()));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type.clone()));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type));
tracing::Span::current().record("internal_id", job.internal_id.clone());

tracing::debug!(job_id = ?id, status = ?job.status, "Current job status");
Expand Down Expand Up @@ -300,7 +300,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>

tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
Ok(())
Expand Down Expand Up @@ -341,7 +341,13 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let job_handler = factory::get_job_handler(&job.job_type).await;
tracing::debug!(job_id = ?id, "Verifying job with handler");
let verification_status = job_handler.verify_job(config.clone(), &mut job).await?;
tracing::Span::current().record("verification_status", format!("{:?}", verification_status.clone()));
tracing::Span::current().record("verification_status", format!("{:?}", &verification_status));

let attributes = [
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "verify_job"),
KeyValue::new("operation_verification_status", format!("{:?}", &verification_status)),
];

match verification_status {
JobVerificationStatus::Verified => {
Expand Down Expand Up @@ -437,14 +443,9 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
}
};

let attributes = [
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "verify_job"),
];

tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
Ok(())
Expand Down Expand Up @@ -491,6 +492,9 @@ async fn move_job_to_failed(job: &JobItem, config: Arc<Config>, reason: String)
{
Ok(_) => {
tracing::info!(log_type = "completed", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure completed for block");
ORCHESTRATOR_METRICS
.failed_jobs
.add(1.0, &[KeyValue::new("operation_job_type", format!("{:?}", job.job_type))]);
Ok(())
}
Err(e) => {
Expand Down
25 changes: 20 additions & 5 deletions crates/orchestrator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ register_metric!(ORCHESTRATOR_METRICS, OrchestratorMetrics);

pub struct OrchestratorMetrics {
pub block_gauge: Gauge<f64>,
pub successful_jobs: Counter<f64>,
pub successful_job_operations: Counter<f64>,
pub failed_job_operations: Counter<f64>,
pub failed_jobs: Counter<f64>,
pub jobs_response_time: Gauge<f64>,
pub db_calls_response_time: Gauge<f64>,
Expand All @@ -35,10 +36,17 @@ impl Metrics for OrchestratorMetrics {
"block".to_string(),
);

let successful_jobs = register_counter_metric_instrument(
let successful_job_operations = register_counter_metric_instrument(
&orchestrator_meter,
"successful_jobs".to_string(),
"A counter to show count of successful jobs over time".to_string(),
"successful_job_operations".to_string(),
"A counter to show count of successful job operations over time".to_string(),
"jobs".to_string(),
);

let failed_job_operations = register_counter_metric_instrument(
&orchestrator_meter,
"failed_job_operations".to_string(),
"A counter to show count of failed job operations over time".to_string(),
"jobs".to_string(),
);

Expand All @@ -63,6 +71,13 @@ impl Metrics for OrchestratorMetrics {
"Time".to_string(),
);

Self { block_gauge, successful_jobs, failed_jobs, jobs_response_time, db_calls_response_time }
Self {
block_gauge,
successful_job_operations,
failed_job_operations,
failed_jobs,
jobs_response_time,
db_calls_response_time,
}
}
}
6 changes: 2 additions & 4 deletions crates/orchestrator/src/routes/job_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ async fn handle_process_job_request(
ApiResponse::success(response).into_response()
}
Err(e) => {
let attributes = [KeyValue::new("operation_type", "process_job")];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "process_job")]);
ApiResponse::<JobApiResponse>::error(e.to_string()).into_response()
}
}
Expand All @@ -69,8 +68,7 @@ async fn handle_verify_job_request(
ApiResponse::success(response).into_response()
}
Err(e) => {
let attributes = [KeyValue::new("operation_type", "verify_job")];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "verify_job")]);
ApiResponse::<JobApiResponse>::error(e.to_string()).into_response()
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/data_submission_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Worker for DataSubmissionWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::DataSubmission)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Worker for ProvingWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::ProofCreation)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Worker for SnosWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::SnosRun)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/update_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Worker for UpdateStateWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::StateTransition)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
return Err(e.into());
}
}
Expand Down

0 comments on commit fad192d

Please sign in to comment.