From 42e719e405082376269671ef22b2fca936276348 Mon Sep 17 00:00:00 2001 From: meship-starkware Date: Tue, 16 Jul 2024 10:51:07 +0300 Subject: [PATCH] fix(concurrency): stopping the program if one of the threads panics --- .../src/blockifier/transaction_executor.rs | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/crates/blockifier/src/blockifier/transaction_executor.rs b/crates/blockifier/src/blockifier/transaction_executor.rs index 7e2941fc79..61e12bb1a9 100644 --- a/crates/blockifier/src/blockifier/transaction_executor.rs +++ b/crates/blockifier/src/blockifier/transaction_executor.rs @@ -1,6 +1,10 @@ #[cfg(feature = "concurrency")] use std::collections::{HashMap, HashSet}; #[cfg(feature = "concurrency")] +use std::io::{self, Write}; +#[cfg(feature = "concurrency")] +use std::panic::{self, catch_unwind, AssertUnwindSafe}; +#[cfg(feature = "concurrency")] use std::sync::Arc; #[cfg(feature = "concurrency")] use std::sync::Mutex; @@ -227,6 +231,14 @@ impl TransactionExecutor { Mutex::new(&mut self.bouncer), )); + // Change the default way of handling panics to write to stderr. + // This ensures that the panic message and location will be printed to stderr + // without adding flags to the cargo command. + let orig_hook = panic::take_hook(); + panic::set_hook(Box::new(move |panic_info| { + let _ = writeln!(io::stderr(), "{}", panic_info); + })); + // No thread pool implementation is needed here since we already have our scheduler. The // initialized threads below will "busy wait" for new tasks using the `run` method until the // chunk execution is completed, and then they will be joined together in a for loop. 
@@ -236,11 +248,19 @@ impl TransactionExecutor { for _ in 0..self.config.concurrency_config.n_workers { let worker_executor = Arc::clone(&worker_executor); s.spawn(move || { - worker_executor.run(); + let result = catch_unwind(AssertUnwindSafe(|| { + worker_executor.run(); + })); + if result.is_err() { + ::std::process::abort(); + } }); } }); + // Restore the original panic hook. + panic::set_hook(orig_hook); + let n_committed_txs = worker_executor.scheduler.get_n_committed_txs(); let mut tx_execution_results = Vec::new(); let mut visited_pcs: HashMap> = HashMap::new(); @@ -270,7 +290,6 @@ impl TransactionExecutor { }) .commit_chunk_and_recover_block_state(n_committed_txs, visited_pcs); self.block_state.replace(block_state_after_commit); - tx_execution_results } }