Skip to content

Commit 1806467

Browse files
committed
Fix hang in maybe_changed_after
1 parent c641fc4 commit 1806467

File tree

5 files changed

+32
-44
lines changed

5 files changed

+32
-44
lines changed

src/function/fetch.rs

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -200,35 +200,6 @@ where
200200
// still valid for the current revision.
201201
return unsafe { Some(self.extend_memo_lifetime(old_memo)) };
202202
}
203-
204-
// If this is a provisional memo from the same revision, await all its cycle heads because
205-
// we need to ensure that only one thread is iterating on a cycle at a given time.
206-
// For example, if we have a nested cycle like so:
207-
// ```
208-
// a -> b -> c -> b
209-
// -> a
210-
//
211-
// d -> b
212-
// ```
213-
// thread 1 calls `a` and `a` completes the inner cycle `b -> c` but hasn't finished the outer cycle `a` yet.
214-
// thread 2 now calls `b`. We don't want that thread 2 iterates `b` while thread 1 is iterating `a` at the same time
215-
// because it can result in thread b overriding provisional memos that thread a has accessed already and still relies upon.
216-
//
217-
// By waiting, we ensure that thread 1 completes a (based on a provisional value for `b`) and `b`
218-
// becomes the new outer cycle, which thread 2 drives to completion.
219-
if old_memo.may_be_provisional()
220-
&& old_memo.verified_at.load() == zalsa.current_revision()
221-
{
222-
// Try to claim all cycle heads of the provisional memo. If we can't because
223-
// some head is running on another thread, drop our claim guard to give that thread
224-
// a chance to take ownership of this query and complete it as part of its fixpoint iteration.
225-
// We will then block on the cycle head and retry once all cycle heads completed.
226-
// if !old_memo.try_claim_heads(zalsa, zalsa_local) {
227-
// drop(claim_guard);
228-
// old_memo.block_on_heads(zalsa, zalsa_local);
229-
// return None;
230-
// }
231-
}
232203
}
233204
}
234205

src/function/maybe_changed_after.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,12 @@ where
8484
loop {
8585
let database_key_index = self.database_key_index(id);
8686

87-
crate::tracing::debug!(
88-
"{database_key_index:?}: maybe_changed_after(revision = {revision:?})"
89-
);
87+
let _span = crate::tracing::info_span!(
88+
"maybe_changed_after",
89+
?revision,
90+
query=?database_key_index
91+
)
92+
.entered();
9093

9194
// Check if we have a verified version: this is the hot path.
9295
let memo_guard = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
@@ -141,7 +144,7 @@ where
141144
) -> Option<VerifyResult> {
142145
let database_key_index = self.database_key_index(key_index);
143146

144-
let _claim_guard = match self.sync_table.try_claim(zalsa, key_index, true) {
147+
let _claim_guard = match self.sync_table.try_claim(zalsa, key_index, false) {
145148
ClaimResult::Claimed(guard) => guard,
146149
ClaimResult::Running(blocked_on) => {
147150
blocked_on.block_on(zalsa);
@@ -175,10 +178,8 @@ where
175178

176179
// If `validate_maybe_provisional` returns `true`, but only because all cycle heads are from the same iteration,
177180
// carry over the cycle heads so that the caller verifies them.
178-
if old_memo.may_be_provisional() {
179-
for head in old_memo.cycle_heads() {
180-
cycle_heads.insert_head(head.database_key_index);
181-
}
181+
for head in old_memo.cycle_heads() {
182+
cycle_heads.insert_head(head.database_key_index);
182183
}
183184

184185
return Some(if old_memo.revisions.changed_at > revision {
@@ -467,7 +468,7 @@ where
467468
match cycle_head {
468469
TryClaimHeadsResult::Cycle {
469470
head_iteration_count,
470-
current_iteration_count,
471+
memo_iteration_count: current_iteration_count,
471472
verified_at: head_verified_at,
472473
} => {
473474
if head_verified_at != verified_at {
@@ -487,7 +488,7 @@ where
487488
true
488489
}
489490

490-
crate::tracing::trace!(
491+
crate::tracing::info!(
491492
"{database_key_index:?}: validate_same_iteration(memo = {memo:#?})",
492493
memo = memo.tracing_debug()
493494
);
@@ -506,6 +507,15 @@ where
506507
return false;
507508
}
508509

510+
// Always return `false` if this is a cycle initial memo (or the last provisional memo in an iteration)
511+
// as this value has obviously not finished computing yet.
512+
if cycle_heads
513+
.iter()
514+
.all(|head| head.database_key_index == database_key_index)
515+
{
516+
return false;
517+
}
518+
509519
validate_same_iteration_cold(zalsa, zalsa_local, cycle_heads, verified_at)
510520
}
511521

src/function/memo.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ impl<'db, C: Configuration> Memo<'db, C> {
197197
for claim_result in cycle_heads {
198198
match claim_result {
199199
TryClaimHeadsResult::Cycle {
200-
current_iteration_count,
200+
memo_iteration_count: current_iteration_count,
201201
head_iteration_count,
202202
..
203203
} => {
@@ -449,7 +449,7 @@ pub(super) enum TryClaimHeadsResult<'me> {
449449
/// Claiming the cycle head results in a cycle.
450450
Cycle {
451451
head_iteration_count: IterationCount,
452-
current_iteration_count: IterationCount,
452+
memo_iteration_count: IterationCount,
453453
verified_at: Revision,
454454
},
455455

@@ -506,7 +506,7 @@ impl<'me> Iterator for TryClaimCycleHeadsIter<'me> {
506506
);
507507
return Some(TryClaimHeadsResult::Cycle {
508508
head_iteration_count,
509-
current_iteration_count,
509+
memo_iteration_count: current_iteration_count,
510510
verified_at: self.zalsa.current_revision(),
511511
});
512512
}
@@ -540,7 +540,7 @@ impl<'me> Iterator for TryClaimCycleHeadsIter<'me> {
540540
};
541541

542542
Some(TryClaimHeadsResult::Cycle {
543-
current_iteration_count,
543+
memo_iteration_count: current_iteration_count,
544544
head_iteration_count,
545545
verified_at,
546546
})

src/function/sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ impl SyncTable {
6262
SyncOwnerId::Transferred => {
6363
let current_id = thread::current().id();
6464
let database_key_index = DatabaseKeyIndex::new(self.ingredient, key_index);
65+
6566
return match zalsa
6667
.runtime()
6768
.claim_transferred(database_key_index, allow_reentry)

src/tracing.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ macro_rules! debug_span {
2525
};
2626
}
2727

28+
macro_rules! info_span {
29+
($($x:tt)*) => {
30+
crate::tracing::span!(INFO, $($x)*)
31+
};
32+
}
33+
2834
macro_rules! event {
2935
($level:ident, $($x:tt)*) => {{
3036
let event = {
@@ -51,4 +57,4 @@ macro_rules! span {
5157
}};
5258
}
5359

54-
pub(crate) use {debug, debug_span, event, info, span, trace};
60+
pub(crate) use {debug, debug_span, event, info, info_span, span, trace};

0 commit comments

Comments
 (0)