Skip to content

Commit e5a0069

Browse files
committed
Better handling of panics in cycles
1 parent 7bb93a9 commit e5a0069

File tree

3 files changed

+32
-12
lines changed

3 files changed

+32
-12
lines changed

src/cycle.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ impl CycleHeads {
342342
database_key_index: DatabaseKeyIndex,
343343
iteration_count: IterationCount,
344344
) -> bool {
345+
assert!(!iteration_count.is_panicked());
345346
if let Some(existing) = self
346347
.0
347348
.iter_mut()

src/function/execute.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ use crate::function::{ClaimGuard, Configuration, IngredientImpl};
88
use crate::ingredient::WaitForResult;
99
use crate::plumbing::ZalsaLocal;
1010
use crate::sync::atomic::{AtomicBool, Ordering};
11-
use crate::tracing;
11+
use crate::sync::thread;
1212
use crate::tracked_struct::Identity;
1313
use crate::zalsa::{MemoIngredientIndex, Zalsa};
1414
use crate::zalsa_local::{ActiveQueryGuard, QueryRevisions};
15+
use crate::{tracing, Cancelled};
1516
use crate::{DatabaseKeyIndex, Event, EventKind, Id};
1617

1718
impl<C> IngredientImpl<C>
@@ -144,6 +145,8 @@ where
144145
zalsa_local: &'db ZalsaLocal,
145146
memo_ingredient_index: MemoIngredientIndex,
146147
) -> (C::Output<'db>, CompletedQuery) {
148+
claim_guard.set_release_mode(ReleaseMode::Default);
149+
147150
let database_key_index = claim_guard.database_key_index();
148151
let zalsa = claim_guard.zalsa();
149152

@@ -155,23 +158,33 @@ where
155158
// TODO: Can we seed those somehow?
156159
let mut last_stale_tracked_ids: Vec<(Identity, Id)> = Vec::new();
157160

158-
let _guard = ClearCycleHeadIfPanicking::new(self, zalsa, id, memo_ingredient_index);
159161
let mut iteration_count = IterationCount::initial();
160162

161163
if let Some(old_memo) = opt_old_memo {
162-
let memo_iteration_count = old_memo.revisions.iteration();
163-
164164
if old_memo.verified_at.load() == zalsa.current_revision()
165165
&& old_memo.cycle_heads().contains(&database_key_index)
166-
&& !memo_iteration_count.is_panicked()
167166
{
167+
let memo_iteration_count = old_memo.revisions.iteration();
168+
169+
// The `DependencyGraph` locking propagates panics when another thread is blocked on a panicking query.
170+
// However, the locking doesn't handle the case where a thread fetches the result of a panicking cycle head query **after** all locks were released.
171+
// That's what we do here. We could consider re-executing the entire cycle but:
172+
// a) It's tricky to ensure that all queries participating in the cycle will re-execute
173+
// (we can't rely on `iteration_count` being updated for nested cycles because the nested cycles may have completed successfully).
174+
// b) It's guaranteed that this query will panic again anyway.
175+
// That's why we simply propagate the panic here. It simplifies our lives and it also avoids duplicate panic messages.
176+
if memo_iteration_count.is_panicked() {
177+
::tracing::warn!("Propagating panic for cycle head that panicked in an earlier execution in that revision");
178+
Cancelled::PropagatedPanic.throw();
179+
}
168180
last_provisional_memo = Some(old_memo);
169181
iteration_count = memo_iteration_count;
170182
}
171183
}
172184

185+
let _poison_guard =
186+
PoisonProvisionalIfPanicking::new(self, zalsa, id, memo_ingredient_index);
173187
let mut active_query = zalsa_local.push_query(database_key_index, iteration_count);
174-
claim_guard.set_release_mode(ReleaseMode::Default);
175188

176189
let (new_value, completed_query) = loop {
177190
// Tracked struct ids that existed in the previous revision
@@ -496,14 +509,14 @@ where
496509
/// a new fix point initial value if that happens.
497510
///
498511
/// We could insert a fixpoint initial value here, but it seems unnecessary.
499-
struct ClearCycleHeadIfPanicking<'a, C: Configuration> {
512+
struct PoisonProvisionalIfPanicking<'a, C: Configuration> {
500513
ingredient: &'a IngredientImpl<C>,
501514
zalsa: &'a Zalsa,
502515
id: Id,
503516
memo_ingredient_index: MemoIngredientIndex,
504517
}
505518

506-
impl<'a, C: Configuration> ClearCycleHeadIfPanicking<'a, C> {
519+
impl<'a, C: Configuration> PoisonProvisionalIfPanicking<'a, C> {
507520
fn new(
508521
ingredient: &'a IngredientImpl<C>,
509522
zalsa: &'a Zalsa,
@@ -519,9 +532,9 @@ impl<'a, C: Configuration> ClearCycleHeadIfPanicking<'a, C> {
519532
}
520533
}
521534

522-
impl<C: Configuration> Drop for ClearCycleHeadIfPanicking<'_, C> {
535+
impl<C: Configuration> Drop for PoisonProvisionalIfPanicking<'_, C> {
523536
fn drop(&mut self) {
524-
if std::thread::panicking() {
537+
if thread::panicking() {
525538
let revisions = QueryRevisions::fixpoint_initial(
526539
self.ingredient.database_key_index(self.id),
527540
IterationCount::panicked(),

tests/parallel/cycle_nested_deep_panic.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ fn initial(_db: &dyn KnobsDatabase) -> CycleValue {
6363
MIN
6464
}
6565

66-
#[test_log::test]
67-
fn the_test() {
66+
fn run() {
6867
tracing::debug!("Starting new run");
6968
let db_t1 = Knobs::default();
7069
let db_t2 = db_t1.clone();
@@ -106,6 +105,13 @@ fn the_test() {
106105
assert_is_set_cycle_error(catch_unwind(|| query_d(&db_t4)));
107106
}
108107

108+
#[test_log::test]
109+
fn the_test() {
110+
for _ in 0..200 {
111+
run()
112+
}
113+
}
114+
109115
#[track_caller]
110116
fn assert_is_set_cycle_error<T>(result: Result<T, Box<dyn std::any::Any + Send>>)
111117
where

0 commit comments

Comments
 (0)