Skip to content

Commit

Permalink
update: spwan consumer to a macro_rule
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Jul 24, 2024
1 parent 7a4a4aa commit aff46c8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 34 deletions.
11 changes: 7 additions & 4 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,16 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
}

/// Terminates the job and updates the status of the job in the DB.
pub async fn terminate_job(id: Uuid) -> Result<()> {
pub async fn handle_job_failure(id: Uuid) -> Result<()> {
let config = config().await;
let job = get_job(id).await?;

let mut job = get_job(id).await?.clone();
let mut metadata = job.metadata.clone();
metadata.insert("last_job_status".to_string(), job.status.to_string());
config.database().update_metadata(&job, metadata).await?;
config.database().update_job_status(&job, JobStatus::Failed).await?;
job.metadata = metadata;
job.status = JobStatus::Failed;

config.database().update_job(&job).await?;

Ok(())
}
Expand Down
51 changes: 21 additions & 30 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ use tracing::log;
use uuid::Uuid;

use crate::config::config;
use crate::jobs::{process_job, terminate_job, verify_job};
use crate::jobs::{handle_job_failure, process_job, verify_job};

const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";
const JOB_TERMINATION_QUEUE: &str = "madara_orchestrator_job_termination_queue";
// Below is the Data Letter Queue for the the above two jobs.
const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue";

#[derive(Debug, Serialize, Deserialize)]
pub struct JobQueueMessage {
Expand Down Expand Up @@ -69,35 +70,25 @@ where
Ok(())
}

pub async fn init_consumers() -> Result<()> {
// TODO: figure out a way to generalize this
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_PROCESSING_QUEUE.to_string(), process_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_PROCESSING_QUEUE, e),
}
sleep(Duration::from_secs(1)).await;
}
});
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_VERIFICATION_QUEUE.to_string(), verify_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_VERIFICATION_QUEUE, e),
}
sleep(Duration::from_secs(1)).await;
}
});
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_TERMINATION_QUEUE.to_string(), terminate_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_TERMINATION_QUEUE, e),
macro_rules! spawn_consumer {
($queue_type :expr, $handler : expr) => {
tokio::spawn(async move {
loop {
match consume_job_from_queue($queue_type, $handler).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e),
}
sleep(Duration::from_secs(1)).await;
}
sleep(Duration::from_secs(1)).await;
}
});
});
};
}

pub async fn init_consumers() -> Result<()> {
spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job);
spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job);
spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure);

Ok(())
}

Expand Down

0 comments on commit aff46c8

Please sign in to comment.