Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update: added termination queue #56

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Tests for updating the state.
- Function to update the state and publish blob on ethereum in state update job.
- Fixtures for testing.
- Implement DL queue for handling failed jobs.
- Added basic rust-toolchain support.

## Changed
Expand Down
27 changes: 27 additions & 0 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,33 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
Ok(())
}

/// Terminates the job and updates the status of the job in the DB.
/// Throws error if the job status `Completed` is existing on DL queue.
pub async fn handle_job_failure(id: Uuid) -> Result<()> {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
let config = config().await;

let mut job = get_job(id).await?.clone();
let mut metadata = job.metadata.clone();

if job.status == JobStatus::Completed {
log::error!("Invalid state exists on DL queue: {}", job.status.to_string());
return Ok(());
}
// We assume that a Failure status wil only show up if the message is sent twice from a queue
// Can return silently because it's already been processed.
else if job.status == JobStatus::Failed {
return Ok(());
}

metadata.insert("last_job_status".to_string(), job.status.to_string());
job.metadata = metadata;
job.status = JobStatus::Failed;

config.database().update_job(&job).await?;

Ok(())
}

fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
match job_type {
JobType::DataSubmission => Box::new(da_job::DaJob),
Expand Down
18 changes: 17 additions & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt};

use color_eyre::eyre::eyre;
use color_eyre::Result;
Expand Down Expand Up @@ -99,6 +99,22 @@ pub enum JobStatus {
VerificationTimeout,
/// The job failed processing
VerificationFailed,
/// The job failed completing
Failed,
}

impl fmt::Display for JobStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
JobStatus::Created => write!(f, "Created"),
JobStatus::LockedForProcessing => write!(f, "Locked for Processing"),
JobStatus::PendingVerification => write!(f, "Pending Verification"),
JobStatus::Completed => write!(f, "Completed"),
JobStatus::VerificationTimeout => write!(f, "Verification Timeout"),
JobStatus::VerificationFailed => write!(f, "Verification Failed"),
JobStatus::Failed => write!(f, "Failed"),
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down
41 changes: 21 additions & 20 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use tracing::log;
use uuid::Uuid;

use crate::config::config;
use crate::jobs::{process_job, verify_job};
use crate::jobs::{handle_job_failure, process_job, verify_job};

pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";
// Below is the Dead Letter Queue (DLQ) for the above two queues.
pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue";

#[derive(Debug, Serialize, Deserialize)]
pub struct JobQueueMessage {
Expand Down Expand Up @@ -68,26 +70,25 @@ where
Ok(())
}

pub async fn init_consumers() -> Result<()> {
// TODO: figure out a way to generalize this
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_PROCESSING_QUEUE.to_string(), process_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_PROCESSING_QUEUE, e),
}
sleep(Duration::from_secs(1)).await;
}
});
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_VERIFICATION_QUEUE.to_string(), verify_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_VERIFICATION_QUEUE, e),
macro_rules! spawn_consumer {
($queue_type :expr, $handler : expr) => {
tokio::spawn(async move {
loop {
match consume_job_from_queue($queue_type, $handler).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e),
}
sleep(Duration::from_secs(1)).await;
}
sleep(Duration::from_secs(1)).await;
}
});
});
};
}

pub async fn init_consumers() -> Result<()> {
spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job);
spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job);
spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure);

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/tests/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ impl TestConfigBuilder {
self.storage.unwrap(),
);

drop_database().await.unwrap();

config_force_init(config).await;

server
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn test_database_create_job(#[future] get_config: Guard<Arc<Config>>) -> c
// Test Util Functions
// ==========================================

fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem {
pub fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem {
JobItem {
id: Uuid::new_v4(),
internal_id: internal_id.to_string(),
Expand Down
120 changes: 117 additions & 3 deletions crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use super::database::build_job_item;
use crate::config::config;
use crate::jobs::handle_job_failure;
use crate::jobs::types::JobType;
use crate::{jobs::types::JobStatus, tests::config::TestConfigBuilder};
use rstest::rstest;

use std::str::FromStr;
#[cfg(test)]
pub mod da_job;

Expand All @@ -12,11 +17,120 @@ pub mod state_update_job;
/// Placeholder test: the body is only `todo!()`, so it panics whenever it is run.
///
/// NOTE(review): judging by the name, this is presumably meant to check that creating a
/// job which already exists fails — confirm when implementing.
#[rstest]
#[tokio::test]
async fn create_job_fails_job_already_exists() {
// TODO: implement this test; until then `todo!()` makes it panic.
todo!()
}

/// Placeholder test: the body is only `todo!()`, so it panics whenever it is run.
///
/// NOTE(review): judging by the name, this is presumably meant to cover job creation for
/// a job that does not exist yet — confirm when implementing.
#[rstest]
#[tokio::test]
async fn create_job_fails_works_new_job() {
// TODO: implement this test; until then `todo!()` makes it panic.
todo!()
}

impl FromStr for JobStatus {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"Created" => Ok(JobStatus::Created),
"LockedForProcessing" => Ok(JobStatus::LockedForProcessing),
"PendingVerification" => Ok(JobStatus::PendingVerification),
"Completed" => Ok(JobStatus::Completed),
"VerificationTimeout" => Ok(JobStatus::VerificationTimeout),
"VerificationFailed" => Ok(JobStatus::VerificationFailed),
"Failed" => Ok(JobStatus::Failed),
_ => Err(format!("Invalid job status: {}", s)),
}
}
}

/// Parses a [`JobType`] from its variant name (e.g. `"SnosRun"`).
///
/// Unrecognised input yields `Err("Invalid job type: <input>")`.
impl FromStr for JobType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Resolve the known names first; the error string is only built on a miss.
        let known = match s {
            "SnosRun" => Some(JobType::SnosRun),
            "DataSubmission" => Some(JobType::DataSubmission),
            "ProofCreation" => Some(JobType::ProofCreation),
            "ProofRegistration" => Some(JobType::ProofRegistration),
            "StateTransition" => Some(JobType::StateTransition),
            _ => None,
        };
        known.ok_or_else(|| format!("Invalid job type: {}", s))
    }
}

#[rstest]
#[case("SnosRun", "PendingVerification")]
#[case("DataSubmission", "Failed")]
#[tokio::test]
async fn handle_job_failure_job_status_typical_works(#[case] job_type: JobType, #[case] job_status: JobStatus) {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
TestConfigBuilder::new().build().await;
let internal_id = 1;

let config = config().await;
let database_client = config.database();

// create a job
let mut job = build_job_item(job_type.clone(), job_status.clone(), internal_id);
let job_id = job.id;

// if test case is for Failure, add last_job_status to job's metadata
if job_status == JobStatus::Failed {
let mut metadata = job.metadata.clone();
metadata.insert("last_job_status".to_string(), "VerificationTimeout".to_string());
job.metadata = metadata;
}

// feeding the job to DB
database_client.create_job(job.clone()).await.unwrap();

// calling handle_job_failure
let response = handle_job_failure(job_id).await;

match response {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
Ok(()) => {
// check job in db
let job = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data");

if let Some(job_item) = job {
// check if job status is Failure
assert_eq!(job_item.status, JobStatus::Failed);
// check if job metadata has `last_job_status`
assert!(job_item.metadata.get("last_job_status").is_some());
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
println!("Handle Job Failure for ID {} was handled successfully", job_id);
}
}
Err(err) => {
panic!("Test case should have passed: {} ", err);
}
}
}

#[rstest]
// code should panic here, how can completed move to dl queue ?
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
#[case("DataSubmission")]
#[tokio::test]
async fn handle_job_failure__job_status_completed_works(#[case] job_type: JobType) {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
let job_status = JobStatus::Completed;
TestConfigBuilder::new().build().await;
let internal_id = 1;

let config = config().await;
let database_client = config.database();

// create a job
let job = build_job_item(job_type.clone(), job_status.clone(), internal_id);
let job_id = job.id;

// feeding the job to DB
database_client.create_job(job.clone()).await.unwrap();

// calling handle_job_failure
handle_job_failure(job_id).await.expect("Test call to handle_job_failure should have passed.");

// The completed job status on db is untouched.
let job_fetched_result = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data");

if let Some(job_fetched) = job_fetched_result {
assert_eq!(job_fetched.status, JobStatus::Completed);
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
}
}