Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

fix(concurrency): stopping the program if one of the threads panics #2089

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion crates/blockifier/src/blockifier/transaction_executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#[cfg(feature = "concurrency")]
use std::collections::{HashMap, HashSet};
#[cfg(feature = "concurrency")]
use std::panic::{self, catch_unwind, AssertUnwindSafe};
#[cfg(feature = "concurrency")]
use std::sync::Arc;
#[cfg(feature = "concurrency")]
use std::sync::Mutex;
Expand Down Expand Up @@ -218,6 +220,8 @@ impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
&mut self,
chunk: &[Transaction],
) -> Vec<TransactionExecutorResult<TransactionExecutionInfo>> {
use crate::concurrency::utils::AbortIfPanic;

let block_state = self.block_state.take().expect("The block state should be `Some`.");

let worker_executor = Arc::new(WorkerExecutor::initialize(
Expand All @@ -236,7 +240,24 @@ impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
for _ in 0..self.config.concurrency_config.n_workers {
let worker_executor = Arc::clone(&worker_executor);
s.spawn(move || {
worker_executor.run();
// Making sure that the program will abort if a panic accured while halting the
// scheduler.
let abort_guard = AbortIfPanic;
// If a panic is not handled or the handling logic itself panics, then we abort
// the program.
if let Err(err) = catch_unwind(AssertUnwindSafe(|| {
worker_executor.run();
})) {
// If the program panics here, the abort guard will exit the program.
// In this case, no panic message will be logged. Add the cargo flag
// --nocapture to log the panic message.

worker_executor.scheduler.halt();
abort_guard.release();
panic::resume_unwind(err);
}

abort_guard.release();
});
}
});
Expand Down
6 changes: 5 additions & 1 deletion crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<'a> TransactionCommitter<'a> {
assert!(*self.commit_index_guard > 0, "Commit index underflow.");
*self.commit_index_guard -= 1;

self.scheduler.done_marker.store(true, Ordering::Release);
self.scheduler.halt();
}
}

Expand Down Expand Up @@ -161,6 +161,10 @@ impl Scheduler {
*self.commit_index.lock().unwrap()
}

pub fn halt(&self) {
self.done_marker.store(true, Ordering::Release);
}

fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> {
lock_mutex_in_array(&self.tx_statuses, tx_index)
}
Expand Down
17 changes: 17 additions & 0 deletions crates/blockifier/src/concurrency/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,23 @@ use std::sync::{Mutex, MutexGuard};

use crate::concurrency::TxIndex;

// This struct is used to abort the program if a panic occurred in a place where it could not be
// handled.
pub struct AbortIfPanic;

impl Drop for AbortIfPanic {
fn drop(&mut self) {
eprintln!("detected unexpected panic; aborting");
std::process::abort();
}
}

impl AbortIfPanic {
pub fn release(self) {
std::mem::forget(self);
}
}

pub fn lock_mutex_in_array<T: Debug>(array: &[Mutex<T>], tx_index: TxIndex) -> MutexGuard<'_, T> {
array[tx_index].lock().unwrap_or_else(|error| {
panic!("Cell of transaction index {} is poisoned. Data: {:?}.", tx_index, *error.get_ref())
Expand Down
Loading