From d254e2deb883f9633f8b325a595fb7c61bad42d7 Mon Sep 17 00:00:00 2001 From: zzlk <2418897+zzlk@users.noreply.github.com> Date: Thu, 14 Sep 2023 06:05:12 -0700 Subject: [PATCH] feat: add context.is_first_time_subgraph_is_scheduled to simplify replaying operators (#906) --- hydroflow/src/scheduled/context.rs | 9 + hydroflow/src/scheduled/graph.rs | 9 + ...s_streaming_rhs_blocking@graphvis_dot.snap | 47 ++--- ...reaming_rhs_blocking@graphvis_mermaid.snap | 47 ++--- hydroflow/tests/surface_join_fused.rs | 1 + ...ore__tests__anti_join@datalog_program.snap | 49 +---- ...og_core__tests__index@datalog_program.snap | 64 ++---- ...ests__join_with_other@datalog_program.snap | 34 +--- ...tests__join_with_self@datalog_program.snap | 34 +--- ..._core__tests__persist@datalog_program.snap | 182 ++++-------------- ...s__persist_uniqueness@datalog_program.snap | 15 +- ...single_column_program@datalog_program.snap | 34 +--- ...s__transitive_closure@datalog_program.snap | 34 +--- ..._triple_relation_join@datalog_program.snap | 68 ++----- ...ests__wildcard_fields@datalog_program.snap | 34 +--- ...__wildcard_join_count@datalog_program.snap | 68 ++----- hydroflow_lang/src/graph/ops/anti_join.rs | 19 +- .../src/graph/ops/anti_join_multiset.rs | 18 +- hydroflow_lang/src/graph/ops/join.rs | 18 +- .../src/graph/ops/join_fused_lhs.rs | 18 +- .../src/graph/ops/multiset_delta.rs | 6 +- hydroflow_lang/src/graph/ops/persist.rs | 18 +- hydroflow_lang/src/graph/ops/persist_mut.rs | 11 +- .../src/graph/ops/persist_mut_keyed.rs | 11 +- 24 files changed, 240 insertions(+), 608 deletions(-) diff --git a/hydroflow/src/scheduled/context.rs b/hydroflow/src/scheduled/context.rs index 7996cd79529b..47c92385fbf8 100644 --- a/hydroflow/src/scheduled/context.rs +++ b/hydroflow/src/scheduled/context.rs @@ -30,6 +30,7 @@ pub struct Context { pub(crate) current_stratum: usize, pub(crate) current_tick_start: Instant, + pub(crate) subgraph_last_tick_run_in: Option, /// The SubgraphId of the currently running operator. When this context is /// not being forwarded to a running operator, this field is (mostly) @@ -50,6 +51,14 @@ impl Context { self.current_tick_start } + /// Gets whether this is the first time this subgraph is being scheduled for this tick + pub fn is_first_run_this_tick(&self) -> bool { + self.subgraph_last_tick_run_in + .map_or(true, |tick_last_run_in| { + self.current_tick > tick_last_run_in + }) + } + /// Gets the current stratum nubmer. pub fn current_stratum(&self) -> usize { self.current_stratum diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 7a6e8abbd5c0..47afe68eb0db 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -57,6 +57,7 @@ impl Default for Hydroflow { current_tick: 0, current_tick_start: Instant::now(), + subgraph_last_tick_run_in: None, subgraph_id: SubgraphId(0), @@ -190,6 +191,8 @@ impl Hydroflow { /// Returns true if any work was done. #[tracing::instrument(level = "trace", skip(self), fields(tick = self.context.current_tick, stratum = self.context.current_stratum), ret)] pub fn run_stratum(&mut self) -> bool { + let current_tick = self.context.current_tick; + let mut work_done = false; while let Some(sg_id) = self.stratum_queues[self.context.current_stratum].pop_front() { @@ -201,7 +204,9 @@ impl Hydroflow { tracing::trace!(sg_id = sg_id.0, "Running subgraph."); self.context.subgraph_id = sg_id; + self.context.subgraph_last_tick_run_in = sg_data.last_tick_run_in; sg_data.subgraph.run(&mut self.context, &mut self.handoffs); + sg_data.last_tick_run_in = Some(current_tick); } for &handoff_id in self.subgraphs[sg_id.0].succs.iter() { @@ -752,6 +757,9 @@ pub(super) struct SubgraphData { /// `Self::succs`, as all `SubgraphData` are owned by the same vec /// `Hydroflow::subgraphs`. is_scheduled: Cell, + + /// Keep track of the last tick that this subgraph was run in + last_tick_run_in: Option, } impl SubgraphData { pub fn new( @@ -769,6 +777,7 @@ impl SubgraphData { preds, succs, is_scheduled: Cell::new(is_scheduled), + last_tick_run_in: None, } } } diff --git a/hydroflow/tests/snapshots/surface_join_fused__static_static_lhs_streaming_rhs_blocking@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_join_fused__static_static_lhs_streaming_rhs_blocking@graphvis_dot.snap index 276a31104019..5ec57040c5d3 100644 --- a/hydroflow/tests/snapshots/surface_join_fused__static_static_lhs_streaming_rhs_blocking@graphvis_dot.snap +++ b/hydroflow/tests/snapshots/surface_join_fused__static_static_lhs_streaming_rhs_blocking@graphvis_dot.snap @@ -38,16 +38,19 @@ digraph { n8v1 [label="(n8v1) defer_tick()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] n9v1 [label="(n9v1) union()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] n10v1 [label="(n10v1) join_fused_rhs::<'static, 'static>(Fold(SetUnionHashSet::default, Merge::merge))", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] - n11v1 [label="(n11v1) for_each(|x| {\l results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x)\l})\l", fontname=Monaco, shape=house, style = filled, color = "#ffff00"] + n11v1 [label="(n11v1) inspect(|x| println!(\"{}, {x:?}\", context.current_tick()))", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n12v1 [label="(n12v1) for_each(|x| {\l results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x)\l})\l", fontname=Monaco, shape=house, style = filled, color = "#ffff00"] n3v1 -> n9v1 n5v1 -> n9v1 n8v1 -> n9v1 n9v1 -> n10v1 [label="0"] n10v1 -> n11v1 + n11v1 -> n12v1 subgraph "cluster sg_5v1_var_my_join" { label="var my_join" n10v1 n11v1 + n12v1 } subgraph "cluster sg_5v1_var_unioner" { label="var unioner" @@ -58,40 +61,40 @@ digraph { fillcolor="#dddddd" style=filled label = "sg_6v1\nstratum 2" - n16v1 [label="(n16v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n17v1 [label="(n17v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] } subgraph "cluster n7v1" { fillcolor="#dddddd" style=filled label = "sg_7v1\nstratum 2" - n18v1 [label="(n18v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n19v1 [label="(n19v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] } subgraph "cluster n8v1" { fillcolor="#dddddd" style=filled label = "sg_8v1\nstratum 2" - n20v1 [label="(n20v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n21v1 [label="(n21v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] } - n2v1 -> n12v1 - n4v1 -> n13v1 - n6v1 -> n15v1 - n7v1 -> n14v1 - n12v1 [label="(n12v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] - n12v1 -> n10v1 [label="1", arrowhead=box, color=red] + n2v1 -> n13v1 + n4v1 -> n14v1 + n6v1 -> n16v1 + n7v1 -> n15v1 n13v1 [label="(n13v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] - n13v1 -> n16v1 + n13v1 -> n10v1 [label="1", arrowhead=box, color=red] n14v1 [label="(n14v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] - n14v1 -> n18v1 + n14v1 -> n17v1 n15v1 [label="(n15v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] - n15v1 -> n20v1 - n16v1 -> n17v1 - n17v1 [label="(n17v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] - n17v1 -> n5v1 [arrowhead=dot, color=red] - n18v1 -> n19v1 - n19v1 [label="(n19v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] - n19v1 -> n8v1 [arrowhead=dot, color=red] - n20v1 -> n21v1 - n21v1 [label="(n21v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] - n21v1 -> n7v1 [arrowhead=dot, color=red] + n15v1 -> n19v1 + n16v1 [label="(n16v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] + n16v1 -> n21v1 + n17v1 -> n18v1 + n18v1 [label="(n18v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] + n18v1 -> n5v1 [arrowhead=dot, color=red] + n19v1 -> n20v1 + n20v1 [label="(n20v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] + n20v1 -> n8v1 [arrowhead=dot, color=red] + n21v1 -> n22v1 + n22v1 [label="(n22v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"] + n22v1 -> n7v1 [arrowhead=dot, color=red] } diff --git a/hydroflow/tests/snapshots/surface_join_fused__static_static_lhs_streaming_rhs_blocking@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_join_fused__static_static_lhs_streaming_rhs_blocking@graphvis_mermaid.snap index 4279e816ddf6..6e0d433c75ec 100644 --- a/hydroflow/tests/snapshots/surface_join_fused__static_static_lhs_streaming_rhs_blocking@graphvis_mermaid.snap +++ b/hydroflow/tests/snapshots/surface_join_fused__static_static_lhs_streaming_rhs_blocking@graphvis_mermaid.snap @@ -27,48 +27,51 @@ subgraph sg_5v1 ["sg_5v1 stratum 1"] 8v1[\"(8v1) defer_tick()"/]:::pullClass 9v1[\"(9v1) union()"/]:::pullClass 10v1[\"(10v1) join_fused_rhs::<'static, 'static>(Fold(SetUnionHashSet::default, Merge::merge))"/]:::pullClass - 11v1[/"
(11v1)
for_each(|x| {
results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x)
})
"\]:::pushClass + 11v1[\"(11v1) inspect(|x| println!("{}, {x:?}", context.current_tick()))"/]:::pullClass + 12v1[/"
(12v1)
for_each(|x| {
results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x)
})
"\]:::pushClass 3v1-->9v1 5v1-->9v1 8v1-->9v1 9v1-->|0|10v1 10v1-->11v1 + 11v1-->12v1 subgraph sg_5v1_var_my_join ["var my_join"] 10v1 11v1 + 12v1 end subgraph sg_5v1_var_unioner ["var unioner"] 9v1 end end subgraph sg_6v1 ["sg_6v1 stratum 2"] - 16v1[\"(16v1) identity()"/]:::pullClass + 17v1[\"(17v1) identity()"/]:::pullClass end subgraph sg_7v1 ["sg_7v1 stratum 2"] - 18v1[\"(18v1) identity()"/]:::pullClass + 19v1[\"(19v1) identity()"/]:::pullClass end subgraph sg_8v1 ["sg_8v1 stratum 2"] - 20v1[\"(20v1) identity()"/]:::pullClass + 21v1[\"(21v1) identity()"/]:::pullClass end -2v1-->12v1 -4v1-->13v1 -6v1-->15v1 -7v1-->14v1 -12v1["(12v1) handoff"]:::otherClass -12v1--x|1|10v1 +2v1-->13v1 +4v1-->14v1 +6v1-->16v1 +7v1-->15v1 13v1["(13v1) handoff"]:::otherClass -13v1-->16v1 +13v1--x|1|10v1 14v1["(14v1) handoff"]:::otherClass -14v1-->18v1 +14v1-->17v1 15v1["(15v1) handoff"]:::otherClass -15v1-->20v1 -16v1-->17v1 -17v1["(17v1) handoff"]:::otherClass -17v1--o5v1 -18v1-->19v1 -19v1["(19v1) handoff"]:::otherClass -19v1--o8v1 -20v1-->21v1 -21v1["(21v1) handoff"]:::otherClass -21v1--o7v1 +15v1-->19v1 +16v1["(16v1) handoff"]:::otherClass +16v1-->21v1 +17v1-->18v1 +18v1["(18v1) handoff"]:::otherClass +18v1--o5v1 +19v1-->20v1 +20v1["(20v1) handoff"]:::otherClass +20v1--o8v1 +21v1-->22v1 +22v1["(22v1) handoff"]:::otherClass +22v1--o7v1 diff --git a/hydroflow/tests/surface_join_fused.rs b/hydroflow/tests/surface_join_fused.rs index a1ffa3fbfd17..40b1bbb2a35c 100644 --- a/hydroflow/tests/surface_join_fused.rs +++ b/hydroflow/tests/surface_join_fused.rs @@ -172,6 +172,7 @@ pub fn static_static_lhs_streaming_rhs_blocking() { -> [0]my_join; my_join = join_fused_rhs::<'static, 'static>(Fold(SetUnionHashSet::default, Merge::merge)) + -> inspect(|x| println!("{}, {x:?}", context.current_tick())) -> for_each(|x| results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x)); }; assert_graphvis_snapshots!(df); diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__anti_join@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__anti_join@datalog_program.snap index 856c2aa76a74..c39452068e30 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__anti_join@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__anti_join@datalog_program.snap @@ -381,8 +381,6 @@ fn main() { ), ), ); - let sg_1v1_node_17v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_1v1_node_21v1_antijoindata_neg = df .add_state( std::cell::RefCell::new( @@ -401,8 +399,6 @@ fn main() { >::default(), ), ); - let sg_1v1_node_21v1_persisttick = df - .add_state(std::cell::RefCell::new(0_usize)); let sg_1v1_node_11v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -594,9 +590,6 @@ fn main() { let mut sg_1v1_node_17v1_joindata_rhs_borrow = context .state_ref(sg_1v1_node_17v1_joindata_rhs) .borrow_mut(); - let mut sg_1v1_node_17v1_persisttick_borrow = context - .state_ref(sg_1v1_node_17v1_persisttick) - .borrow_mut(); let op_17v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -629,26 +622,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_1v1_node_17v1_persisttick_borrow - <= context.current_tick() - { - *sg_1v1_node_17v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_19v1, - op_20v1, - &mut *sg_1v1_node_17v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_1v1_node_17v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_19v1, + op_20v1, + &mut *sg_1v1_node_17v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_1v1_node_17v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_17v1 = { #[allow(non_snake_case)] @@ -776,17 +758,6 @@ fn main() { is_new_tick, ) } - let __is_new_tick = { - let mut __borrow_ident = context - .state_ref(sg_1v1_node_21v1_persisttick) - .borrow_mut(); - if *__borrow_ident <= context.current_tick() { - *__borrow_ident = context.current_tick() + 1; - true - } else { - false - } - }; check_inputs( hoff_12v3_recv, op_23v1, @@ -794,7 +765,7 @@ fn main() { .get_mut_clear(context.current_tick()), &mut *sg_1v1_node_21v1_antijoindata_pos_borrow .get_mut_clear(context.current_tick()), - __is_new_tick, + context.is_first_run_this_tick(), ) }; let op_21v1 = { diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__index@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__index@datalog_program.snap index 1e1326756b32..8c3eba0354c0 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__index@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__index@datalog_program.snap @@ -914,8 +914,6 @@ fn main() { >::default(), ), ); - let sg_5v1_node_23v1_persisttick = df - .add_state(std::cell::RefCell::new(0_usize)); let sg_5v1_node_11v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -926,7 +924,7 @@ fn main() { ), ); let sg_5v1_node_45v1_persistdata = df - .add_state(::std::cell::RefCell::new((0_usize, ::std::vec::Vec::new()))); + .add_state(::std::cell::RefCell::new(::std::vec::Vec::new())); let sg_5v1_node_43v1_counterdata = df .add_state(::std::cell::RefCell::new(0..)); df.add_subgraph_stratified( @@ -1069,24 +1067,13 @@ fn main() { is_new_tick, ) } - let __is_new_tick = { - let mut __borrow_ident = context - .state_ref(sg_5v1_node_23v1_persisttick) - .borrow_mut(); - if *__borrow_ident <= context.current_tick() { - *__borrow_ident = context.current_tick() + 1; - true - } else { - false - } - }; check_inputs( hoff_9v3_recv, op_22v1, &mut *sg_5v1_node_23v1_antijoindata_neg_borrow, &mut *sg_5v1_node_23v1_antijoindata_pos_borrow .get_mut_clear(context.current_tick()), - __is_new_tick, + context.is_first_run_this_tick(), ) }; let op_23v1 = op_23v1.map(|(k, ())| k); @@ -1270,23 +1257,17 @@ fn main() { let mut sg_5v1_node_45v1_persistvec = context .state_ref(sg_5v1_node_45v1_persistdata) .borrow_mut(); - let ( - ref mut sg_5v1_node_45v1_persisttick, - ref mut sg_5v1_node_45v1_persistvec, - ) = &mut *sg_5v1_node_45v1_persistvec; let op_45v1 = { fn constrain_types<'ctx, Push, Item>( - curr_tick: usize, - tick: &'ctx mut usize, vec: &'ctx mut Vec, mut output: Push, + is_new_tick: bool, ) -> impl 'ctx + hydroflow::pusherator::Pusherator where Push: 'ctx + hydroflow::pusherator::Pusherator, Item: ::std::clone::Clone, { - if *tick <= curr_tick { - *tick = 1 + curr_tick; + if is_new_tick { vec.iter() .cloned() .for_each(|item| { @@ -1302,10 +1283,9 @@ fn main() { ) } constrain_types( - context.current_tick(), - sg_5v1_node_45v1_persisttick, - sg_5v1_node_45v1_persistvec, + &mut *sg_5v1_node_45v1_persistvec, op_11v1, + context.is_first_run_this_tick(), ) }; let op_45v1 = { @@ -1795,10 +1775,8 @@ fn main() { >::default(), ), ); - let sg_7v1_node_18v1_persisttick = df - .add_state(std::cell::RefCell::new(0_usize)); let sg_7v1_node_31v1_persistdata = df - .add_state(::std::cell::RefCell::new((0_usize, ::std::vec::Vec::new()))); + .add_state(::std::cell::RefCell::new(::std::vec::Vec::new())); df.add_subgraph_stratified( "Subgraph GraphSubgraphId(7v1)", 1, @@ -1970,24 +1948,13 @@ fn main() { is_new_tick, ) } - let __is_new_tick = { - let mut __borrow_ident = context - .state_ref(sg_7v1_node_18v1_persisttick) - .borrow_mut(); - if *__borrow_ident <= context.current_tick() { - *__borrow_ident = context.current_tick() + 1; - true - } else { - false - } - }; check_inputs( hoff_15v3_recv, op_17v1, &mut *sg_7v1_node_18v1_antijoindata_neg_borrow, &mut *sg_7v1_node_18v1_antijoindata_pos_borrow .get_mut_clear(context.current_tick()), - __is_new_tick, + context.is_first_run_this_tick(), ) }; let op_18v1 = op_18v1.map(|(k, ())| k); @@ -2057,23 +2024,17 @@ fn main() { let mut sg_7v1_node_31v1_persistvec = context .state_ref(sg_7v1_node_31v1_persistdata) .borrow_mut(); - let ( - ref mut sg_7v1_node_31v1_persisttick, - ref mut sg_7v1_node_31v1_persistvec, - ) = &mut *sg_7v1_node_31v1_persistvec; let op_31v1 = { fn constrain_types<'ctx, Push, Item>( - curr_tick: usize, - tick: &'ctx mut usize, vec: &'ctx mut Vec, mut output: Push, + is_new_tick: bool, ) -> impl 'ctx + hydroflow::pusherator::Pusherator where Push: 'ctx + hydroflow::pusherator::Pusherator, Item: ::std::clone::Clone, { - if *tick <= curr_tick { - *tick = 1 + curr_tick; + if is_new_tick { vec.iter() .cloned() .for_each(|item| { @@ -2089,10 +2050,9 @@ fn main() { ) } constrain_types( - context.current_tick(), - sg_7v1_node_31v1_persisttick, - sg_7v1_node_31v1_persistvec, + &mut *sg_7v1_node_31v1_persistvec, op_32v1, + context.is_first_run_this_tick(), ) }; let op_31v1 = { diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__join_with_other@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__join_with_other@datalog_program.snap index 02f5fb6a9c86..2bf2ac150371 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__join_with_other@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__join_with_other@datalog_program.snap @@ -76,8 +76,6 @@ fn main() { ), ), ); - let sg_1v1_node_13v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_1v1_node_8v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -339,9 +337,6 @@ fn main() { let mut sg_1v1_node_13v1_joindata_rhs_borrow = context .state_ref(sg_1v1_node_13v1_joindata_rhs) .borrow_mut(); - let mut sg_1v1_node_13v1_persisttick_borrow = context - .state_ref(sg_1v1_node_13v1_persisttick) - .borrow_mut(); let op_13v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -374,26 +369,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_1v1_node_13v1_persisttick_borrow - <= context.current_tick() - { - *sg_1v1_node_13v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_15v1, - op_16v1, - &mut *sg_1v1_node_13v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_1v1_node_13v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_15v1, + op_16v1, + &mut *sg_1v1_node_13v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_1v1_node_13v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_13v1 = { #[allow(non_snake_case)] diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__join_with_self@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__join_with_self@datalog_program.snap index 334bb2dfe3ff..2d8a21bcbf4f 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__join_with_self@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__join_with_self@datalog_program.snap @@ -210,8 +210,6 @@ fn main() { ), ), ); - let sg_2v1_node_9v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_2v1_node_5v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -301,9 +299,6 @@ fn main() { let mut sg_2v1_node_9v1_joindata_rhs_borrow = context .state_ref(sg_2v1_node_9v1_joindata_rhs) .borrow_mut(); - let mut sg_2v1_node_9v1_persisttick_borrow = context - .state_ref(sg_2v1_node_9v1_persisttick) - .borrow_mut(); let op_9v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -336,26 +331,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_2v1_node_9v1_persisttick_borrow - <= context.current_tick() - { - *sg_2v1_node_9v1_persisttick_borrow = context.current_tick() - + 1; - true - } else { - false - }; - check_inputs( - op_11v1, - op_12v1, - &mut *sg_2v1_node_9v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_2v1_node_9v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_11v1, + op_12v1, + &mut *sg_2v1_node_9v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_2v1_node_9v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_9v1 = { #[allow(non_snake_case)] diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__persist@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__persist@datalog_program.snap index 388cecba0c48..0499fbb6a650 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__persist@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__persist@datalog_program.snap @@ -523,8 +523,6 @@ fn main() { hydroflow::compiled::pull::HalfMultisetJoinState::default(), ), ); - let sg_4v1_node_41v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_4v1_node_45v1_joindata_lhs = df .add_state( std::cell::RefCell::new( @@ -539,8 +537,6 @@ fn main() { ), ), ); - let sg_4v1_node_45v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_4v1_node_15v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -684,9 +680,6 @@ fn main() { let mut sg_4v1_node_41v1_joindata_rhs_borrow = context .state_ref(sg_4v1_node_41v1_joindata_rhs) .borrow_mut(); - let mut sg_4v1_node_41v1_persisttick_borrow = context - .state_ref(sg_4v1_node_41v1_persisttick) - .borrow_mut(); let op_41v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -719,24 +712,13 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_4v1_node_41v1_persisttick_borrow - <= context.current_tick() - { - *sg_4v1_node_41v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_43v1, - op_44v1, - &mut *sg_4v1_node_41v1_joindata_lhs_borrow, - &mut *sg_4v1_node_41v1_joindata_rhs_borrow, - __is_new_tick, - ) - } + check_inputs( + op_43v1, + op_44v1, + &mut *sg_4v1_node_41v1_joindata_lhs_borrow, + &mut *sg_4v1_node_41v1_joindata_rhs_borrow, + context.is_first_run_this_tick(), + ) }; let op_41v1 = { #[allow(non_snake_case)] @@ -872,9 +854,6 @@ fn main() { let mut sg_4v1_node_45v1_joindata_rhs_borrow = context .state_ref(sg_4v1_node_45v1_joindata_rhs) .borrow_mut(); - let mut sg_4v1_node_45v1_persisttick_borrow = context - .state_ref(sg_4v1_node_45v1_persisttick) - .borrow_mut(); let op_45v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -907,25 +886,14 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_4v1_node_45v1_persisttick_borrow - <= context.current_tick() - { - *sg_4v1_node_45v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_47v1, - op_48v1, - &mut *sg_4v1_node_45v1_joindata_lhs_borrow, - &mut *sg_4v1_node_45v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_47v1, + op_48v1, + &mut *sg_4v1_node_45v1_joindata_lhs_borrow, + &mut *sg_4v1_node_45v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_45v1 = { #[allow(non_snake_case)] @@ -1150,7 +1118,7 @@ fn main() { }, ); let sg_5v1_node_53v1_persistdata = df - .add_state(::std::cell::RefCell::new((0_usize, ::std::vec::Vec::new()))); + .add_state(::std::cell::RefCell::new(::std::vec::Vec::new())); let sg_5v1_node_51v1_antijoindata_neg = df .add_state( std::cell::RefCell::new( @@ -1169,8 +1137,6 @@ fn main() { >::default(), ), ); - let sg_5v1_node_51v1_persisttick = df - .add_state(std::cell::RefCell::new(0_usize)); let sg_5v1_node_18v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -1193,13 +1159,8 @@ fn main() { let mut sg_5v1_node_53v1_persistvec = context .state_ref(sg_5v1_node_53v1_persistdata) .borrow_mut(); - let ( - ref mut sg_5v1_node_53v1_persisttick, - ref mut sg_5v1_node_53v1_persistvec, - ) = &mut *sg_5v1_node_53v1_persistvec; let op_53v1 = { - if *sg_5v1_node_53v1_persisttick <= context.current_tick() { - *sg_5v1_node_53v1_persisttick = 1 + context.current_tick(); + if context.is_first_run_this_tick() { sg_5v1_node_53v1_persistvec.extend(hoff_23v3_recv); sg_5v1_node_53v1_persistvec.iter().cloned() } else { @@ -1301,17 +1262,6 @@ fn main() { is_new_tick, ) } - let __is_new_tick = { - let mut __borrow_ident = context - .state_ref(sg_5v1_node_51v1_persisttick) - .borrow_mut(); - if *__borrow_ident <= context.current_tick() { - *__borrow_ident = context.current_tick() + 1; - true - } else { - false - } - }; check_inputs( hoff_20v3_recv, op_54v1, @@ -1319,7 +1269,7 @@ fn main() { .get_mut_clear(context.current_tick()), &mut *sg_5v1_node_51v1_antijoindata_pos_borrow .get_mut_clear(context.current_tick()), - __is_new_tick, + context.is_first_run_this_tick(), ) }; let op_51v1 = { @@ -1563,8 +1513,6 @@ fn main() { >::default(), ), ); - let sg_6v1_node_3v1_persisttick = df - .add_state(std::cell::RefCell::new(0_usize)); let sg_6v1_node_21v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -1584,7 +1532,7 @@ fn main() { ), ); let sg_6v1_node_61v1_persistdata = df - .add_state(::std::cell::RefCell::new((0_usize, ::std::vec::Vec::new()))); + .add_state(::std::cell::RefCell::new(::std::vec::Vec::new())); df.add_subgraph_stratified( "Subgraph GraphSubgraphId(6v1)", 1, @@ -1702,24 +1650,13 @@ fn main() { is_new_tick, ) } - let __is_new_tick = { - let mut __borrow_ident = context - .state_ref(sg_6v1_node_3v1_persisttick) - .borrow_mut(); - if *__borrow_ident <= context.current_tick() { - *__borrow_ident = context.current_tick() + 1; - true - } else { - false - } - }; check_inputs( hoff_28v3_recv, op_2v1, &mut *sg_6v1_node_3v1_antijoindata_neg_borrow, &mut *sg_6v1_node_3v1_antijoindata_pos_borrow .get_mut_clear(context.current_tick()), - __is_new_tick, + context.is_first_run_this_tick(), ) }; let op_3v1 = op_3v1.map(|(k, ())| k); @@ -1984,23 +1921,17 @@ fn main() { let mut sg_6v1_node_61v1_persistvec = context .state_ref(sg_6v1_node_61v1_persistdata) .borrow_mut(); - let ( - ref mut sg_6v1_node_61v1_persisttick, - ref mut sg_6v1_node_61v1_persistvec, - ) = &mut *sg_6v1_node_61v1_persistvec; let op_61v1 = { fn constrain_types<'ctx, Push, Item>( - curr_tick: usize, - tick: &'ctx mut usize, vec: &'ctx mut Vec, mut output: Push, + is_new_tick: bool, ) -> impl 'ctx + hydroflow::pusherator::Pusherator where Push: 'ctx + hydroflow::pusherator::Pusherator, Item: ::std::clone::Clone, { - if *tick <= curr_tick { - *tick = 1 + curr_tick; + if is_new_tick { vec.iter() .cloned() .for_each(|item| { @@ -2016,10 +1947,9 @@ fn main() { ) } constrain_types( - context.current_tick(), - sg_6v1_node_61v1_persisttick, - sg_6v1_node_61v1_persistvec, + &mut *sg_6v1_node_61v1_persistvec, op_27v1, + context.is_first_run_this_tick(), ) }; let op_61v1 = { @@ -2193,8 +2123,6 @@ fn main() { >::default(), ), ); - let sg_7v1_node_31v1_persisttick = df - .add_state(std::cell::RefCell::new(0_usize)); let sg_7v1_node_24v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -2205,7 +2133,7 @@ fn main() { ), ); let sg_7v1_node_68v1_persistdata = df - .add_state(::std::cell::RefCell::new((0_usize, ::std::vec::Vec::new()))); + .add_state(::std::cell::RefCell::new(::std::vec::Vec::new())); df.add_subgraph_stratified( "Subgraph GraphSubgraphId(7v1)", 1, @@ -2336,24 +2264,13 @@ fn main() { is_new_tick, ) } - let __is_new_tick = { - let mut __borrow_ident = context - .state_ref(sg_7v1_node_31v1_persisttick) - .borrow_mut(); - if *__borrow_ident <= context.current_tick() { - *__borrow_ident = context.current_tick() + 1; - true - } else { - false - } - }; check_inputs( hoff_16v3_recv, op_30v1, &mut *sg_7v1_node_31v1_antijoindata_neg_borrow, &mut *sg_7v1_node_31v1_antijoindata_pos_borrow .get_mut_clear(context.current_tick()), - __is_new_tick, + context.is_first_run_this_tick(), ) }; let op_31v1 = op_31v1.map(|(k, ())| k); @@ -2471,23 +2388,17 @@ fn main() { let mut sg_7v1_node_68v1_persistvec = context .state_ref(sg_7v1_node_68v1_persistdata) .borrow_mut(); - let ( - ref mut sg_7v1_node_68v1_persisttick, - ref mut sg_7v1_node_68v1_persistvec, - ) = &mut *sg_7v1_node_68v1_persistvec; let op_68v1 = { fn constrain_types<'ctx, Push, Item>( - curr_tick: usize, - tick: &'ctx mut usize, vec: &'ctx mut Vec, mut output: Push, + is_new_tick: bool, ) -> impl 'ctx + hydroflow::pusherator::Pusherator where Push: 'ctx + hydroflow::pusherator::Pusherator, Item: ::std::clone::Clone, { - if *tick <= curr_tick { - *tick = 1 + curr_tick; + if is_new_tick { vec.iter() .cloned() .for_each(|item| { @@ -2503,10 +2414,9 @@ fn main() { ) } constrain_types( - context.current_tick(), - sg_7v1_node_68v1_persisttick, - sg_7v1_node_68v1_persistvec, + &mut *sg_7v1_node_68v1_persistvec, op_24v1, + context.is_first_run_this_tick(), ) }; let op_68v1 = { @@ -2671,10 +2581,8 @@ fn main() { >::default(), ), ); - let sg_8v1_node_8v1_persisttick = df - .add_state(std::cell::RefCell::new(0_usize)); let sg_8v1_node_55v1_persistdata = df - .add_state(::std::cell::RefCell::new((0_usize, ::std::vec::Vec::new()))); + .add_state(::std::cell::RefCell::new(::std::vec::Vec::new())); df.add_subgraph_stratified( "Subgraph GraphSubgraphId(8v1)", 1, @@ -2783,24 +2691,13 @@ fn main() { is_new_tick, ) } - let __is_new_tick = { - let mut __borrow_ident = context - .state_ref(sg_8v1_node_8v1_persisttick) - .borrow_mut(); - if *__borrow_ident <= context.current_tick() { - *__borrow_ident = context.current_tick() + 1; - true - } else { - false - } - }; check_inputs( hoff_22v3_recv, op_7v1, &mut *sg_8v1_node_8v1_antijoindata_neg_borrow, &mut *sg_8v1_node_8v1_antijoindata_pos_borrow .get_mut_clear(context.current_tick()), - __is_new_tick, + context.is_first_run_this_tick(), ) }; let op_8v1 = op_8v1.map(|(k, ())| k); @@ -2871,23 +2768,17 @@ fn main() { let mut sg_8v1_node_55v1_persistvec = context .state_ref(sg_8v1_node_55v1_persistdata) .borrow_mut(); - let ( - ref mut sg_8v1_node_55v1_persisttick, - ref mut sg_8v1_node_55v1_persistvec, - ) = &mut *sg_8v1_node_55v1_persistvec; let op_55v1 = { fn constrain_types<'ctx, Push, Item>( - curr_tick: usize, - tick: &'ctx mut usize, vec: &'ctx mut Vec, mut output: Push, + is_new_tick: bool, ) -> impl 'ctx + hydroflow::pusherator::Pusherator where Push: 'ctx + hydroflow::pusherator::Pusherator, Item: ::std::clone::Clone, { - if *tick <= curr_tick { - *tick = 1 + curr_tick; + if is_new_tick { vec.iter() .cloned() .for_each(|item| { @@ -2903,10 +2794,9 @@ fn main() { ) } constrain_types( - context.current_tick(), - sg_8v1_node_55v1_persisttick, - sg_8v1_node_55v1_persistvec, + &mut *sg_8v1_node_55v1_persistvec, op_56v1, + context.is_first_run_this_tick(), ) }; let op_55v1 = { diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__persist_uniqueness@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__persist_uniqueness@datalog_program.snap index 8dcf7966c9bb..4aad8bfe3cf6 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__persist_uniqueness@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__persist_uniqueness@datalog_program.snap @@ -433,8 +433,6 @@ fn main() { >::default(), ), ); - let sg_3v1_node_3v1_persisttick = df - .add_state(std::cell::RefCell::new(0_usize)); df.add_subgraph_stratified( "Subgraph GraphSubgraphId(3v1)", 1, @@ -650,24 +648,13 @@ fn main() { is_new_tick, ) } - let __is_new_tick = { - let mut __borrow_ident = context - .state_ref(sg_3v1_node_3v1_persisttick) - .borrow_mut(); - if *__borrow_ident <= context.current_tick() { - *__borrow_ident = context.current_tick() + 1; - true - } else { - false - } - }; check_inputs( hoff_11v3_recv, op_2v1, &mut *sg_3v1_node_3v1_antijoindata_neg_borrow, &mut *sg_3v1_node_3v1_antijoindata_pos_borrow .get_mut_clear(context.current_tick()), - __is_new_tick, + context.is_first_run_this_tick(), ) }; let op_3v1 = op_3v1.map(|(k, ())| k); diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__single_column_program@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__single_column_program@datalog_program.snap index 15b18942e741..253649f27b12 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__single_column_program@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__single_column_program@datalog_program.snap @@ -76,8 +76,6 @@ fn main() { ), ), ); - let sg_1v1_node_13v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_1v1_node_8v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -339,9 +337,6 @@ fn main() { let mut sg_1v1_node_13v1_joindata_rhs_borrow = context .state_ref(sg_1v1_node_13v1_joindata_rhs) .borrow_mut(); - let mut sg_1v1_node_13v1_persisttick_borrow = context - .state_ref(sg_1v1_node_13v1_persisttick) - .borrow_mut(); let op_13v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -374,26 +369,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_1v1_node_13v1_persisttick_borrow - <= context.current_tick() - { - *sg_1v1_node_13v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_15v1, - op_16v1, - &mut *sg_1v1_node_13v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_1v1_node_13v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_15v1, + op_16v1, + &mut *sg_1v1_node_13v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_1v1_node_13v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_13v1 = { #[allow(non_snake_case)] diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__transitive_closure@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__transitive_closure@datalog_program.snap index ca24afcb6e51..14a09ec625b2 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__transitive_closure@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__transitive_closure@datalog_program.snap @@ -81,8 +81,6 @@ fn main() { ), ), ); - let sg_1v1_node_15v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_1v1_node_8v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -415,9 +413,6 @@ fn main() { let mut sg_1v1_node_15v1_joindata_rhs_borrow = context .state_ref(sg_1v1_node_15v1_joindata_rhs) .borrow_mut(); - let mut sg_1v1_node_15v1_persisttick_borrow = context - .state_ref(sg_1v1_node_15v1_persisttick) - .borrow_mut(); let op_15v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -450,26 +445,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_1v1_node_15v1_persisttick_borrow - <= context.current_tick() - { - *sg_1v1_node_15v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_17v1, - op_18v1, - &mut *sg_1v1_node_15v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_1v1_node_15v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_17v1, + op_18v1, + &mut *sg_1v1_node_15v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_1v1_node_15v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_15v1 = { #[allow(non_snake_case)] diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__triple_relation_join@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__triple_relation_join@datalog_program.snap index 624fa7bb510e..4f019e941a6f 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__triple_relation_join@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__triple_relation_join@datalog_program.snap @@ -100,8 +100,6 @@ fn main() { ), ), ); - let sg_1v1_node_17v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_1v1_node_21v1_joindata_lhs = df .add_state( std::cell::RefCell::new( @@ -118,8 +116,6 @@ fn main() { ), ), ); - let sg_1v1_node_21v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_1v1_node_11v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -469,9 +465,6 @@ fn main() { let mut sg_1v1_node_17v1_joindata_rhs_borrow = context .state_ref(sg_1v1_node_17v1_joindata_rhs) .borrow_mut(); - let mut sg_1v1_node_17v1_persisttick_borrow = context - .state_ref(sg_1v1_node_17v1_persisttick) - .borrow_mut(); let op_17v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -504,26 +497,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_1v1_node_17v1_persisttick_borrow - <= context.current_tick() - { - *sg_1v1_node_17v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_19v1, - op_20v1, - &mut *sg_1v1_node_17v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_1v1_node_17v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_19v1, + op_20v1, + &mut *sg_1v1_node_17v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_1v1_node_17v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_17v1 = { #[allow(non_snake_case)] @@ -659,9 +641,6 @@ fn main() { let mut sg_1v1_node_21v1_joindata_rhs_borrow = context .state_ref(sg_1v1_node_21v1_joindata_rhs) .borrow_mut(); - let mut sg_1v1_node_21v1_persisttick_borrow = context - .state_ref(sg_1v1_node_21v1_persisttick) - .borrow_mut(); let op_21v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -694,26 +673,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_1v1_node_21v1_persisttick_borrow - <= context.current_tick() - { - *sg_1v1_node_21v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_23v1, - op_24v1, - &mut *sg_1v1_node_21v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_1v1_node_21v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_23v1, + op_24v1, + &mut *sg_1v1_node_21v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_1v1_node_21v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_21v1 = { #[allow(non_snake_case)] diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__wildcard_fields@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__wildcard_fields@datalog_program.snap index 1da92c80953a..aba474ed641a 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__wildcard_fields@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__wildcard_fields@datalog_program.snap @@ -210,8 +210,6 @@ fn main() { ), ), ); - let sg_2v1_node_9v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); let sg_2v1_node_5v1_uniquedata = df .add_state( ::std::cell::RefCell::new( @@ -301,9 +299,6 @@ fn main() { let mut sg_2v1_node_9v1_joindata_rhs_borrow = context .state_ref(sg_2v1_node_9v1_joindata_rhs) .borrow_mut(); - let mut sg_2v1_node_9v1_persisttick_borrow = context - .state_ref(sg_2v1_node_9v1_persisttick) - .borrow_mut(); let op_9v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -336,26 +331,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_2v1_node_9v1_persisttick_borrow - <= context.current_tick() - { - *sg_2v1_node_9v1_persisttick_borrow = context.current_tick() - + 1; - true - } else { - false - }; - check_inputs( - op_11v1, - op_12v1, - &mut *sg_2v1_node_9v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_2v1_node_9v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_11v1, + op_12v1, + &mut *sg_2v1_node_9v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_2v1_node_9v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_9v1 = { #[allow(non_snake_case)] diff --git a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__wildcard_join_count@datalog_program.snap b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__wildcard_join_count@datalog_program.snap index 2ff5178a0384..71a69193f072 100644 --- a/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__wildcard_join_count@datalog_program.snap +++ b/hydroflow_datalog_core/src/snapshots/hydroflow_datalog_core__tests__wildcard_join_count@datalog_program.snap @@ -828,8 +828,6 @@ fn main() { ), ), ); - let sg_5v1_node_17v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); df.add_subgraph_stratified( "Subgraph GraphSubgraphId(5v1)", 0, @@ -919,9 +917,6 @@ fn main() { let mut sg_5v1_node_17v1_joindata_rhs_borrow = context .state_ref(sg_5v1_node_17v1_joindata_rhs) .borrow_mut(); - let mut sg_5v1_node_17v1_persisttick_borrow = context - .state_ref(sg_5v1_node_17v1_persisttick) - .borrow_mut(); let op_17v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -954,26 +949,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_5v1_node_17v1_persisttick_borrow - <= context.current_tick() - { - *sg_5v1_node_17v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_19v1, - op_20v1, - &mut *sg_5v1_node_17v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_5v1_node_17v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_19v1, + op_20v1, + &mut *sg_5v1_node_17v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_5v1_node_17v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_17v1 = { #[allow(non_snake_case)] @@ -1098,8 +1082,6 @@ fn main() { ), ), ); - let sg_6v1_node_24v1_persisttick = df - .add_state(std::cell::RefCell::new(0usize)); df.add_subgraph_stratified( "Subgraph GraphSubgraphId(6v1)", 0, @@ -1189,9 +1171,6 @@ fn main() { let mut sg_6v1_node_24v1_joindata_rhs_borrow = context .state_ref(sg_6v1_node_24v1_joindata_rhs) .borrow_mut(); - let mut sg_6v1_node_24v1_persisttick_borrow = context - .state_ref(sg_6v1_node_24v1_persisttick) - .borrow_mut(); let op_24v1 = { #[inline(always)] fn check_inputs<'a, K, I1, V1, I2, V2>( @@ -1224,26 +1203,15 @@ fn main() { is_new_tick, ) } - { - let __is_new_tick = if *sg_6v1_node_24v1_persisttick_borrow - <= context.current_tick() - { - *sg_6v1_node_24v1_persisttick_borrow = context - .current_tick() + 1; - true - } else { - false - }; - check_inputs( - op_26v1, - op_27v1, - &mut *sg_6v1_node_24v1_joindata_lhs_borrow - .get_mut_clear(context.current_tick()), - &mut *sg_6v1_node_24v1_joindata_rhs_borrow - .get_mut_clear(context.current_tick()), - __is_new_tick, - ) - } + check_inputs( + op_26v1, + op_27v1, + &mut *sg_6v1_node_24v1_joindata_lhs_borrow + .get_mut_clear(context.current_tick()), + &mut *sg_6v1_node_24v1_joindata_rhs_borrow + .get_mut_clear(context.current_tick()), + context.is_first_run_this_tick(), + ) }; let op_24v1 = { #[allow(non_snake_case)] diff --git a/hydroflow_lang/src/graph/ops/anti_join.rs b/hydroflow_lang/src/graph/ops/anti_join.rs index 78cc32dff157..c6ed5eec7be9 100644 --- a/hydroflow_lang/src/graph/ops/anti_join.rs +++ b/hydroflow_lang/src/graph/ops/anti_join.rs @@ -107,7 +107,6 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints { make_antijoindata(persistences[0], "pos")?; let (neg_antijoindata_ident, neg_borrow_ident, neg_init, neg_borrow) = make_antijoindata(persistences[1], "neg")?; - let tick_ident = wc.make_ident("persisttick"); let write_prologue = quote_spanned! {op_span=> let #neg_antijoindata_ident = #hydroflow.add_state(std::cell::RefCell::new( @@ -116,9 +115,6 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints { let #pos_antijoindata_ident = #hydroflow.add_state(std::cell::RefCell::new( #pos_init )); - let #tick_ident = #hydroflow.add_state(std::cell::RefCell::new( - 0_usize - )); }; let input_neg = &inputs[0]; // N before P @@ -148,25 +144,12 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints { #root::compiled::pull::anti_join_into_iter(input_pos, neg_state, pos_state, is_new_tick) } - let __is_new_tick = { - let mut __borrow_ident = #context.state_ref(#tick_ident).borrow_mut(); - - if *__borrow_ident <= #context.current_tick() { - *__borrow_ident = #context.current_tick() + 1; - // new tick - true - } else { - // same tick. - false - } - }; - check_inputs( #input_neg, #input_pos, #neg_borrow, #pos_borrow, - __is_new_tick, + context.is_first_run_this_tick(), ) }; } diff --git a/hydroflow_lang/src/graph/ops/anti_join_multiset.rs b/hydroflow_lang/src/graph/ops/anti_join_multiset.rs index 17d43426c79a..a14c8d8f2080 100644 --- a/hydroflow_lang/src/graph/ops/anti_join_multiset.rs +++ b/hydroflow_lang/src/graph/ops/anti_join_multiset.rs @@ -116,10 +116,9 @@ pub const ANTI_JOIN_MULTISET: OperatorConstraints = OperatorConstraints { let write_prologue_pos = match persistences[0] { Persistence::Tick => quote_spanned! {op_span=>}, Persistence::Static => quote_spanned! {op_span=> - let #pos_antijoindata_ident = #hydroflow.add_state(std::cell::RefCell::new(( - 0_usize, + let #pos_antijoindata_ident = #hydroflow.add_state(std::cell::RefCell::new( ::std::vec::Vec::new() - ))); + )); }, Persistence::Mutable => { diagnostics.push(Diagnostic::spanned( @@ -162,18 +161,17 @@ pub const ANTI_JOIN_MULTISET: OperatorConstraints = OperatorConstraints { let #ident = { #[allow(clippy::clone_on_copy)] #[allow(suspicious_double_ref_op)] - if #pos_borrow_ident.0 <= #context.current_tick() { + if context.is_first_run_this_tick() { // Start of new tick #neg_borrow.extend(#input_neg); - #pos_borrow_ident.0 = 1 + #context.current_tick(); - #pos_borrow_ident.1.extend(#input_pos); - #pos_borrow_ident.1.iter() + #pos_borrow_ident.extend(#input_pos); + #pos_borrow_ident.iter() } else { // Called second or later times on the same tick. - let len = #pos_borrow_ident.1.len(); - #pos_borrow_ident.1.extend(#input_pos); - #pos_borrow_ident.1[len..].iter() + let len = #pos_borrow_ident.len(); + #pos_borrow_ident.extend(#input_pos); + #pos_borrow_ident[len..].iter() } .filter(|x| { #[allow(clippy::unnecessary_mut_passed)] diff --git a/hydroflow_lang/src/graph/ops/join.rs b/hydroflow_lang/src/graph/ops/join.rs index c5118fb95c8b..5b415b0fc8c1 100644 --- a/hydroflow_lang/src/graph/ops/join.rs +++ b/hydroflow_lang/src/graph/ops/join.rs @@ -185,9 +185,6 @@ pub const JOIN: OperatorConstraints = OperatorConstraints { let (rhs_joindata_ident, rhs_borrow_ident, rhs_init, rhs_borrow) = make_joindata(persistences[1], "rhs")?; - let tick_ident = wc.make_ident("persisttick"); - let tick_borrow_ident = wc.make_ident("persisttick_borrow"); - let write_prologue = quote_spanned! {op_span=> let #lhs_joindata_ident = #hydroflow.add_state(std::cell::RefCell::new( #lhs_init @@ -195,9 +192,6 @@ pub const JOIN: OperatorConstraints = OperatorConstraints { let #rhs_joindata_ident = #hydroflow.add_state(std::cell::RefCell::new( #rhs_init )); - let #tick_ident = #hydroflow.add_state(std::cell::RefCell::new( - 0usize - )); }; let lhs = &inputs[0]; @@ -205,7 +199,6 @@ pub const JOIN: OperatorConstraints = OperatorConstraints { let write_iterator = quote_spanned! {op_span=> let mut #lhs_borrow_ident = #context.state_ref(#lhs_joindata_ident).borrow_mut(); let mut #rhs_borrow_ident = #context.state_ref(#rhs_joindata_ident).borrow_mut(); - let mut #tick_borrow_ident = #context.state_ref(#tick_ident).borrow_mut(); let #ident = { // Limit error propagation by bounding locally, erasing output iterator type. #[inline(always)] @@ -226,16 +219,7 @@ pub const JOIN: OperatorConstraints = OperatorConstraints { #root::compiled::pull::symmetric_hash_join_into_iter(lhs, rhs, lhs_state, rhs_state, is_new_tick) } - { - let __is_new_tick = if *#tick_borrow_ident <= #context.current_tick() { - *#tick_borrow_ident = #context.current_tick() + 1; - true - } else { - false - }; - - check_inputs(#lhs, #rhs, #lhs_borrow, #rhs_borrow, __is_new_tick) - } + check_inputs(#lhs, #rhs, #lhs_borrow, #rhs_borrow, #context.is_first_run_this_tick()) }; }; diff --git a/hydroflow_lang/src/graph/ops/join_fused_lhs.rs b/hydroflow_lang/src/graph/ops/join_fused_lhs.rs index 68642cc1ffb2..b1a1a156cf7c 100644 --- a/hydroflow_lang/src/graph/ops/join_fused_lhs.rs +++ b/hydroflow_lang/src/graph/ops/join_fused_lhs.rs @@ -88,10 +88,9 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints { let write_prologue_rhs = match persistences[1] { Persistence::Tick => quote_spanned! {op_span=>}, Persistence::Static => quote_spanned! {op_span=> - let #rhs_joindata_ident = #hydroflow.add_state(std::cell::RefCell::new(( - 0_usize, + let #rhs_joindata_ident = #hydroflow.add_state(::std::cell::RefCell::new( ::std::vec::Vec::new() - ))); + )); }, Persistence::Mutable => { diagnostics.push(Diagnostic::spanned( @@ -143,14 +142,13 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints { #[allow(clippy::clone_on_copy)] #[allow(suspicious_double_ref_op)] - if #rhs_borrow_ident.0 <= #context.current_tick() { - #rhs_borrow_ident.0 = 1 + #context.current_tick(); - #rhs_borrow_ident.1.extend(#rhs); - #rhs_borrow_ident.1.iter() + if #context.is_first_run_this_tick() { + #rhs_borrow_ident.extend(#rhs); + #rhs_borrow_ident.iter() } else { - let len = #rhs_borrow_ident.1.len(); - #rhs_borrow_ident.1.extend(#rhs); - #rhs_borrow_ident.1[len..].iter() + let len = #rhs_borrow_ident.len(); + #rhs_borrow_ident.extend(#rhs); + #rhs_borrow_ident[len..].iter() } .filter_map(|(k, v2)| #lhs_borrow.table.get(k).map(|v1| (k.clone(), (v1.clone(), v2.clone())))) }; diff --git a/hydroflow_lang/src/graph/ops/multiset_delta.rs b/hydroflow_lang/src/graph/ops/multiset_delta.rs index 36c9cf501c71..a038c72df97c 100644 --- a/hydroflow_lang/src/graph/ops/multiset_delta.rs +++ b/hydroflow_lang/src/graph/ops/multiset_delta.rs @@ -65,23 +65,19 @@ pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints { let input = &inputs[0]; let output = &outputs[0]; - let tick_data = wc.make_ident("tick_data"); let prev_data = wc.make_ident("prev_data"); let curr_data = wc.make_ident("curr_data"); let write_prologue = quote_spanned! {op_span=> - let #tick_data = #hydroflow.add_state(::std::cell::Cell::new(0_usize)); let #prev_data = #hydroflow.add_state(::std::cell::RefCell::new(#root::rustc_hash::FxHashMap::default())); let #curr_data = #hydroflow.add_state(::std::cell::RefCell::new(#root::rustc_hash::FxHashMap::default())); }; let filter_fn = quote_spanned! {op_span=> |item| { - let tick = #context.state_ref(#tick_data); let mut prev_map = #context.state_ref(#prev_data).borrow_mut(); let mut curr_map = #context.state_ref(#curr_data).borrow_mut(); - if tick.get() < #context.current_tick() { - tick.set(#context.current_tick()); + if context.is_first_run_this_tick() { ::std::mem::swap(::std::ops::DerefMut::deref_mut(&mut prev_map), ::std::ops::DerefMut::deref_mut(&mut curr_map)); curr_map.clear(); } diff --git a/hydroflow_lang/src/graph/ops/persist.rs b/hydroflow_lang/src/graph/ops/persist.rs index a969eb075fb3..3eb1c9579b8d 100644 --- a/hydroflow_lang/src/graph/ops/persist.rs +++ b/hydroflow_lang/src/graph/ops/persist.rs @@ -68,22 +68,18 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints { _| { let persistdata_ident = wc.make_ident("persistdata"); let vec_ident = wc.make_ident("persistvec"); - let tick_ident = wc.make_ident("persisttick"); let write_prologue = quote_spanned! {op_span=> - let #persistdata_ident = #hydroflow.add_state(::std::cell::RefCell::new(( - 0_usize, // tick + let #persistdata_ident = #hydroflow.add_state(::std::cell::RefCell::new( ::std::vec::Vec::new(), - ))); + )); }; let write_iterator = if is_pull { let input = &inputs[0]; quote_spanned! {op_span=> let mut #vec_ident = #context.state_ref(#persistdata_ident).borrow_mut(); - let (ref mut #tick_ident, ref mut #vec_ident) = &mut *#vec_ident; let #ident = { - if *#tick_ident <= #context.current_tick() { - *#tick_ident = 1 + #context.current_tick(); + if context.is_first_run_this_tick() { #vec_ident.extend(#input); #vec_ident.iter().cloned() } else { @@ -97,15 +93,13 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints { let output = &outputs[0]; quote_spanned! {op_span=> let mut #vec_ident = #context.state_ref(#persistdata_ident).borrow_mut(); - let (ref mut #tick_ident, ref mut #vec_ident) = &mut *#vec_ident; let #ident = { - fn constrain_types<'ctx, Push, Item>(curr_tick: usize, tick: &'ctx mut usize, vec: &'ctx mut Vec, mut output: Push) -> impl 'ctx + #root::pusherator::Pusherator + fn constrain_types<'ctx, Push, Item>(vec: &'ctx mut Vec, mut output: Push, is_new_tick: bool) -> impl 'ctx + #root::pusherator::Pusherator where Push: 'ctx + #root::pusherator::Pusherator, Item: ::std::clone::Clone, { - if *tick <= curr_tick { - *tick = 1 + curr_tick; + if is_new_tick { vec.iter().cloned().for_each(|item| { #root::pusherator::Pusherator::give(&mut output, item); }); @@ -115,7 +109,7 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints { vec.last().unwrap().clone() }, output) } - constrain_types(#context.current_tick(), #tick_ident, #vec_ident, #output) + constrain_types(&mut *#vec_ident, #output, context.is_first_run_this_tick()) }; } }; diff --git a/hydroflow_lang/src/graph/ops/persist_mut.rs b/hydroflow_lang/src/graph/ops/persist_mut.rs index 6d9088b010cf..3ca5e58bf0dc 100644 --- a/hydroflow_lang/src/graph/ops/persist_mut.rs +++ b/hydroflow_lang/src/graph/ops/persist_mut.rs @@ -49,28 +49,23 @@ pub const PERSIST_MUT: OperatorConstraints = OperatorConstraints { let persistdata_ident = wc.make_ident("persistdata"); let vec_ident = wc.make_ident("persistvec"); - let tick_ident = wc.make_ident("persisttick"); let write_prologue = quote_spanned! {op_span=> - let #persistdata_ident = #hydroflow.add_state(::std::cell::RefCell::new(( - 0_usize, // tick + let #persistdata_ident = #hydroflow.add_state(::std::cell::RefCell::new( #root::util::sparse_vec::SparseVec::default(), - ))); + )); }; let write_iterator = { let input = &inputs[0]; quote_spanned! {op_span=> let mut #vec_ident = #context.state_ref(#persistdata_ident).borrow_mut(); - let (ref mut #tick_ident, ref mut #vec_ident) = &mut *#vec_ident; let #ident = { #[inline(always)] fn check_iter(iter: impl Iterator>) -> impl Iterator> { iter } - if *#tick_ident <= #context.current_tick() { - *#tick_ident = 1 + #context.current_tick(); - + if context.is_first_run_this_tick() { for item in check_iter(#input) { match item { #root::util::Persistence::Persist(v) => #vec_ident.push(v), diff --git a/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs b/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs index f3efc19b23c2..34a0fb5781f1 100644 --- a/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs +++ b/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs @@ -49,28 +49,23 @@ pub const PERSIST_MUT_KEYED: OperatorConstraints = OperatorConstraints { let persistdata_ident = wc.make_ident("persistdata"); let vec_ident = wc.make_ident("persistvec"); - let tick_ident = wc.make_ident("persisttick"); let write_prologue = quote_spanned! {op_span=> - let #persistdata_ident = #hydroflow.add_state(::std::cell::RefCell::new(( - 0_usize, // tick + let #persistdata_ident = #hydroflow.add_state(::std::cell::RefCell::new( #root::rustc_hash::FxHashMap::<_, #root::util::sparse_vec::SparseVec<_>>::default() - ))); + )); }; let write_iterator = { let input = &inputs[0]; quote_spanned! {op_span=> let mut #vec_ident = #context.state_ref(#persistdata_ident).borrow_mut(); - let (ref mut #tick_ident, ref mut #vec_ident) = &mut *#vec_ident; let #ident = { #[inline(always)] fn check_iter(iter: impl Iterator>) -> impl Iterator> { iter } - if *#tick_ident <= #context.current_tick() { - *#tick_ident = 1 + #context.current_tick(); - + if context.is_first_run_this_tick() { for item in check_iter(#input) { match item { #root::util::PersistenceKeyed::Persist(k, v) => {