Skip to content

Commit

Permalink
update: added more metric
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Dec 6, 2024
1 parent 687e5b4 commit 9eb4b55
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 25 deletions.
3 changes: 2 additions & 1 deletion .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ AWS_DEFAULT_REGION=localhost
# For EventBridge

MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME=madara-orchestrator-worker-trigger
MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_ROLE_NAME=madara-orchestrator-worker-trigger-role
MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_POLICY_NAME=madara-orchestrator-worker-trigger-policy



Expand Down Expand Up @@ -98,7 +100,6 @@ MADARA_ORCHESTRATOR_AWS_S3_BUCKET_NAME=madara-orchestrator-test-bucket

MADARA_ORCHESTRATOR_OTEL_SERVICE_NAME=orchestrator


#### SERVER ####

MADARA_ORCHESTRATOR_HOST=127.0.0.1
Expand Down
55 changes: 55 additions & 0 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Instant;

use async_std::stream::StreamExt;
use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
Expand All @@ -10,13 +12,15 @@ use mongodb::options::{
UpdateOptions,
};
use mongodb::{bson, Client, Collection};
use opentelemetry::KeyValue;
use url::Url;
use utils::ToDocument;
use uuid::Uuid;

use crate::database::Database;
use crate::jobs::types::{JobItem, JobItemUpdates, JobStatus, JobType};
use crate::jobs::JobError;
use crate::metrics::ORCHESTRATOR_METRICS;

mod utils;

Expand Down Expand Up @@ -67,6 +71,7 @@ impl MongoDb {
impl Database for MongoDb {
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn create_job(&self, job: JobItem) -> Result<JobItem, JobError> {
let start = Instant::now();
let options = UpdateOptions::builder().upsert(true).build();

let updates = job.to_document().map_err(|e| JobError::Other(e.into()))?;
Expand All @@ -93,6 +98,12 @@ impl Database for MongoDb {
.map_err(|e| JobError::Other(e.to_string().into()))?;

if result.matched_count == 0 {
let attributes = [
KeyValue::new("db_operation_name", "create_job"),
KeyValue::new("db_operation_job", format!("{:?}", job)),
];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(job)
} else {
Err(JobError::JobAlreadyExists { internal_id: job.internal_id, job_type: job.job_type })
Expand All @@ -101,25 +112,40 @@ impl Database for MongoDb {

#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_job_by_id(&self, id: Uuid) -> Result<Option<JobItem>> {
let start = Instant::now();
let filter = doc! {
"id": id
};
tracing::debug!(job_id = %id, category = "db_call", "Fetched job by ID");
let attributes = [
KeyValue::new("db_operation_name", "get_job_by_id"),
KeyValue::new("db_operation_id", format!("{:?}", id)),
];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(self.get_job_collection().find_one(filter, None).await?)
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>> {
let start = Instant::now();
let filter = doc! {
"internal_id": internal_id,
"job_type": mongodb::bson::to_bson(&job_type)?,
};
tracing::debug!(internal_id = %internal_id, job_type = ?job_type, category = "db_call", "Fetched job by internal ID and type");
let attributes = [
KeyValue::new("db_operation_name", "get_job_by_internal_id_and_type"),
KeyValue::new("db_operation_id", format!("{:?}", internal_id)),
];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(self.get_job_collection().find_one(filter, None).await?)
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result<JobItem> {
let start = Instant::now();
// Filters to search for the job
let filter = doc! {
"id": current_job.id,
Expand Down Expand Up @@ -154,6 +180,12 @@ impl Database for MongoDb {
match result {
Some(job) => {
tracing::debug!(job_id = %current_job.id, category = "db_call", "Job updated successfully");
let attributes = [
KeyValue::new("db_operation_name", "update_job"),
KeyValue::new("db_operation_id", format!("{:?}", current_job.id)),
];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(job)
}
None => {
Expand All @@ -165,6 +197,7 @@ impl Database for MongoDb {

#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
let start = Instant::now();
let pipeline = vec![
doc! {
"$match": {
Expand Down Expand Up @@ -199,6 +232,12 @@ impl Database for MongoDb {
match cursor.try_next().await? {
Some(doc) => {
let job: JobItem = mongodb::bson::from_document(doc)?;
let attributes = [
KeyValue::new("db_operation_name", "get_latest_job_by_type"),
KeyValue::new("db_operation_job_type", format!("{:?}", job_type)),
];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
}
None => Ok(None),
Expand Down Expand Up @@ -233,6 +272,7 @@ impl Database for MongoDb {
job_a_status: JobStatus,
job_b_type: JobType,
) -> Result<Vec<JobItem>> {
let start = Instant::now();
// Convert enums to Bson strings
let job_a_type_bson = Bson::String(format!("{:?}", job_a_type));
let job_a_status_bson = Bson::String(format!("{:?}", job_a_status));
Expand Down Expand Up @@ -323,6 +363,9 @@ impl Database for MongoDb {
}

tracing::debug!(job_count = vec_jobs.len(), category = "db_call", "Retrieved jobs without successor");
let attributes = [KeyValue::new("db_operation_name", "get_jobs_without_successor")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(vec_jobs)
}

Expand All @@ -332,13 +375,17 @@ impl Database for MongoDb {
job_type: JobType,
job_status: JobStatus,
) -> Result<Option<JobItem>> {
let start = Instant::now();
let filter = doc! {
"job_type": bson::to_bson(&job_type)?,
"status": bson::to_bson(&job_status)?
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();

tracing::debug!(job_type = ?job_type, job_status = ?job_status, category = "db_call", "Fetched latest job by type and status");
let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type_and_status")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(self.get_job_collection().find_one(filter, find_options).await?)
}

Expand All @@ -349,6 +396,7 @@ impl Database for MongoDb {
job_status: JobStatus,
internal_id: String,
) -> Result<Vec<JobItem>> {
let start = Instant::now();
let filter = doc! {
"job_type": bson::to_bson(&job_type)?,
"status": bson::to_bson(&job_status)?,
Expand All @@ -361,11 +409,15 @@ impl Database for MongoDb {
};
let jobs: Vec<JobItem> = self.get_job_collection().find(filter, None).await?.try_collect().await?;
tracing::debug!(job_type = ?job_type, job_status = ?job_status, internal_id = internal_id, category = "db_call", "Fetched jobs after internal ID by job type");
let attributes = [KeyValue::new("db_operation_name", "get_jobs_after_internal_id_by_job_type")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(jobs)
}

#[tracing::instrument(skip(self, limit), fields(function_type = "db_call"), ret, err)]
async fn get_jobs_by_statuses(&self, job_status: Vec<JobStatus>, limit: Option<i64>) -> Result<Vec<JobItem>> {
let start = Instant::now();
let filter = doc! {
"status": {
// TODO: Check that the conversion leads to valid output!
Expand All @@ -377,6 +429,9 @@ impl Database for MongoDb {

let jobs: Vec<JobItem> = self.get_job_collection().find(filter, find_options).await?.try_collect().await?;
tracing::debug!(job_count = jobs.len(), category = "db_call", "Retrieved jobs by statuses");
let attributes = [KeyValue::new("db_operation_name", "get_jobs_by_statuses")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(jobs)
}
}
38 changes: 25 additions & 13 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::fmt;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use color_eyre::eyre::{eyre, Context};
Expand Down Expand Up @@ -151,6 +151,7 @@ pub async fn create_job(
metadata: HashMap<String, String>,
config: Arc<Config>,
) -> Result<(), JobError> {
let start = Instant::now();
tracing::info!(log_type = "starting", category = "general", function_type = "create_job", job_type = ?job_type, block_no = %internal_id, "General create job started for block");

tracing::debug!(
Expand Down Expand Up @@ -180,20 +181,24 @@ pub async fn create_job(
.map_err(|e| JobError::Other(OtherError(e)))?;

let attributes = [
KeyValue::new("job_type", format!("{:?}", job_type)),
KeyValue::new("type", "create_job"),
KeyValue::new("job", format!("{:?}", job_item)),
KeyValue::new("operation_job_type", format!("{:?}", job_type)),
KeyValue::new("operation_type", "create_job"),
KeyValue::new("operation_job", format!("{:?}", job_item)),
];

ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes);
tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes);
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
Ok(())
}

/// Processes the job, increments the process attempt count and updates the status of the job in the
/// DB. It then adds the job to the verification queue.
#[tracing::instrument(skip(config), fields(category = "general", job, job_type, internal_id), ret, err)]
pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let start = Instant::now();
let job = get_job(id, config.clone()).await?;
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block");
Expand Down Expand Up @@ -292,13 +297,16 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
})?;

let attributes = [
KeyValue::new("job_type", format!("{:?}", job.job_type)),
KeyValue::new("type", "process_job"),
KeyValue::new("job", format!("{:?}", job)),
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "process_job"),
KeyValue::new("operation_job", format!("{:?}", job)),
];

ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
Ok(())
}

Expand All @@ -315,6 +323,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
err
)]
pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let start = Instant::now();
let mut job = get_job(id, config.clone()).await?;
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job started for block");
Expand Down Expand Up @@ -433,13 +442,16 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
};

let attributes = [
KeyValue::new("job_type", format!("{:?}", job.job_type)),
KeyValue::new("type", "verify_job"),
KeyValue::new("job", format!("{:?}", job)),
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "verify_job"),
KeyValue::new("operation_job", format!("{:?}", job)),
];

ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
Ok(())
}

Expand Down
38 changes: 35 additions & 3 deletions crates/orchestrator/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use once_cell;
use once_cell::sync::Lazy;
use opentelemetry::metrics::Gauge;
use opentelemetry::metrics::{Counter, Gauge};
use opentelemetry::{global, KeyValue};
use utils::metrics::lib::{register_gauge_metric_instrument, Metrics};
use utils::metrics::lib::{register_counter_metric_instrument, register_gauge_metric_instrument, Metrics};
use utils::register_metric;

register_metric!(ORCHESTRATOR_METRICS, OrchestratorMetrics);

pub struct OrchestratorMetrics {
pub block_gauge: Gauge<f64>,
pub successful_jobs: Counter<f64>,
pub failed_jobs: Counter<f64>,
pub jobs_response_time: Gauge<f64>,
pub db_calls_response_time: Gauge<f64>,
}

impl Metrics for OrchestratorMetrics {
Expand All @@ -31,6 +35,34 @@ impl Metrics for OrchestratorMetrics {
"block".to_string(),
);

Self { block_gauge }
let successful_jobs = register_counter_metric_instrument(
&orchestrator_meter,
"successful_jobs".to_string(),
"A counter to show count of successful jobs over time".to_string(),
"jobs".to_string(),
);

let failed_jobs = register_counter_metric_instrument(
&orchestrator_meter,
"failed_jobs".to_string(),
"A counter to show count of failed jobs over time".to_string(),
"jobs".to_string(),
);

let jobs_response_time = register_gauge_metric_instrument(
&orchestrator_meter,
"jobs_response_time".to_string(),
"A gauge to show response time of jobs over time".to_string(),
"Time".to_string(),
);

let db_calls_response_time = register_gauge_metric_instrument(
&orchestrator_meter,
"db_calls_response_time".to_string(),
"A gauge to show response time of jobs over time".to_string(),
"Time".to_string(),
);

Self { block_gauge, successful_jobs, failed_jobs, jobs_response_time, db_calls_response_time }
}
}
Loading

0 comments on commit 9eb4b55

Please sign in to comment.