Skip to content

Commit

Permalink
Add panic recovery handling for process_job with test coverage (#178)
Browse files Browse the repository at this point in the history
* refactor: makefile updated as per latest bootstrapper commit

* feat: handling panic in process job gracefully

* changelog: updated

* refactor: removed extra files

* lint: fixed

* refactor: panicking in the process job, test updated accordingly

* refactor: process job updated to update panic message in the db

* refactor: removed job id from panic message, was redundant

---------

Co-authored-by: mohiiit <[email protected]>
Co-authored-by: Heemank Verma <[email protected]>
  • Loading branch information
3 people authored Nov 9, 2024
1 parent 803be5f commit c66c0f3
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- panic handling in process job
- upgrade ETH L1 bridge for withdrawals to work
- added makefile and submodules
- Endpoints for triggering processing and verification jobs
Expand Down
23 changes: 20 additions & 3 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fmt;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -8,6 +9,7 @@ use color_eyre::eyre::{eyre, Context};
use constants::JOB_METADATA_FAILURE_REASON;
use conversion::parse_string;
use da_job::DaError;
use futures::FutureExt;
use mockall::automock;
use mockall_double::double;
use opentelemetry::KeyValue;
Expand Down Expand Up @@ -227,18 +229,33 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>

tracing::debug!(job_id = ?id, job_type = ?job.job_type, "Getting job handler");
let job_handler = factory::get_job_handler(&job.job_type).await;
let external_id = match job_handler.process_job(config.clone(), &mut job).await {
Ok(external_id) => {
let external_id = match AssertUnwindSafe(job_handler.process_job(config.clone(), &mut job)).catch_unwind().await {
Ok(Ok(external_id)) => {
tracing::debug!(job_id = ?id, "Successfully processed job");
external_id
}
Err(e) => {
Ok(Err(e)) => {
// TODO: Most of the time, these errors will not be fixed automatically
// if we just retry. But for some failures, like DB issues, it is possible
// that retrying will work. So we can add retry logic here to improve robustness.
tracing::error!(job_id = ?id, error = ?e, "Failed to process job");
return move_job_to_failed(&job, config.clone(), format!("Processing failed: {}", e)).await;
}
Err(panic) => {
let panic_msg = panic
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic.downcast_ref::<&str>().copied())
.unwrap_or("Unknown panic message");

tracing::error!(job_id = ?id, panic_msg = %panic_msg, "Job handler panicked during processing");
return move_job_to_failed(
&job,
config.clone(),
format!("Job handler panicked with message: {}", panic_msg),
)
.await;
}
};
tracing::debug!(job_id = ?id, "Incrementing process attempt count in metadata");
let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
Expand Down
47 changes: 47 additions & 0 deletions crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,53 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works
assert_eq!(consumed_message_payload.id, job_item.id);
}

/// Exercises `process_job` with a job handler whose `process_job` panics.
///
/// The test stores a `Created` SNOS job in the database, installs a mocked
/// handler that panics when invoked, and then checks that:
/// 1. `process_job` itself still returns `Ok` (the panic does not propagate),
/// 2. the stored job ends up with status `Failed`,
/// 3. the failure reason in the job metadata contains the panic message.
#[rstest]
#[tokio::test]
async fn process_job_handles_panic() {
    // Test configuration using the actual database and queue clients.
    let services = TestConfigBuilder::new()
        .configure_database(ConfigType::Actual)
        .configure_queue_client(ConfigType::Actual)
        .build()
        .await;
    let db = services.config.database();

    // Persist the job that `process_job` is going to pick up.
    let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, String::from("1"));
    let job_id = job_item.id;
    db.create_job(job_item.clone()).await.unwrap();

    // Handler mock: its single expected `process_job` call panics instead of returning.
    let mut panicking_handler = MockJob::new();
    panicking_handler
        .expect_process_job()
        .times(1)
        .returning(|_, _| -> Result<String, JobError> { panic!("Simulated panic in process_job") });
    let panicking_handler: Arc<Box<dyn Job>> = Arc::new(Box::new(panicking_handler));

    // Route the `get_job_handler` lookup made inside `process_job` to the mock above.
    // The context binding is kept for the rest of the test, as in the usual mockall pattern.
    let handler_ctx = mock_factory::get_job_handler_context();
    handler_ctx
        .expect()
        .times(1)
        .with(eq(JobType::SnosRun))
        .return_once(move |_| Arc::clone(&panicking_handler));

    // The panic must be absorbed: the call is expected to complete with `Ok`.
    let outcome = process_job(job_id, services.config.clone()).await;
    assert!(outcome.is_ok());

    // DB checks - the job must now be marked as failed, with the panic recorded as the reason.
    let stored_job = db.get_job_by_id(job_id).await.unwrap().unwrap();
    assert_eq!(stored_job.status, JobStatus::Failed);

    let failure_reason = stored_job.metadata.get(JOB_METADATA_FAILURE_REASON).unwrap();
    assert!(failure_reason.contains("Job handler panicked with message: Simulated panic in process_job"));
}

/// Tests `process_job` function when job is already existing in the db and job status is not
/// `Created` or `VerificationFailed`.
#[rstest]
Expand Down
2 changes: 1 addition & 1 deletion madara
2 changes: 1 addition & 1 deletion pathfinder
Submodule pathfinder updated 42 files
+16 −26 crates/common/src/error.rs
+0 −6 crates/common/src/receipt.rs
+2 −164 crates/common/src/state_update.rs
+14 −18 crates/executor/src/types.rs
+0 −2 crates/gateway-types/src/reply.rs
+0 −14 crates/make-stream/src/lib.rs
+4 −27 crates/merkle-tree/src/class.rs
+5 −53 crates/merkle-tree/src/contract.rs
+8 −9 crates/merkle-tree/src/contract_state.rs
+120 −207 crates/merkle-tree/src/tree.rs
+1 −1 crates/p2p/src/builder.rs
+42 −48 crates/p2p/src/client/conv.rs
+13 −52 crates/p2p/src/client/peer_agnostic.rs
+2 −12 crates/p2p/src/client/peer_agnostic/fixtures.rs
+0 −1 crates/p2p_proto/proto/class.proto
+0 −1 crates/p2p_proto/proto/transaction.proto
+5 −26 crates/p2p_proto/src/class.rs
+20 −25 crates/p2p_proto/src/transaction.rs
+2 −2 crates/pathfinder/examples/re_execute.rs
+1 −7 crates/pathfinder/src/p2p_network/sync_handlers.rs
+3 −3 crates/pathfinder/src/p2p_network/sync_handlers/tests.rs
+1 −0 crates/pathfinder/src/state.rs
+18 −7 crates/pathfinder/src/state/sync.rs
+4 −4 crates/pathfinder/src/sync.rs
+12 −7 crates/pathfinder/src/sync/checkpoint.rs
+0 −27 crates/pathfinder/src/sync/checkpoint/fixture.rs
+0 −4 crates/pathfinder/src/sync/error.rs
+8 −209 crates/pathfinder/src/sync/state_updates.rs
+6 −2 crates/pathfinder/src/sync/track.rs
+6 −23 crates/rpc/src/dto/fee.rs
+5 −11 crates/rpc/src/dto/receipt.rs
+5 −4 crates/rpc/src/lib.rs
+80 −120 crates/rpc/src/method/estimate_fee.rs
+8 −12 crates/rpc/src/method/estimate_message_fee.rs
+92 −76 crates/rpc/src/method/get_storage_proof.rs
+17 −33 crates/rpc/src/pathfinder/methods/get_proof.rs
+31 −46 crates/rpc/src/v06/method/estimate_fee.rs
+8 −12 crates/rpc/src/v06/method/estimate_message_fee.rs
+4 −4 crates/rpc/src/v06/method/simulate_transactions.rs
+0 −1 crates/rpc/src/v08.rs
+0 −1 crates/storage/src/connection/transaction.rs
+0 −1 crates/storage/src/test_utils.rs

0 comments on commit c66c0f3

Please sign in to comment.