From 234757c9797e4f31b9a8beb88f90992f72a37ac3 Mon Sep 17 00:00:00 2001
From: meship-starkware
Date: Tue, 16 Jul 2024 10:51:07 +0300
Subject: [PATCH] fix(concurrency): stopping the program if one of the threads panics

---
Notes (text between `---` and the first `diff --git` is not part of the commit message):
- The worker panic notice is emitted with `eprintln!`, so it goes to stderr.
- The stray `;` after the early-return `if` block in `WorkerExecutor::run` is dropped.
- Hunk indentation assumes the 4-space rustfmt layout; verify it against the target tree.
- The `From` / `index` hashes are carried over unchanged and no longer describe these contents.

 .../src/blockifier/transaction_executor.rs        | 16 +++++++++++++---
 crates/blockifier/src/concurrency/scheduler.rs    |  7 ++++++-
 crates/blockifier/src/concurrency/worker_logic.rs |  5 ++++-
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/crates/blockifier/src/blockifier/transaction_executor.rs b/crates/blockifier/src/blockifier/transaction_executor.rs
index 7e2941fc79..94f2f70298 100644
--- a/crates/blockifier/src/blockifier/transaction_executor.rs
+++ b/crates/blockifier/src/blockifier/transaction_executor.rs
@@ -1,6 +1,8 @@
 #[cfg(feature = "concurrency")]
 use std::collections::{HashMap, HashSet};
 #[cfg(feature = "concurrency")]
+use std::panic::{self, catch_unwind, AssertUnwindSafe};
+#[cfg(feature = "concurrency")]
 use std::sync::Arc;
 #[cfg(feature = "concurrency")]
 use std::sync::Mutex;
@@ -233,10 +235,19 @@ impl TransactionExecutor {
         // TODO(barak, 01/07/2024): Consider using tokio and spawn tasks that will be served by some
         // upper level tokio thread pool (Runtime in tokio terminology).
         std::thread::scope(|s| {
-            for _ in 0..self.config.concurrency_config.n_workers {
+            for i in 0..self.config.concurrency_config.n_workers {
                 let worker_executor = Arc::clone(&worker_executor);
                 s.spawn(move || {
-                    worker_executor.run();
+                    // Catch a worker panic so the scheduler can be marked done before the
+                    // panic is re-raised on this thread.
+                    let result = catch_unwind(AssertUnwindSafe(|| {
+                        worker_executor.run();
+                    }));
+                    if let Err(err) = result {
+                        eprintln!("Thread {} caught a panic, propagating it.", i);
+                        worker_executor.scheduler.mark_done();
+                        panic::resume_unwind(err);
+                    }
                 });
             }
         });
@@ -270,7 +281,6 @@ impl TransactionExecutor {
             })
             .commit_chunk_and_recover_block_state(n_committed_txs, visited_pcs);
         self.block_state.replace(block_state_after_commit);
-
         tx_execution_results
     }
 }
diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs
index 172918b365..750ef31309 100644
--- a/crates/blockifier/src/concurrency/scheduler.rs
+++ b/crates/blockifier/src/concurrency/scheduler.rs
@@ -231,10 +231,15 @@ impl Scheduler {
     }

     /// Returns the done marker.
-    fn done(&self) -> bool {
+    pub(crate) fn done(&self) -> bool {
         self.done_marker.load(Ordering::Acquire)
     }

+    /// Sets the done marker to `true`.
+    pub fn mark_done(&self) {
+        self.done_marker.store(true, Ordering::Release);
+    }
+
     #[cfg(any(feature = "testing", test))]
     pub fn set_tx_status(&self, tx_index: TxIndex, status: TransactionStatus) {
         if tx_index < self.chunk_size {
diff --git a/crates/blockifier/src/concurrency/worker_logic.rs b/crates/blockifier/src/concurrency/worker_logic.rs
index 60d5b19e71..56071a5766 100644
--- a/crates/blockifier/src/concurrency/worker_logic.rs
+++ b/crates/blockifier/src/concurrency/worker_logic.rs
@@ -84,6 +84,9 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
     }

     pub fn run(&self) {
+        if self.scheduler.done() {
+            return;
+        }
         let mut task = Task::AskForTask;
         loop {
             self.commit_while_possible();
@@ -118,7 +121,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {

     fn execute(&self, tx_index: TxIndex) {
         self.execute_tx(tx_index);
-        self.scheduler.finish_execution(tx_index)
+        self.scheduler.finish_execution(tx_index);
     }

     fn execute_tx(&self, tx_index: TxIndex) {