Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/instrumentations #192

Merged
merged 5 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ MADARA_ORCHESTRATOR_AWS_S3_BUCKET_NAME= # S3 bucket name
#### INSTRUMENTATION ####
## OTEL ##
MADARA_ORCHESTRATOR_OTEL_SERVICE_NAME= # OpenTelemetry service name
MADARA_ORCHESTRATOR_OTES_COLLECTOR_ENDPOINT= # OpenTelemetry collector endpoint
MADARA_ORCHESTRATOR_OTEL_COLLECTOR_ENDPOINT= # OpenTelemetry collector endpoint

#### SERVER ####
MADARA_ORCHESTRATOR_HOST= # Server host
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

- refactor: instrumentations
- `is_worker_enabled` status check moved from `VerificationFailed` to `Failed`
- refactor: static attributes for telemetry
- refactor: aws setup for Event Bridge
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/jobs/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub const JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO: &str = "last_failed_bl
pub const JOB_METADATA_SNOS_BLOCK: &str = "block_number_to_run";
pub const JOB_METADATA_SNOS_FACT: &str = "snos_fact";
pub const JOB_METADATA_FAILURE_REASON: &str = "failure_reason";
pub const JOB_METADATA_ERROR: &str = "error";
pub const JOB_METADATA_PROCESSING_COMPLETED_AT: &str = "processing_completed_at";
50 changes: 37 additions & 13 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use chrono::Utc;
use color_eyre::eyre::{eyre, Context};
use constants::JOB_METADATA_FAILURE_REASON;
use constants::{JOB_METADATA_ERROR, JOB_METADATA_FAILURE_REASON, JOB_METADATA_PROCESSING_COMPLETED_AT};
use conversion::parse_string;
use da_job::DaError;
use futures::FutureExt;
Expand Down Expand Up @@ -186,7 +187,7 @@ pub async fn create_job(
tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes);
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
Ok(())
}
Expand All @@ -201,7 +202,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block");

tracing::Span::current().record("job", format!("{:?}", job.clone()));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type.clone()));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type));
tracing::Span::current().record("internal_id", job.internal_id.clone());

tracing::debug!(job_id = ?id, status = ?job.status, "Current job status");
Expand Down Expand Up @@ -234,6 +235,9 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
let external_id = match AssertUnwindSafe(job_handler.process_job(config.clone(), &mut job)).catch_unwind().await {
Ok(Ok(external_id)) => {
tracing::debug!(job_id = ?id, "Successfully processed job");
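// Record when processing finished (Unix epoch milliseconds) in the job metadata so that
// verify_job can later measure how long verification took.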
job.metadata
.insert(JOB_METADATA_PROCESSING_COMPLETED_AT.to_string(), Utc::now().timestamp_millis().to_string());
external_id
}
Ok(Err(e)) => {
Expand Down Expand Up @@ -300,7 +304,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>

tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
Ok(())
Expand Down Expand Up @@ -341,14 +345,36 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let job_handler = factory::get_job_handler(&job.job_type).await;
tracing::debug!(job_id = ?id, "Verifying job with handler");
let verification_status = job_handler.verify_job(config.clone(), &mut job).await?;
tracing::Span::current().record("verification_status", format!("{:?}", verification_status.clone()));
tracing::Span::current().record("verification_status", format!("{:?}", &verification_status));

let attributes = [
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "verify_job"),
KeyValue::new("operation_verification_status", format!("{:?}", &verification_status)),
];

match verification_status {
JobVerificationStatus::Verified => {
tracing::info!(job_id = ?id, "Job verified successfully");
match job
.metadata
.get(JOB_METADATA_PROCESSING_COMPLETED_AT)
.and_then(|time| time.parse::<i64>().ok())
.map(|start| Utc::now().timestamp_millis() - start)
{
Some(time_taken) => ORCHESTRATOR_METRICS
.verification_time
.record(time_taken as f64, &[KeyValue::new("operation_job_type", format!("{:?}", job.job_type))]),
None => tracing::warn!("Failed to calculate verification time: Invalid or missing processing time"),
}
let mut metadata = job.metadata.clone();
metadata.remove("processing_completed_at");
config
.database()
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::Completed).build())
.update_job(
&job,
JobItemUpdates::new().update_metadata(metadata).update_status(JobStatus::Completed).build(),
)
.await
.map_err(|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to Completed");
Expand All @@ -358,7 +384,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
JobVerificationStatus::Rejected(e) => {
tracing::warn!(job_id = ?id, error = ?e, "Job verification rejected");
let mut new_job = job.clone();
new_job.metadata.insert("error".to_string(), e);
new_job.metadata.insert(JOB_METADATA_ERROR.to_string(), e);
new_job.status = JobStatus::VerificationFailed;

let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)
Expand Down Expand Up @@ -437,14 +463,9 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
}
};

let attributes = [
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "verify_job"),
];

tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
Ok(())
Expand Down Expand Up @@ -491,6 +512,9 @@ async fn move_job_to_failed(job: &JobItem, config: Arc<Config>, reason: String)
{
Ok(_) => {
tracing::info!(log_type = "completed", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure completed for block");
ORCHESTRATOR_METRICS
.failed_jobs
.add(1.0, &[KeyValue::new("operation_job_type", format!("{:?}", job.job_type))]);
Ok(())
}
Err(e) => {
Expand Down
38 changes: 31 additions & 7 deletions crates/orchestrator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ register_metric!(ORCHESTRATOR_METRICS, OrchestratorMetrics);

/// Metric instruments recorded by the orchestrator and accessed through `ORCHESTRATOR_METRICS`.
///
/// NOTE(review): both `successful_jobs` and `successful_job_operations` are declared here, while
/// the newer call sites only use `successful_job_operations` — confirm whether `successful_jobs`
/// is a leftover from before the rename.
pub struct OrchestratorMetrics {
/// Gauge recorded with the job's internal id (run through `parse_string`) at the end of
/// create/process/verify operations. Presumably registered with unit "block" — confirm.
pub block_gauge: Gauge<f64>,
/// Counter registered as "successful_jobs" ("count of successful jobs over time").
pub successful_jobs: Counter<f64>,
/// Counter incremented by 1 each time a `create_job`, `process_job` or `verify_job` operation
/// runs to completion; registered as "successful_job_operations".
pub successful_job_operations: Counter<f64>,
/// Counter incremented by 1 when a job operation returns an error (the process/verify route
/// handlers and the workers' `create_job` error paths); registered as "failed_job_operations".
pub failed_job_operations: Counter<f64>,
/// Counter incremented by 1, tagged with the job type, after `move_job_to_failed` has
/// successfully persisted the failure.
pub failed_jobs: Counter<f64>,
/// Gauge (unit "ms") recorded when a job is verified: the milliseconds elapsed since the
/// `processing_completed_at` timestamp stored in the job metadata.
pub verification_time: Gauge<f64>,
/// Gauge recorded with the elapsed wall-clock time of a job operation, in seconds
/// (`Duration::as_secs_f64`).
pub jobs_response_time: Gauge<f64>,
/// Gauge registered as "db_calls_response_time"; no recording site is visible here.
/// NOTE(review): its registered description reads "response time of jobs", which looks
/// copy-pasted from `jobs_response_time` — confirm and correct at the registration site.
pub db_calls_response_time: Gauge<f64>,
}
Expand All @@ -35,10 +37,17 @@ impl Metrics for OrchestratorMetrics {
"block".to_string(),
);

let successful_jobs = register_counter_metric_instrument(
let successful_job_operations = register_counter_metric_instrument(
&orchestrator_meter,
"successful_jobs".to_string(),
"A counter to show count of successful jobs over time".to_string(),
"successful_job_operations".to_string(),
"A counter to show count of successful job operations over time".to_string(),
"jobs".to_string(),
);

let failed_job_operations = register_counter_metric_instrument(
&orchestrator_meter,
"failed_job_operations".to_string(),
"A counter to show count of failed job operations over time".to_string(),
"jobs".to_string(),
);

Expand All @@ -49,20 +58,35 @@ impl Metrics for OrchestratorMetrics {
"jobs".to_string(),
);

let verification_time = register_gauge_metric_instrument(
&orchestrator_meter,
"verification_time".to_string(),
"A gauge to show the time taken for verification of tasks".to_string(),
"ms".to_string(),
);

let jobs_response_time = register_gauge_metric_instrument(
&orchestrator_meter,
"jobs_response_time".to_string(),
"A gauge to show response time of jobs over time".to_string(),
"Time".to_string(),
"s".to_string(),
);

let db_calls_response_time = register_gauge_metric_instrument(
&orchestrator_meter,
"db_calls_response_time".to_string(),
"A gauge to show response time of jobs over time".to_string(),
"Time".to_string(),
"s".to_string(),
);

Self { block_gauge, successful_jobs, failed_jobs, jobs_response_time, db_calls_response_time }
Self {
block_gauge,
successful_job_operations,
failed_job_operations,
failed_jobs,
verification_time,
jobs_response_time,
db_calls_response_time,
}
}
}
6 changes: 2 additions & 4 deletions crates/orchestrator/src/routes/job_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ async fn handle_process_job_request(
ApiResponse::success(response).into_response()
}
Err(e) => {
let attributes = [KeyValue::new("operation_type", "process_job")];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "process_job")]);
ApiResponse::<JobApiResponse>::error(e.to_string()).into_response()
}
}
Expand All @@ -69,8 +68,7 @@ async fn handle_verify_job_request(
ApiResponse::success(response).into_response()
}
Err(e) => {
let attributes = [KeyValue::new("operation_type", "verify_job")];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "verify_job")]);
ApiResponse::<JobApiResponse>::error(e.to_string()).into_response()
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/data_submission_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Worker for DataSubmissionWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::DataSubmission)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Worker for ProvingWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::ProofCreation)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Worker for SnosWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::SnosRun)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/update_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Worker for UpdateStateWorker {
KeyValue::new("operation_job_type", format!("{:?}", JobType::StateTransition)),
KeyValue::new("operation_type", format!("{:?}", "create_job")),
];
ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
return Err(e.into());
}
}
Expand Down
Loading