diff --git a/CHANGELOG.md b/CHANGELOG.md index 302f7cf1..75c5d004 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - Basic rust-toolchain support. - `AWS_DEFAULT_REGION="localhost"` var. in .env.test for omniqueue queue testing. - Added basic rust-toolchain support. +- Implement DL queue for handling failed jobs. - Added tests for state update job. - Tests for DA job. diff --git a/Cargo.lock b/Cargo.lock index 6dc64ecd..bd538040 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6358,6 +6358,7 @@ dependencies = [ "starknet", "starknet-core 0.9.0", "starknet-settlement-client", + "strum_macros 0.26.4", "thiserror", "tokio", "tracing", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index b6566e8a..e5121529 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -54,6 +54,7 @@ snos = { workspace = true } starknet = { workspace = true } starknet-core = "0.9.0" starknet-settlement-client = { workspace = true } +strum_macros = "0.26.4" thiserror = { workspace = true } tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] } tracing = { workspace = true } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 3a90d2fa..ee78393f 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -186,6 +186,33 @@ pub async fn verify_job(id: Uuid) -> Result<()> { Ok(()) } +/// Terminates the job and updates the status of the job in the DB. +/// Logs error if the job status `Completed` is existing on DL queue. +pub async fn handle_job_failure(id: Uuid) -> Result<()> { + let config = config().await; + + let mut job = get_job(id).await?.clone(); + let mut metadata = job.metadata.clone(); + + if job.status == JobStatus::Completed { + log::error!("Invalid state exists on DL queue: {}", job.status.to_string()); + return Ok(()); + } + // We assume that a Failure status wil only show up if the message is sent twice from a queue + // Can return silently because it's already been processed. + else if job.status == JobStatus::Failed { + return Ok(()); + } + + metadata.insert("last_job_status".to_string(), job.status.to_string()); + job.metadata = metadata; + job.status = JobStatus::Failed; + + config.database().update_job(&job).await?; + + Ok(()) +} + async fn get_job(id: Uuid) -> Result { let config = config().await; let job = config.database().get_job_by_id(id).await?; diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index 6d63587a..bb2130b5 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -84,22 +84,35 @@ pub enum JobType { StateTransition, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, PartialOrd)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, PartialOrd, strum_macros::Display)] pub enum JobStatus { /// An acknowledgement that the job has been received by the /// orchestrator and is waiting to be processed + + #[strum(to_string = "Created")] Created, /// Some system has taken a lock over the job for processing and no /// other system to process the job + + #[strum(to_string = "Locked for Processing")] LockedForProcessing, /// The job has been processed and is pending verification + + #[strum(to_string = "Pending Verification")] PendingVerification, /// The job has been processed and verified. No other actions needs to be taken + + #[strum(to_string = "Completed")] Completed, /// The job was processed but the was unable to be verified under the given time + #[strum(to_string = "Verification Timeout")] VerificationTimeout, /// The job failed processing + #[strum(to_string = "Verification Failed")] VerificationFailed, + /// The job failed completing + #[strum(to_string = "Failed")] + Failed, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index aebba4bf..01cd2f87 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -10,10 +10,12 @@ use tracing::log; use uuid::Uuid; use crate::config::config; -use crate::jobs::{process_job, verify_job}; +use crate::jobs::{handle_job_failure, process_job, verify_job}; pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; +// Below is the Data Letter Queue for the the above two jobs. +pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue"; #[derive(Debug, Serialize, Deserialize)] pub struct JobQueueMessage { @@ -68,26 +70,25 @@ where Ok(()) } -pub async fn init_consumers() -> Result<()> { - // TODO: figure out a way to generalize this - tokio::spawn(async move { - loop { - match consume_job_from_queue(JOB_PROCESSING_QUEUE.to_string(), process_job).await { - Ok(_) => {} - Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_PROCESSING_QUEUE, e), - } - sleep(Duration::from_secs(1)).await; - } - }); - tokio::spawn(async move { - loop { - match consume_job_from_queue(JOB_VERIFICATION_QUEUE.to_string(), verify_job).await { - Ok(_) => {} - Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_VERIFICATION_QUEUE, e), +macro_rules! spawn_consumer { + ($queue_type :expr, $handler : expr) => { + tokio::spawn(async move { + loop { + match consume_job_from_queue($queue_type, $handler).await { + Ok(_) => {} + Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e), + } + sleep(Duration::from_secs(1)).await; } - sleep(Duration::from_secs(1)).await; - } - }); + }); + }; +} + +pub async fn init_consumers() -> Result<()> { + spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job); + spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job); + spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure); + Ok(()) } diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index 6f80de0a..ebe60256 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -132,6 +132,8 @@ impl TestConfigBuilder { self.storage.unwrap(), ); + drop_database().await.unwrap(); + config_force_init(config).await; server diff --git a/crates/orchestrator/src/tests/database/mod.rs b/crates/orchestrator/src/tests/database/mod.rs index 42ae1a0e..e9602ddd 100644 --- a/crates/orchestrator/src/tests/database/mod.rs +++ b/crates/orchestrator/src/tests/database/mod.rs @@ -54,7 +54,7 @@ async fn test_database_create_job(#[future] get_config: Guard>) -> c // Test Util Functions // ========================================== -fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem { +pub fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem { JobItem { id: Uuid::new_v4(), internal_id: internal_id.to_string(), diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index b40326aa..295718df 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ b/crates/orchestrator/src/tests/jobs/mod.rs @@ -1,3 +1,12 @@ +use rstest::rstest; + +use crate::config::config; +use crate::jobs::handle_job_failure; +use crate::jobs::types::JobType; +use crate::{jobs::types::JobStatus, tests::config::TestConfigBuilder}; + +use super::database::build_job_item; + #[cfg(test)] pub mod da_job; @@ -15,18 +24,15 @@ use std::time::Duration; use mockall::predicate::eq; use mongodb::bson::doc; use omniqueue::QueueError; -use rstest::rstest; use tokio::time::sleep; use uuid::Uuid; -use crate::config::config; use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; use crate::jobs::job_handler_factory::mock_factory; -use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus}; +use crate::jobs::types::{ExternalId, JobItem, JobVerificationStatus}; use crate::jobs::{create_job, increment_key_in_metadata, process_job, verify_job, Job, MockJob}; use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; use crate::tests::common::MessagePayloadType; -use crate::tests::config::TestConfigBuilder; /// Tests `create_job` function when job is not existing in the db. #[rstest] @@ -501,3 +507,96 @@ fn build_job_item_by_type_and_status(job_type: JobType, job_status: JobStatus, i version: 0, } } + +#[rstest] +#[case(JobType::DataSubmission, JobStatus::Completed)] // code should panic here, how can completed move to dl queue ? +#[case(JobType::SnosRun, JobStatus::PendingVerification)] +#[case(JobType::ProofCreation, JobStatus::LockedForProcessing)] +#[case(JobType::ProofRegistration, JobStatus::Created)] +#[case(JobType::StateTransition, JobStatus::Completed)] +#[case(JobType::ProofCreation, JobStatus::VerificationTimeout)] +#[case(JobType::DataSubmission, JobStatus::VerificationFailed)] +#[tokio::test] +async fn handle_job_failure_with_failed_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) { + TestConfigBuilder::new().build().await; + let config = config().await; + let database_client = config.database(); + let internal_id = 1; + + // create a job, with already available "last_job_status" + let mut job_expected = build_job_item(job_type.clone(), JobStatus::Failed, internal_id); + let mut job_metadata = job_expected.metadata.clone(); + job_metadata.insert("last_job_status".to_string(), job_status.to_string()); + job_expected.metadata = job_metadata.clone(); + + let job_id = job_expected.id; + + // feeding the job to DB + database_client.create_job(job_expected.clone()).await.unwrap(); + + // calling handle_job_failure + handle_job_failure(job_id).await.expect("handle_job_failure failed to run"); + + let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); + + assert_eq!(job_fetched, job_expected); +} + +#[rstest] +#[case::pending_verification(JobType::SnosRun, JobStatus::PendingVerification)] +#[case::verification_timeout(JobType::SnosRun, JobStatus::VerificationTimeout)] +#[tokio::test] +async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) { + TestConfigBuilder::new().build().await; + let config = config().await; + let database_client = config.database(); + let internal_id = 1; + + // create a job + let job = build_job_item(job_type.clone(), job_status.clone(), internal_id); + let job_id = job.id; + + // feeding the job to DB + database_client.create_job(job.clone()).await.unwrap(); + + // calling handle_job_failure + handle_job_failure(job_id).await.expect("handle_job_failure failed to run"); + + let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); + + // creating expected output + let mut job_expected = job.clone(); + let mut job_metadata = job_expected.metadata.clone(); + job_metadata.insert("last_job_status".to_string(), job_status.to_string()); + job_expected.metadata = job_metadata.clone(); + job_expected.status = JobStatus::Failed; + + assert_eq!(job_fetched, job_expected); +} + +#[rstest] +#[case(JobType::DataSubmission)] +#[tokio::test] +async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType) { + let job_status = JobStatus::Completed; + + TestConfigBuilder::new().build().await; + let config = config().await; + let database_client = config.database(); + let internal_id = 1; + + // create a job + let job_expected = build_job_item(job_type.clone(), job_status.clone(), internal_id); + let job_id = job_expected.id; + + // feeding the job to DB + database_client.create_job(job_expected.clone()).await.unwrap(); + + // calling handle_job_failure + handle_job_failure(job_id).await.expect("Test call to handle_job_failure should have passed."); + + // The completed job status on db is untouched. + let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); + + assert_eq!(job_fetched, job_expected); +}