Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: data letter queue #73

Merged
merged 16 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Basic rust-toolchain support.
- `AWS_DEFAULT_REGION="localhost"` var. in .env.test for omniqueue queue testing.
- Added basic rust-toolchain support.
- Implement DL queue for handling failed jobs.
- Tests for DA job.

## Changed
Expand Down
27 changes: 27 additions & 0 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,33 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
Ok(())
}

/// Terminates the job and updates the status of the job in the DB.
/// Logs error if the job status `Completed` is existing on DL queue.
pub async fn handle_job_failure(id: Uuid) -> Result<()> {
let config = config().await;

let mut job = get_job(id).await?.clone();
let mut metadata = job.metadata.clone();

if job.status == JobStatus::Completed {
log::error!("Invalid state exists on DL queue: {}", job.status.to_string());
return Ok(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we fail silently here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DL-queue is supposed to handle actual failed cases.
If JobStatus::Completed job is pushed to DL-queue multiple times by the queuing agent,
we prefer not stopping the orchestrator rather failing silently.

}
// We assume that a Failure status wil only show up if the message is sent twice from a queue
// Can return silently because it's already been processed.
else if job.status == JobStatus::Failed {
return Ok(());
}

metadata.insert("last_job_status".to_string(), job.status.to_string());
job.metadata = metadata;
job.status = JobStatus::Failed;

config.database().update_job(&job).await?;

Ok(())
}

async fn get_job(id: Uuid) -> Result<JobItem> {
let config = config().await;
let job = config.database().get_job_by_id(id).await?;
Expand Down
18 changes: 17 additions & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt};

use color_eyre::eyre::eyre;
use color_eyre::Result;
Expand Down Expand Up @@ -99,6 +99,22 @@ pub enum JobStatus {
VerificationTimeout,
/// The job failed processing
VerificationFailed,
/// The job failed completing
Failed,
}

impl fmt::Display for JobStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
EvolveArt marked this conversation as resolved.
Show resolved Hide resolved
match self {
JobStatus::Created => write!(f, "Created"),
JobStatus::LockedForProcessing => write!(f, "Locked for Processing"),
JobStatus::PendingVerification => write!(f, "Pending Verification"),
JobStatus::Completed => write!(f, "Completed"),
JobStatus::VerificationTimeout => write!(f, "Verification Timeout"),
JobStatus::VerificationFailed => write!(f, "Verification Failed"),
JobStatus::Failed => write!(f, "Failed"),
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down
41 changes: 21 additions & 20 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use tracing::log;
use uuid::Uuid;

use crate::config::config;
use crate::jobs::{process_job, verify_job};
use crate::jobs::{handle_job_failure, process_job, verify_job};

pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";
// Below is the Data Letter Queue for the the above two jobs.
pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue";

#[derive(Debug, Serialize, Deserialize)]
pub struct JobQueueMessage {
Expand Down Expand Up @@ -68,26 +70,25 @@ where
Ok(())
}

pub async fn init_consumers() -> Result<()> {
// TODO: figure out a way to generalize this
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_PROCESSING_QUEUE.to_string(), process_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_PROCESSING_QUEUE, e),
}
sleep(Duration::from_secs(1)).await;
}
});
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_VERIFICATION_QUEUE.to_string(), verify_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_VERIFICATION_QUEUE, e),
macro_rules! spawn_consumer {
($queue_type :expr, $handler : expr) => {
tokio::spawn(async move {
loop {
match consume_job_from_queue($queue_type, $handler).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e),
}
sleep(Duration::from_secs(1)).await;
}
sleep(Duration::from_secs(1)).await;
}
});
});
};
}

pub async fn init_consumers() -> Result<()> {
spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job);
spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job);
spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure);

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/tests/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ impl TestConfigBuilder {
self.storage.unwrap(),
);

drop_database().await.unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would need more explanation here.
Ideally this should have been a separate PR, will keep in mind.

But if your question is any of these :

  1. Why is drop_database implemented in TestConfigBuilder ?
  2. Why are we not using ? and using unwrap() ?

Then :

  1. This is to ensure that each test case has a fresh database to work with so that no overlapping of database arguments exist.
  2. Our assumption is that there is no perk for a test case to return an Error, since it's a checking procedure we are fine a throwing the error there directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea the question was more, why do we drop in the middle of the code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is an initiation function for all the clients, which runs before each test we can drop the database at any point in this code.


config_force_init(config).await;

server
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn test_database_create_job(#[future] get_config: Guard<Arc<Config>>) -> c
// Test Util Functions
// ==========================================

fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem {
pub fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem {
JobItem {
id: Uuid::new_v4(),
internal_id: internal_id.to_string(),
Expand Down
101 changes: 97 additions & 4 deletions crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
use rstest::rstest;

use crate::config::config;
use crate::jobs::handle_job_failure;
use crate::jobs::types::JobType;
use crate::{jobs::types::JobStatus, tests::config::TestConfigBuilder};

use super::database::build_job_item;

#[cfg(test)]
pub mod da_job;

Expand All @@ -15,18 +24,15 @@ use std::time::Duration;
use mockall::predicate::eq;
use mongodb::bson::doc;
use omniqueue::QueueError;
use rstest::rstest;
use tokio::time::sleep;
use uuid::Uuid;

use crate::config::config;
use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY};
use crate::jobs::job_handler_factory::mock_factory;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::types::{ExternalId, JobItem, JobVerificationStatus};
use crate::jobs::{create_job, increment_key_in_metadata, process_job, verify_job, Job, MockJob};
use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE};
use crate::tests::common::MessagePayloadType;
use crate::tests::config::TestConfigBuilder;

/// Tests `create_job` function when job is not existing in the db.
#[rstest]
Expand Down Expand Up @@ -501,3 +507,90 @@ fn build_job_item_by_type_and_status(job_type: JobType, job_status: JobStatus, i
version: 0,
}
}

#[rstest]
#[case(JobType::SnosRun, JobStatus::Failed)]
#[tokio::test]
async fn handle_job_failure_with_failed_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) {
TestConfigBuilder::new().build().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this to a fixture

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in here.

TestConfigBuilder allows for customisation over any external client of our choice,
moving it to a global fixture is not feasible since all test cases have different customisation requirements.

We can make fixture for tests under same scope if they require same customised external clients.

for eg :
All tests under da_job if require same config customisation, can implement a fixture just for themselves.
similarly for other scopes .

We can create a separate issue for this and resolve there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good to me

let config = config().await;
let database_client = config.database();
let internal_id = 1;

// create a job, with already available "last_job_status"
let mut job_expected = build_job_item(job_type.clone(), job_status.clone(), internal_id);
let mut job_metadata = job_expected.metadata.clone();
job_metadata.insert("last_job_status".to_string(), JobStatus::PendingVerification.to_string());
job_expected.metadata = job_metadata.clone();

let job_id = job_expected.id;

// feeding the job to DB
database_client.create_job(job_expected.clone()).await.unwrap();

// calling handle_job_failure
handle_job_failure(job_id).await.expect("handle_job_failure failed to run");

let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap();

assert_eq!(job_fetched, job_expected);
}

#[rstest]
#[case::pending_verification(JobType::SnosRun, JobStatus::PendingVerification)]
#[case::verification_timeout(JobType::SnosRun, JobStatus::VerificationTimeout)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are the other statuses not covered ?

Copy link
Contributor Author

@heemankv heemankv Aug 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We initially covered all the statuses as shown here, but it felt redundant to test all, hence they were removed.

What do you suggest @EvolveArt ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well even if it feels redundant I think it's important to have them, you want to avoid having jobs at an unexpected state in the DLQ

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid, Implemented

#[tokio::test]
async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) {
TestConfigBuilder::new().build().await;
let config = config().await;
let database_client = config.database();
let internal_id = 1;

// create a job
let job = build_job_item(job_type.clone(), job_status.clone(), internal_id);
let job_id = job.id;

// feeding the job to DB
database_client.create_job(job.clone()).await.unwrap();

// calling handle_job_failure
handle_job_failure(job_id).await.expect("handle_job_failure failed to run");

let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap();

// creating expected output
let mut job_expected = job.clone();
let mut job_metadata = job_expected.metadata.clone();
job_metadata.insert("last_job_status".to_string(), job_status.to_string());
job_expected.metadata = job_metadata.clone();
job_expected.status = JobStatus::Failed;

assert_eq!(job_fetched, job_expected);
}

#[rstest]
#[case(JobType::DataSubmission)]
#[tokio::test]
async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType) {
let job_status = JobStatus::Completed;

TestConfigBuilder::new().build().await;
let config = config().await;
let database_client = config.database();
let internal_id = 1;

// create a job
let job_expected = build_job_item(job_type.clone(), job_status.clone(), internal_id);
let job_id = job_expected.id;

// feeding the job to DB
database_client.create_job(job_expected.clone()).await.unwrap();

// calling handle_job_failure
handle_job_failure(job_id).await.expect("Test call to handle_job_failure should have passed.");

// The completed job status on db is untouched.
let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap();

assert_eq!(job_fetched, job_expected);
}
Loading