From 1ad44db802136660f91ea115638e4f8374b0a2c0 Mon Sep 17 00:00:00 2001 From: meship-starkware Date: Tue, 16 Jul 2024 10:51:07 +0300 Subject: [PATCH] fix(concurrency): stopping the program if one of the threads panics --- .../src/blockifier/transaction_executor.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/crates/blockifier/src/blockifier/transaction_executor.rs b/crates/blockifier/src/blockifier/transaction_executor.rs index 7e2941fc79..77ad51da3b 100644 --- a/crates/blockifier/src/blockifier/transaction_executor.rs +++ b/crates/blockifier/src/blockifier/transaction_executor.rs @@ -1,6 +1,10 @@ #[cfg(feature = "concurrency")] use std::collections::{HashMap, HashSet}; #[cfg(feature = "concurrency")] +use std::io::{self, Write}; +#[cfg(feature = "concurrency")] +use std::panic::{self, catch_unwind, AssertUnwindSafe}; +#[cfg(feature = "concurrency")] use std::sync::Arc; #[cfg(feature = "concurrency")] use std::sync::Mutex; @@ -227,16 +231,29 @@ impl TransactionExecutor { Mutex::new(&mut self.bouncer), )); + // Change the defult way of handling panics to write to stderr. + // this ensure that the panic massge and location will be printed to stderr. + // without adding flags to the cargo command. + panic::set_hook(Box::new(move |panic_info| { + let _ = writeln!(io::stderr(), "{}", panic_info); + })); + // No thread pool implementation is needed here since we already have our scheduler. The // initialized threads below will "busy wait" for new tasks using the `run` method until the // chunk execution is completed, and then they will be joined together in a for loop. // TODO(barak, 01/07/2024): Consider using tokio and spawn tasks that will be served by some // upper level tokio thread pool (Runtime in tokio terminology). + std::thread::scope(|s| { for _ in 0..self.config.concurrency_config.n_workers { let worker_executor = Arc::clone(&worker_executor); s.spawn(move || { - worker_executor.run(); + let result = catch_unwind(AssertUnwindSafe(|| { + worker_executor.run(); + })); + if result.is_err() { + ::std::process::abort(); + } }); } });