Skip to content

Commit dbb3eb0

Browse files
committed
Only collect cycle heads one level deep
1 parent bdab78a commit dbb3eb0

File tree

5 files changed

+114
-95
lines changed

5 files changed

+114
-95
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ intrusive-collections = "0.9.7"
2222
parking_lot = "0.12"
2323
portable-atomic = "1"
2424
rustc-hash = "2"
25-
smallvec = "1"
25+
smallvec = { version = "1", features = ["const_new"] }
2626
thin-vec = { version = "0.2.14" }
2727
tracing = { version = "0.1", default-features = false, features = ["std"] }
2828

src/cycle.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,19 @@ pub struct CycleHead {
114114
removed: AtomicBool,
115115
}
116116

117+
impl CycleHead {
118+
pub const fn new(
119+
database_key_index: DatabaseKeyIndex,
120+
iteration_count: IterationCount,
121+
) -> Self {
122+
Self {
123+
database_key_index,
124+
iteration_count: AtomicIterationCount(AtomicU8::new(iteration_count.0)),
125+
removed: AtomicBool::new(false),
126+
}
127+
}
128+
}
129+
117130
impl Clone for CycleHead {
118131
fn clone(&self) -> Self {
119132
Self {
@@ -147,6 +160,10 @@ impl IterationCount {
147160
Self(u8::MAX)
148161
}
149162

163+
pub(crate) const fn is_panicked(self) -> bool {
164+
self.0 == u8::MAX
165+
}
166+
150167
pub(crate) const fn increment(self) -> Option<Self> {
151168
let next = Self(self.0 + 1);
152169
if next.0 <= MAX_ITERATIONS.0 {
@@ -248,6 +265,12 @@ impl CycleHeads {
248265
}
249266
}
250267

268+
/// Iterates over all cycle heads that aren't equal to `own`.
269+
pub(crate) fn iter_not_eq(&self, own: DatabaseKeyIndex) -> impl Iterator<Item = &CycleHead> {
270+
self.iter()
271+
.filter(move |head| head.database_key_index != own)
272+
}
273+
251274
pub(crate) fn contains(&self, value: &DatabaseKeyIndex) -> bool {
252275
self.into_iter()
253276
.any(|head| head.database_key_index == *value && !head.removed.load(Ordering::Relaxed))
@@ -307,17 +330,20 @@ impl CycleHeads {
307330
self.0.reserve(other.0.len());
308331

309332
for head in other {
310-
self.insert(head);
333+
debug_assert!(!head.removed.load(Ordering::Relaxed));
334+
self.insert(head.database_key_index, head.iteration_count.load());
311335
}
312336
}
313337

314-
pub(crate) fn insert(&mut self, head: &CycleHead) -> bool {
315-
debug_assert!(!head.removed.load(Ordering::Relaxed));
316-
338+
pub(crate) fn insert(
339+
&mut self,
340+
database_key_index: DatabaseKeyIndex,
341+
iteration_count: IterationCount,
342+
) -> bool {
317343
if let Some(existing) = self
318344
.0
319345
.iter_mut()
320-
.find(|candidate| candidate.database_key_index == head.database_key_index)
346+
.find(|candidate| candidate.database_key_index == database_key_index)
321347
{
322348
let removed = existing.removed.get_mut();
323349

@@ -327,18 +353,18 @@ impl CycleHeads {
327353
true
328354
} else {
329355
let existing_count = existing.iteration_count.load_mut();
330-
let head_count = head.iteration_count.load();
331356

332357
assert_eq!(
333-
existing_count, head_count,
334-
"Can't merge cycle heads {:?} with different iteration counts ({existing_count:?}, {head_count:?})",
358+
existing_count, iteration_count,
359+
"Can't merge cycle heads {:?} with different iteration counts ({existing_count:?}, {iteration_count:?})",
335360
existing.database_key_index
336361
);
337362

338363
false
339364
}
340365
} else {
341-
self.0.push(head.clone());
366+
self.0
367+
.push(CycleHead::new(database_key_index, iteration_count));
342368
true
343369
}
344370
}

src/function/execute.rs

Lines changed: 75 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,14 @@ where
160160
let mut iteration_count = IterationCount::initial();
161161

162162
if let Some(old_memo) = opt_old_memo {
163+
let memo_iteration_count = old_memo.revisions.iteration();
164+
163165
if old_memo.verified_at.load() == zalsa.current_revision()
164166
&& old_memo.cycle_heads().contains(&database_key_index)
167+
&& !memo_iteration_count.is_panicked()
165168
{
166169
previous_memo = Some(old_memo);
167-
iteration_count = old_memo.revisions.iteration();
170+
iteration_count = memo_iteration_count;
168171
}
169172
}
170173

@@ -185,48 +188,64 @@ where
185188

186189
// If there are no cycle heads, break out of the loop (`cycle_heads_mut` returns `None` if the cycle head list is empty)
187190
let Some(cycle_heads) = completed_query.revisions.cycle_heads_mut() else {
191+
claim_guard.set_release_mode(ReleaseMode::SelfOnly);
188192
break (new_value, completed_query);
189193
};
190194

191-
// TODO: Remove "removed" cycle heads"
192195
let mut cycle_heads = std::mem::take(cycle_heads);
193-
194-
// Recursively resolve all cycle heads that this head depends on.
195-
// This isn't required in a single-threaded execution but it's not guaranteed that all nested cycles are listed
196-
// in cycle heads in a multi-threaded execution:
196+
let mut missing_heads: SmallVec<[(DatabaseKeyIndex, IterationCount); 1]> =
197+
SmallVec::new_const();
198+
let mut max_iteration_count = iteration_count;
199+
let mut depends_on_self = false;
200+
201+
// Ensure that we resolve the latest cycle heads from any provisional value this query depended on during execution.
202+
// This isn't required in a single-threaded execution, but it's not guaranteed that `cycle_heads` contains all cycles
203+
// in a multi-threaded execution:
197204
//
198205
// t1: a -> b
199206
// t2: c -> b (blocks on t1)
200207
// t1: a -> b -> c (cycle, returns fixpoint initial with c(0) in heads)
201208
// t1: a -> b (completes b, b has c(0) in its cycle heads, releases `b`, which resumes `t2`, and `retry_provisional` blocks on `c` (t2))
202209
// t2: c -> a (cycle, returns fixpoint initial for a with a(0) in heads)
203210
// t2: completes c, `provisional_retry` blocks on `a` (t2)
204-
// t1: a (complets `b` with `c` in heads)
211+
// t1: a (completes `b` with `c` in heads)
205212
//
206213
// Note how `a` only depends on `c` but not `a`. This is because `a` only saw the initial value of `c` and wasn't updated when `c` completed.
207214
// That's why we need to resolve the cycle heads recursively to `cycle_heads` contains all cycle heads at the moment this query completed.
208-
let mut queue: SmallVec<[DatabaseKeyIndex; 4]> = cycle_heads
209-
.iter()
210-
.map(|head| head.database_key_index)
211-
.filter(|head| *head != database_key_index)
212-
.collect();
213-
214-
// TODO: Can we also resolve whether the cycles have converged here?
215-
while let Some(head) = queue.pop() {
216-
let ingredient = zalsa.lookup_ingredient(head.ingredient_index());
217-
let nested_heads = ingredient.cycle_heads(zalsa, head.key_index());
218-
219-
for head in nested_heads {
220-
if cycle_heads.insert(head) && !queue.contains(&head.database_key_index) {
221-
queue.push(head.database_key_index);
215+
for head in &cycle_heads {
216+
max_iteration_count = max_iteration_count.max(head.iteration_count.load());
217+
depends_on_self |= head.database_key_index == database_key_index;
218+
219+
let ingredient =
220+
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
221+
222+
for nested_head in
223+
ingredient.cycle_heads(zalsa, head.database_key_index.key_index())
224+
{
225+
let nested_as_tuple = (
226+
nested_head.database_key_index,
227+
nested_head.iteration_count.load(),
228+
);
229+
230+
if !cycle_heads.contains(&nested_head.database_key_index)
231+
&& !missing_heads.contains(&nested_as_tuple)
232+
{
233+
missing_heads.push(nested_as_tuple);
222234
}
223235
}
224236
}
225237

238+
for (head_key, iteration_count) in missing_heads {
239+
max_iteration_count = max_iteration_count.max(iteration_count);
240+
depends_on_self |= head_key == database_key_index;
241+
242+
cycle_heads.insert(head_key, iteration_count);
243+
}
244+
226245
let outer_cycle = outer_cycle(zalsa, zalsa_local, &cycle_heads, database_key_index);
227246

228247
// Did the new result we got depend on our own provisional value, in a cycle?
229-
if !cycle_heads.contains(&database_key_index) {
248+
if !depends_on_self {
230249
if let Some(outer) = outer_cycle {
231250
claim_guard.set_release_mode(ReleaseMode::TransferTo(outer));
232251
} else {
@@ -260,30 +279,19 @@ where
260279
let last_provisional_value = last_provisional_value.expect(
261280
"`fetch_cold_cycle` should have inserted a provisional memo with Cycle::initial",
262281
);
263-
crate::tracing::debug!(
282+
tracing::debug!(
264283
"{database_key_index:?}: execute: \
265284
I am a cycle head, comparing last provisional value with new value"
266285
);
267286

268-
// determine if it is a nested query.
269-
// This is a nested query if it depends on any other cycle head than itself
270-
// where claiming it results in a cycle. In that case, both queries form a single connected component
271-
// that we can iterate together rather than having separate nested fixpoint iterations.
272-
273287
let this_converged = C::values_equal(&new_value, last_provisional_value);
274288

275289
iteration_count = if outer_cycle.is_some() {
276290
iteration_count
277291
} else {
278-
cycle_heads
279-
.iter()
280-
.map(|head| head.iteration_count.load())
281-
.max()
282-
.unwrap_or(iteration_count)
292+
max_iteration_count
283293
};
284294

285-
// If the new result is equal to the last provisional result, the cycle has
286-
// converged and we are done.
287295
if !this_converged {
288296
// We are in a cycle that hasn't converged; ask the user's
289297
// cycle-recovery function what to do:
@@ -295,60 +303,53 @@ where
295303
) {
296304
crate::CycleRecoveryAction::Iterate => {}
297305
crate::CycleRecoveryAction::Fallback(fallback_value) => {
298-
crate::tracing::debug!(
306+
tracing::debug!(
299307
"{database_key_index:?}: execute: user cycle_fn says to fall back"
300308
);
301309
new_value = fallback_value;
302310
}
303311
}
304312
}
305313

306-
completed_query
307-
.revisions
308-
.set_cycle_converged(this_converged);
309-
310314
if let Some(outer_cycle) = outer_cycle {
311315
tracing::debug!(
312316
"Detected nested cycle {database_key_index:?}, iterate it as part of the outer cycle {outer_cycle:?}"
313317
);
314318

315319
completed_query.revisions.set_cycle_heads(cycle_heads);
320+
// Store whether this cycle has converged, so that the outer cycle can check it.
321+
completed_query
322+
.revisions
323+
.set_cycle_converged(this_converged);
316324
claim_guard.set_release_mode(ReleaseMode::TransferTo(outer_cycle));
317325

318326
break (new_value, completed_query);
319327
}
320328

321-
// Verify that all cycles have converged, including all inner cycles.
329+
// Verify that this cycle and all inner cycles have converged.
322330
let converged = this_converged
323-
&& cycle_heads
324-
.iter()
325-
.filter(|head| head.database_key_index != database_key_index)
326-
.all(|head| {
327-
let ingredient =
328-
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
331+
&& cycle_heads.iter_not_eq(database_key_index).all(|head| {
332+
let ingredient =
333+
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
329334

330-
let converged =
331-
ingredient.cycle_converged(zalsa, head.database_key_index.key_index());
335+
let converged =
336+
ingredient.cycle_converged(zalsa, head.database_key_index.key_index());
332337

333-
if !converged {
334-
tracing::debug!("inner cycle {database_key_index:?} has not converged");
335-
}
338+
if !converged {
339+
tracing::debug!("inner cycle {database_key_index:?} has not converged");
340+
}
336341

337-
converged
338-
});
342+
converged
343+
});
339344

340345
if converged {
341-
crate::tracing::debug!(
342-
"{database_key_index:?}: execute: fixpoint iteration has a final value after {iteration_count:?} iterations"
343-
);
346+
tracing::debug!(
347+
"{database_key_index:?}: execute: fixpoint iteration has a final value after {iteration_count:?} iterations"
348+
);
344349

345350
// Set the nested cycles as verified. This is necessary because
346351
// `validate_provisional` doesn't follow cycle heads recursively (and the inner memos now depend on all cycle heads).
347-
for head in cycle_heads {
348-
if head.database_key_index == database_key_index {
349-
continue;
350-
}
351-
352+
for head in cycle_heads.iter_not_eq(database_key_index) {
352353
let ingredient =
353354
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
354355
ingredient.finalize_cycle_head(zalsa, head.database_key_index.key_index());
@@ -359,10 +360,7 @@ where
359360
break (new_value, completed_query);
360361
}
361362

362-
completed_query.revisions.set_cycle_heads(cycle_heads);
363-
364-
// `iteration_count` can't overflow as we check it against `MAX_ITERATIONS`
365-
// which is less than `u32::MAX`.
363+
// The fixpoint iteration hasn't converged. Iterate again...
366364
iteration_count = iteration_count.increment().unwrap_or_else(|| {
367365
::tracing::warn!("{database_key_index:?}: execute: too many cycle iterations");
368366
panic!("{database_key_index:?}: execute: too many cycle iterations")
@@ -375,35 +373,29 @@ where
375373
})
376374
});
377375

378-
crate::tracing::info!(
376+
tracing::info!(
379377
"{database_key_index:?}: execute: iterate again ({iteration_count:?})...",
380378
);
381379

382-
completed_query
383-
.revisions
384-
.update_iteration_count_mut(database_key_index, iteration_count);
385-
386-
for head in completed_query.revisions.cycle_heads() {
387-
if head.database_key_index == database_key_index {
388-
continue;
389-
}
390-
380+
// Update the iteration count of nested cycles
381+
for head in cycle_heads.iter_not_eq(database_key_index) {
391382
let ingredient =
392383
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
393384

394-
// let iteration_count = if was_initial && !head.iteration_count.load().is_initial() {
395-
// IterationCount::first_after_restart()
396-
// } else {
397-
// iteration_count
398-
// };
399-
400385
ingredient.set_cycle_iteration_count(
401386
zalsa,
402387
head.database_key_index.key_index(),
403388
iteration_count,
404389
);
405390
}
406391

392+
// Update the iteration count of this cycle head, but only after restoring
393+
// the cycle heads array.
394+
completed_query.revisions.set_cycle_heads(cycle_heads);
395+
completed_query
396+
.revisions
397+
.update_iteration_count_mut(database_key_index, iteration_count);
398+
407399
let new_memo = self.insert_memo(
408400
zalsa,
409401
id,
@@ -527,8 +519,7 @@ fn outer_cycle(
527519
current_key: DatabaseKeyIndex,
528520
) -> Option<DatabaseKeyIndex> {
529521
cycle_heads
530-
.iter()
531-
.filter(|head| head.database_key_index != current_key)
522+
.iter_not_eq(current_key)
532523
.find(|head| {
533524
// SAFETY: We don't call into with_query_stack recursively
534525
let is_on_stack = unsafe {

src/zalsa_local.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,8 @@ struct QueryRevisionsExtraInner {
564564

565565
iteration: AtomicIterationCount,
566566

567+
/// Stores for nested cycle heads whether they've converged in the last iteration.
568+
/// This value is always `false` for other queries.
567569
cycle_converged: bool,
568570
}
569571

0 commit comments

Comments
 (0)