diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 5acad76f..ef14fc90 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -174,13 +174,16 @@ pub async fn verify_job(id: Uuid) -> Result<()> { } /// Terminates the job and updates the status of the job in the DB. -pub async fn terminate_job(id: Uuid) -> Result<()> { +pub async fn handle_job_failure(id: Uuid) -> Result<()> { let config = config().await; - let job = get_job(id).await?; + + let mut job = get_job(id).await?.clone(); let mut metadata = job.metadata.clone(); metadata.insert("last_job_status".to_string(), job.status.to_string()); - config.database().update_metadata(&job, metadata).await?; - config.database().update_job_status(&job, JobStatus::Failed).await?; + job.metadata = metadata; + job.status = JobStatus::Failed; + + config.database().update_job(&job).await?; Ok(()) } diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 2ef47dfb..d1c81d01 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -10,11 +10,12 @@ use tracing::log; use uuid::Uuid; use crate::config::config; -use crate::jobs::{process_job, terminate_job, verify_job}; +use crate::jobs::{handle_job_failure, process_job, verify_job}; const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; -const JOB_TERMINATION_QUEUE: &str = "madara_orchestrator_job_termination_queue"; +// Below is the Data Letter Queue for the the above two jobs. +const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue"; #[derive(Debug, Serialize, Deserialize)] pub struct JobQueueMessage { @@ -69,35 +70,25 @@ where Ok(()) } -pub async fn init_consumers() -> Result<()> { - // TODO: figure out a way to generalize this - tokio::spawn(async move { - loop { - match consume_job_from_queue(JOB_PROCESSING_QUEUE.to_string(), process_job).await { - Ok(_) => {} - Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_PROCESSING_QUEUE, e), - } - sleep(Duration::from_secs(1)).await; - } - }); - tokio::spawn(async move { - loop { - match consume_job_from_queue(JOB_VERIFICATION_QUEUE.to_string(), verify_job).await { - Ok(_) => {} - Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_VERIFICATION_QUEUE, e), - } - sleep(Duration::from_secs(1)).await; - } - }); - tokio::spawn(async move { - loop { - match consume_job_from_queue(JOB_TERMINATION_QUEUE.to_string(), terminate_job).await { - Ok(_) => {} - Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_TERMINATION_QUEUE, e), +macro_rules! spawn_consumer { + ($queue_type :expr, $handler : expr) => { + tokio::spawn(async move { + loop { + match consume_job_from_queue($queue_type, $handler).await { + Ok(_) => {} + Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e), + } + sleep(Duration::from_secs(1)).await; } - sleep(Duration::from_secs(1)).await; - } - }); + }); + }; +} + +pub async fn init_consumers() -> Result<()> { + spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job); + spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job); + spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure); + Ok(()) }