Skip to content

Commit

Permalink
feat: add context.is_first_time_subgraph_is_scheduled to simplify rep…
Browse files Browse the repository at this point in the history
…laying operators (#906)
  • Loading branch information
zzlk authored Sep 14, 2023
1 parent 1ce5f01 commit d254e2d
Show file tree
Hide file tree
Showing 24 changed files with 240 additions and 608 deletions.
9 changes: 9 additions & 0 deletions hydroflow/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct Context {
pub(crate) current_stratum: usize,

pub(crate) current_tick_start: Instant,
pub(crate) subgraph_last_tick_run_in: Option<usize>,

/// The SubgraphId of the currently running operator. When this context is
/// not being forwarded to a running operator, this field is (mostly)
Expand All @@ -50,6 +51,14 @@ impl Context {
self.current_tick_start
}

/// Gets whether this is the first time this subgraph is being scheduled for this tick
pub fn is_first_run_this_tick(&self) -> bool {
self.subgraph_last_tick_run_in
.map_or(true, |tick_last_run_in| {
self.current_tick > tick_last_run_in
})
}

/// Gets the current stratum nubmer.
pub fn current_stratum(&self) -> usize {
self.current_stratum
Expand Down
9 changes: 9 additions & 0 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl Default for Hydroflow {
current_tick: 0,

current_tick_start: Instant::now(),
subgraph_last_tick_run_in: None,

subgraph_id: SubgraphId(0),

Expand Down Expand Up @@ -190,6 +191,8 @@ impl Hydroflow {
/// Returns true if any work was done.
#[tracing::instrument(level = "trace", skip(self), fields(tick = self.context.current_tick, stratum = self.context.current_stratum), ret)]
pub fn run_stratum(&mut self) -> bool {
let current_tick = self.context.current_tick;

let mut work_done = false;

while let Some(sg_id) = self.stratum_queues[self.context.current_stratum].pop_front() {
Expand All @@ -201,7 +204,9 @@ impl Hydroflow {
tracing::trace!(sg_id = sg_id.0, "Running subgraph.");

self.context.subgraph_id = sg_id;
self.context.subgraph_last_tick_run_in = sg_data.last_tick_run_in;
sg_data.subgraph.run(&mut self.context, &mut self.handoffs);
sg_data.last_tick_run_in = Some(current_tick);
}

for &handoff_id in self.subgraphs[sg_id.0].succs.iter() {
Expand Down Expand Up @@ -752,6 +757,9 @@ pub(super) struct SubgraphData {
/// `Self::succs`, as all `SubgraphData` are owned by the same vec
/// `Hydroflow::subgraphs`.
is_scheduled: Cell<bool>,

/// Keep track of the last tick that this subgraph was run in
last_tick_run_in: Option<usize>,
}
impl SubgraphData {
pub fn new(
Expand All @@ -769,6 +777,7 @@ impl SubgraphData {
preds,
succs,
is_scheduled: Cell::new(is_scheduled),
last_tick_run_in: None,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@ digraph {
n8v1 [label="(n8v1) defer_tick()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n9v1 [label="(n9v1) union()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n10v1 [label="(n10v1) join_fused_rhs::<'static, 'static>(Fold(SetUnionHashSet::default, Merge::merge))", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n11v1 [label="(n11v1) for_each(|x| {\l results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x)\l})\l", fontname=Monaco, shape=house, style = filled, color = "#ffff00"]
n11v1 [label="(n11v1) inspect(|x| println!(\"{}, {x:?}\", context.current_tick()))", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n12v1 [label="(n12v1) for_each(|x| {\l results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x)\l})\l", fontname=Monaco, shape=house, style = filled, color = "#ffff00"]
n3v1 -> n9v1
n5v1 -> n9v1
n8v1 -> n9v1
n9v1 -> n10v1 [label="0"]
n10v1 -> n11v1
n11v1 -> n12v1
subgraph "cluster sg_5v1_var_my_join" {
label="var my_join"
n10v1
n11v1
n12v1
}
subgraph "cluster sg_5v1_var_unioner" {
label="var unioner"
Expand All @@ -58,40 +61,40 @@ digraph {
fillcolor="#dddddd"
style=filled
label = "sg_6v1\nstratum 2"
n16v1 [label="(n16v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n17v1 [label="(n17v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
}
subgraph "cluster n7v1" {
fillcolor="#dddddd"
style=filled
label = "sg_7v1\nstratum 2"
n18v1 [label="(n18v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n19v1 [label="(n19v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
}
subgraph "cluster n8v1" {
fillcolor="#dddddd"
style=filled
label = "sg_8v1\nstratum 2"
n20v1 [label="(n20v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n21v1 [label="(n21v1) identity()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
}
n2v1 -> n12v1
n4v1 -> n13v1
n6v1 -> n15v1
n7v1 -> n14v1
n12v1 [label="(n12v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n12v1 -> n10v1 [label="1", arrowhead=box, color=red]
n2v1 -> n13v1
n4v1 -> n14v1
n6v1 -> n16v1
n7v1 -> n15v1
n13v1 [label="(n13v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n13v1 -> n16v1
n13v1 -> n10v1 [label="1", arrowhead=box, color=red]
n14v1 [label="(n14v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n14v1 -> n18v1
n14v1 -> n17v1
n15v1 [label="(n15v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n15v1 -> n20v1
n16v1 -> n17v1
n17v1 [label="(n17v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n17v1 -> n5v1 [arrowhead=dot, color=red]
n18v1 -> n19v1
n19v1 [label="(n19v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n19v1 -> n8v1 [arrowhead=dot, color=red]
n20v1 -> n21v1
n21v1 [label="(n21v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n21v1 -> n7v1 [arrowhead=dot, color=red]
n15v1 -> n19v1
n16v1 [label="(n16v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n16v1 -> n21v1
n17v1 -> n18v1
n18v1 [label="(n18v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n18v1 -> n5v1 [arrowhead=dot, color=red]
n19v1 -> n20v1
n20v1 [label="(n20v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n20v1 -> n8v1 [arrowhead=dot, color=red]
n21v1 -> n22v1
n22v1 [label="(n22v1) handoff", fontname=Monaco, shape=parallelogram, style = filled, color = "#ddddff"]
n22v1 -> n7v1 [arrowhead=dot, color=red]
}

Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,51 @@ subgraph sg_5v1 ["sg_5v1 stratum 1"]
8v1[\"(8v1) <code>defer_tick()</code>"/]:::pullClass
9v1[\"(9v1) <code>union()</code>"/]:::pullClass
10v1[\"(10v1) <code>join_fused_rhs::&lt;'static, 'static&gt;(Fold(SetUnionHashSet::default, Merge::merge))</code>"/]:::pullClass
11v1[/"<div style=text-align:center>(11v1)</div> <code>for_each(|x| {<br> results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x)<br>})</code>"\]:::pushClass
11v1[\"(11v1) <code>inspect(|x| println!(&quot;{}, {x:?}&quot;, context.current_tick()))</code>"/]:::pullClass
12v1[/"<div style=text-align:center>(12v1)</div> <code>for_each(|x| {<br> results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x)<br>})</code>"\]:::pushClass
3v1-->9v1
5v1-->9v1
8v1-->9v1
9v1-->|0|10v1
10v1-->11v1
11v1-->12v1
subgraph sg_5v1_var_my_join ["var <tt>my_join</tt>"]
10v1
11v1
12v1
end
subgraph sg_5v1_var_unioner ["var <tt>unioner</tt>"]
9v1
end
end
subgraph sg_6v1 ["sg_6v1 stratum 2"]
16v1[\"(16v1) <code>identity()</code>"/]:::pullClass
17v1[\"(17v1) <code>identity()</code>"/]:::pullClass
end
subgraph sg_7v1 ["sg_7v1 stratum 2"]
18v1[\"(18v1) <code>identity()</code>"/]:::pullClass
19v1[\"(19v1) <code>identity()</code>"/]:::pullClass
end
subgraph sg_8v1 ["sg_8v1 stratum 2"]
20v1[\"(20v1) <code>identity()</code>"/]:::pullClass
21v1[\"(21v1) <code>identity()</code>"/]:::pullClass
end
2v1-->12v1
4v1-->13v1
6v1-->15v1
7v1-->14v1
12v1["(12v1) <code>handoff</code>"]:::otherClass
12v1--x|1|10v1
2v1-->13v1
4v1-->14v1
6v1-->16v1
7v1-->15v1
13v1["(13v1) <code>handoff</code>"]:::otherClass
13v1-->16v1
13v1--x|1|10v1
14v1["(14v1) <code>handoff</code>"]:::otherClass
14v1-->18v1
14v1-->17v1
15v1["(15v1) <code>handoff</code>"]:::otherClass
15v1-->20v1
16v1-->17v1
17v1["(17v1) <code>handoff</code>"]:::otherClass
17v1--o5v1
18v1-->19v1
19v1["(19v1) <code>handoff</code>"]:::otherClass
19v1--o8v1
20v1-->21v1
21v1["(21v1) <code>handoff</code>"]:::otherClass
21v1--o7v1
15v1-->19v1
16v1["(16v1) <code>handoff</code>"]:::otherClass
16v1-->21v1
17v1-->18v1
18v1["(18v1) <code>handoff</code>"]:::otherClass
18v1--o5v1
19v1-->20v1
20v1["(20v1) <code>handoff</code>"]:::otherClass
20v1--o8v1
21v1-->22v1
22v1["(22v1) <code>handoff</code>"]:::otherClass
22v1--o7v1

1 change: 1 addition & 0 deletions hydroflow/tests/surface_join_fused.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ pub fn static_static_lhs_streaming_rhs_blocking() {
-> [0]my_join;

my_join = join_fused_rhs::<'static, 'static>(Fold(SetUnionHashSet::default, Merge::merge))
-> inspect(|x| println!("{}, {x:?}", context.current_tick()))
-> for_each(|x| results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x));
};
assert_graphvis_snapshots!(df);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,6 @@ fn main() {
),
),
);
let sg_1v1_node_17v1_persisttick = df
.add_state(std::cell::RefCell::new(0usize));
let sg_1v1_node_21v1_antijoindata_neg = df
.add_state(
std::cell::RefCell::new(
Expand All @@ -401,8 +399,6 @@ fn main() {
>::default(),
),
);
let sg_1v1_node_21v1_persisttick = df
.add_state(std::cell::RefCell::new(0_usize));
let sg_1v1_node_11v1_uniquedata = df
.add_state(
::std::cell::RefCell::new(
Expand Down Expand Up @@ -594,9 +590,6 @@ fn main() {
let mut sg_1v1_node_17v1_joindata_rhs_borrow = context
.state_ref(sg_1v1_node_17v1_joindata_rhs)
.borrow_mut();
let mut sg_1v1_node_17v1_persisttick_borrow = context
.state_ref(sg_1v1_node_17v1_persisttick)
.borrow_mut();
let op_17v1 = {
#[inline(always)]
fn check_inputs<'a, K, I1, V1, I2, V2>(
Expand Down Expand Up @@ -629,26 +622,15 @@ fn main() {
is_new_tick,
)
}
{
let __is_new_tick = if *sg_1v1_node_17v1_persisttick_borrow
<= context.current_tick()
{
*sg_1v1_node_17v1_persisttick_borrow = context
.current_tick() + 1;
true
} else {
false
};
check_inputs(
op_19v1,
op_20v1,
&mut *sg_1v1_node_17v1_joindata_lhs_borrow
.get_mut_clear(context.current_tick()),
&mut *sg_1v1_node_17v1_joindata_rhs_borrow
.get_mut_clear(context.current_tick()),
__is_new_tick,
)
}
check_inputs(
op_19v1,
op_20v1,
&mut *sg_1v1_node_17v1_joindata_lhs_borrow
.get_mut_clear(context.current_tick()),
&mut *sg_1v1_node_17v1_joindata_rhs_borrow
.get_mut_clear(context.current_tick()),
context.is_first_run_this_tick(),
)
};
let op_17v1 = {
#[allow(non_snake_case)]
Expand Down Expand Up @@ -776,25 +758,14 @@ fn main() {
is_new_tick,
)
}
let __is_new_tick = {
let mut __borrow_ident = context
.state_ref(sg_1v1_node_21v1_persisttick)
.borrow_mut();
if *__borrow_ident <= context.current_tick() {
*__borrow_ident = context.current_tick() + 1;
true
} else {
false
}
};
check_inputs(
hoff_12v3_recv,
op_23v1,
&mut *sg_1v1_node_21v1_antijoindata_neg_borrow
.get_mut_clear(context.current_tick()),
&mut *sg_1v1_node_21v1_antijoindata_pos_borrow
.get_mut_clear(context.current_tick()),
__is_new_tick,
context.is_first_run_this_tick(),
)
};
let op_21v1 = {
Expand Down
Loading

0 comments on commit d254e2d

Please sign in to comment.