From b1b86ebdd7182d454b1429a253eef8e449a4f300 Mon Sep 17 00:00:00 2001 From: meship-starkware Date: Tue, 16 Jul 2024 10:51:07 +0300 Subject: [PATCH] fix(concurrency): stopping the program if one of the threads panics --- .../src/blockifier/transaction_executor.rs | 15 ++++++++++++--- crates/blockifier/src/concurrency/scheduler.rs | 6 +++++- crates/blockifier/src/concurrency/worker_logic.rs | 5 ++++- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/crates/blockifier/src/blockifier/transaction_executor.rs b/crates/blockifier/src/blockifier/transaction_executor.rs index 7e2941fc79..17da453312 100644 --- a/crates/blockifier/src/blockifier/transaction_executor.rs +++ b/crates/blockifier/src/blockifier/transaction_executor.rs @@ -1,6 +1,8 @@ #[cfg(feature = "concurrency")] use std::collections::{HashMap, HashSet}; #[cfg(feature = "concurrency")] +use std::panic::{self, catch_unwind, AssertUnwindSafe}; +#[cfg(feature = "concurrency")] use std::sync::Arc; #[cfg(feature = "concurrency")] use std::sync::Mutex; @@ -233,10 +235,17 @@ impl TransactionExecutor { // TODO(barak, 01/07/2024): Consider using tokio and spawn tasks that will be served by some // upper level tokio thread pool (Runtime in tokio terminology). std::thread::scope(|s| { - for _ in 0..self.config.concurrency_config.n_workers { + for i in 0..self.config.concurrency_config.n_workers { let worker_executor = Arc::clone(&worker_executor); s.spawn(move || { - worker_executor.run(); + let result = catch_unwind(AssertUnwindSafe(|| { + worker_executor.run(); + })); + if let Err(err) = result { + println!("Thread {} caught a panic, propagating it.", i); + worker_executor.scheduler.mark_done(); + panic::resume_unwind(err); + } }); } }); @@ -270,7 +279,7 @@ impl TransactionExecutor { }) .commit_chunk_and_recover_block_state(n_committed_txs, visited_pcs); self.block_state.replace(block_state_after_commit); - + tx_execution_results } } diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index 172918b365..750ef31309 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -231,10 +231,14 @@ impl Scheduler { } /// Returns the done marker. - fn done(&self) -> bool { + pub(crate) fn done(&self) -> bool { self.done_marker.load(Ordering::Acquire) } + pub fn mark_done(&self) { + self.done_marker.store(true, Ordering::Release); + } + #[cfg(any(feature = "testing", test))] pub fn set_tx_status(&self, tx_index: TxIndex, status: TransactionStatus) { if tx_index < self.chunk_size { diff --git a/crates/blockifier/src/concurrency/worker_logic.rs b/crates/blockifier/src/concurrency/worker_logic.rs index 60d5b19e71..7cbd4778d1 100644 --- a/crates/blockifier/src/concurrency/worker_logic.rs +++ b/crates/blockifier/src/concurrency/worker_logic.rs @@ -86,6 +86,9 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { pub fn run(&self) { let mut task = Task::AskForTask; loop { + if self.scheduler.done() { + return; + }; self.commit_while_possible(); task = match task { Task::ExecutionTask(tx_index) => { @@ -118,7 +121,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { fn execute(&self, tx_index: TxIndex) { self.execute_tx(tx_index); - self.scheduler.finish_execution(tx_index) + self.scheduler.finish_execution(tx_index); } fn execute_tx(&self, tx_index: TxIndex) {