Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
fix(concurrency): stopping the program if one of the threads panics
Browse files Browse the repository at this point in the history
  • Loading branch information
meship-starkware committed Jul 17, 2024
1 parent f6fa5af commit 4d21f6d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 3 deletions.
24 changes: 23 additions & 1 deletion crates/blockifier/src/blockifier/transaction_executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#[cfg(feature = "concurrency")]
use std::collections::{HashMap, HashSet};
#[cfg(feature = "concurrency")]
use std::panic::{self, catch_unwind, AssertUnwindSafe};
#[cfg(feature = "concurrency")]
use std::sync::Arc;
#[cfg(feature = "concurrency")]
use std::sync::Mutex;
Expand Down Expand Up @@ -218,6 +220,8 @@ impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
&mut self,
chunk: &[Transaction],
) -> Vec<TransactionExecutorResult<TransactionExecutionInfo>> {
use crate::concurrency::utils::AbortGuard;

let block_state = self.block_state.take().expect("The block state should be `Some`.");

let worker_executor = Arc::new(WorkerExecutor::initialize(
Expand All @@ -236,7 +240,25 @@ impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
for _ in 0..self.config.concurrency_config.n_workers {
let worker_executor = Arc::clone(&worker_executor);
s.spawn(move || {
worker_executor.run();
// Make sure that if one of the threads panics, all the other threads stop
// and the main thread panics as well.
let abort_guard = AbortGuard;
let result = catch_unwind(AssertUnwindSafe(|| {
worker_executor.run();
}));
if let Err(err) = result {
// The abort guard makes sure that the program aborts if a panic occurs
// while halting the scheduler.
eprintln!(
"Worker executor thread panicked. {:?}",
err.downcast_ref::<&str>().unwrap()
);
worker_executor.scheduler.halt();
abort_guard.release();
panic::resume_unwind(err);
}
abort_guard.release();

});
}
});
Expand Down
6 changes: 5 additions & 1 deletion crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<'a> TransactionCommitter<'a> {
assert!(*self.commit_index_guard > 0, "Commit index underflow.");
*self.commit_index_guard -= 1;

self.scheduler.done_marker.store(true, Ordering::Release);
self.scheduler.halt();
}
}

Expand Down Expand Up @@ -161,6 +161,10 @@ impl Scheduler {
*self.commit_index.lock().unwrap()
}

/// Sets the scheduler's `done_marker` flag to `true`, using `Ordering::Release`.
// NOTE(review): presumably the workers poll `done_marker` and stop once it is set —
// confirm against the reader side, which is not visible in this hunk.
pub fn halt(&self) {
self.done_marker.store(true, Ordering::Release);
}

/// Locks the entry of `self.tx_statuses` at `tx_index` and returns its guard.
///
/// Delegates to `lock_mutex_in_array`.
// NOTE(review): that helper appears to panic when the mutex is poisoned — confirm
// against its full definition.
fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> {
lock_mutex_in_array(&self.tx_statuses, tx_index)
}
Expand Down
17 changes: 17 additions & 0 deletions crates/blockifier/src/concurrency/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,23 @@ use std::sync::{Mutex, MutexGuard};

use crate::concurrency::TxIndex;

/// Guard that aborts the whole process when it is dropped.
///
/// Create one before running code in which a panic cannot be handled, and call
/// [`AbortGuard::release`] once that code has finished. If the guard goes out of scope
/// without being released (for example while a panic unwinds through its owner), its
/// `Drop` implementation prints a message to stderr and calls [`std::process::abort`].
#[must_use = "an unbound `AbortGuard` is dropped immediately, which aborts the process"]
pub struct AbortGuard;

impl Drop for AbortGuard {
    /// Reports the unexpected drop on stderr and aborts the process.
    fn drop(&mut self) {
        eprintln!("detected unexpected panic; aborting");
        ::std::process::abort();
    }
}

impl AbortGuard {
    /// Disarms the guard: consumes it without running `Drop`, so the process is not aborted.
    pub fn release(self) {
        // `mem::forget` skips the destructor; the type is zero-sized, so nothing is leaked.
        std::mem::forget(self);
    }
}

pub fn lock_mutex_in_array<T: Debug>(array: &[Mutex<T>], tx_index: TxIndex) -> MutexGuard<'_, T> {
array[tx_index].lock().unwrap_or_else(|error| {
panic!("Cell of transaction index {} is poisoned. Data: {:?}.", tx_index, *error.get_ref())
Expand Down
2 changes: 1 addition & 1 deletion crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {

/// Runs the transaction at `tx_index` via `execute_tx`, then reports it to the
/// scheduler through `finish_execution`.
fn execute(&self, tx_index: TxIndex) {
self.execute_tx(tx_index);
// NOTE(review): the next two lines look like the removed/added pair of this diff hunk
// (the change only adds the trailing `;`) — presumably only the second one is in the
// committed file; confirm.
self.scheduler.finish_execution(tx_index)
self.scheduler.finish_execution(tx_index);
}

fn execute_tx(&self, tx_index: TxIndex) {
Expand Down

0 comments on commit 4d21f6d

Please sign in to comment.