Skip to content

Commit

Permalink
merged: main
Browse files Browse the repository at this point in the history
  • Loading branch information
mohiiit committed Nov 9, 2024
2 parents 0df375d + c66c0f3 commit 8fbedac
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## Added

- tests: http_client tests added
- panic handling in process job
- upgrade ETH L1 bridge for withdrawals to work
- added makefile and submodules
- Endpoints for triggering processing and verification jobs
Expand Down
23 changes: 20 additions & 3 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fmt;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -8,6 +9,7 @@ use color_eyre::eyre::{eyre, Context};
use constants::JOB_METADATA_FAILURE_REASON;
use conversion::parse_string;
use da_job::DaError;
use futures::FutureExt;
use mockall::automock;
use mockall_double::double;
use opentelemetry::KeyValue;
Expand Down Expand Up @@ -227,18 +229,33 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>

tracing::debug!(job_id = ?id, job_type = ?job.job_type, "Getting job handler");
let job_handler = factory::get_job_handler(&job.job_type).await;
let external_id = match job_handler.process_job(config.clone(), &mut job).await {
Ok(external_id) => {
let external_id = match AssertUnwindSafe(job_handler.process_job(config.clone(), &mut job)).catch_unwind().await {
Ok(Ok(external_id)) => {
tracing::debug!(job_id = ?id, "Successfully processed job");
external_id
}
Err(e) => {
Ok(Err(e)) => {
// TODO: Most of the time, errors will not be fixed automatically if we
// just retry. But for some failures, like DB issues, retrying might work.
// So we can add retry logic here to improve robustness.
tracing::error!(job_id = ?id, error = ?e, "Failed to process job");
return move_job_to_failed(&job, config.clone(), format!("Processing failed: {}", e)).await;
}
Err(panic) => {
let panic_msg = panic
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic.downcast_ref::<&str>().copied())
.unwrap_or("Unknown panic message");

tracing::error!(job_id = ?id, panic_msg = %panic_msg, "Job handler panicked during processing");
return move_job_to_failed(
&job,
config.clone(),
format!("Job handler panicked with message: {}", panic_msg),
)
.await;
}
};
tracing::debug!(job_id = ?id, "Incrementing process attempt count in metadata");
let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
Expand Down
47 changes: 47 additions & 0 deletions crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,53 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works
assert_eq!(consumed_message_payload.id, job_item.id);
}

/// Exercises `process_job` with a job handler that panics while processing.
///
/// The mocked handler panics inside its `process_job` expectation. The test
/// then asserts that:
/// 1. `process_job` still returns `Ok` instead of propagating the panic
/// 2. The job stored in the database has status `JobStatus::Failed`
/// 3. The failure reason in the job metadata contains the panic message
#[rstest]
#[tokio::test]
async fn process_job_handles_panic() {
    let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "1".to_string());

    // Test configuration backed by the actual database and queue clients.
    let services =
        TestConfigBuilder::new().configure_database(ConfigType::Actual).configure_queue_client(ConfigType::Actual).build().await;

    // Persist the job so that `process_job` can look it up by id.
    let database_client = services.config.database();
    database_client.create_job(job_item.clone()).await.unwrap();

    // Mock handler whose `process_job` expectation panics on its single call.
    let mut panicking_handler = MockJob::new();
    panicking_handler
        .expect_process_job()
        .times(1)
        .returning(|_, _| -> Result<String, JobError> { panic!("Simulated panic in process_job") });

    // Make the mocked factory hand out the panicking handler for SNOS jobs.
    let shared_handler: Arc<Box<dyn Job>> = Arc::new(Box::new(panicking_handler));
    let factory_ctx = mock_factory::get_job_handler_context();
    factory_ctx.expect().times(1).with(eq(JobType::SnosRun)).return_once(move |_| Arc::clone(&shared_handler));

    // The call itself must succeed even though the handler panicked.
    let outcome = process_job(job_item.id, services.config.clone()).await;
    assert!(outcome.is_ok());

    // DB checks - the job must be marked as failed, with the panic message recorded.
    let job_in_db = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap();
    assert_eq!(job_in_db.status, JobStatus::Failed);

    let failure_reason = job_in_db.metadata.get(JOB_METADATA_FAILURE_REASON).unwrap();
    assert!(failure_reason.contains("Job handler panicked with message: Simulated panic in process_job"));
}

/// Tests `process_job` function when job is already existing in the db and job status is not
/// `Created` or `VerificationFailed`.
#[rstest]
Expand Down

0 comments on commit 8fbedac

Please sign in to comment.