From f2b265704d95480e0ce775e40ecbb5c25aa1cc33 Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Fri, 8 Mar 2024 16:39:21 -0800 Subject: [PATCH] cardinality example test --- ...x_reachability_generated@graphvis_dot.snap | 30 +++++++++++-------- ...achability_generated@graphvis_mermaid.snap | 30 +++++++++++-------- hydroflow/tests/surface_codegen.rs | 21 ++++++++++++- 3 files changed, 54 insertions(+), 27 deletions(-) diff --git a/hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_dot.snap index 3feb3c4cdcbd..dcc3a1f6ddbc 100644 --- a/hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_dot.snap +++ b/hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_dot.snap @@ -8,39 +8,43 @@ digraph { n1v1 [label="(n1v1) union()", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) map(|v| (v, ()))", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) source_iter(vec![0])", shape=invhouse, fillcolor="#88aaff"] - n4v1 [label="(n4v1) join()", shape=invhouse, fillcolor="#88aaff"] - n5v1 [label="(n5v1) map(|(_src, ((), dst))| dst)", shape=invhouse, fillcolor="#88aaff"] - n6v1 [label="(n6v1) tee()", shape=house, fillcolor="#ffff88"] - n7v1 [label="(n7v1) source_stream(pairs_recv)", shape=invhouse, fillcolor="#88aaff"] - n8v1 [label="(n8v1) for_each(|x| println!(\"Reached: {}\", x))", shape=house, fillcolor="#ffff88"] - n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n4v1 [label="(n4v1) join::<'static, 'static>()", shape=invhouse, fillcolor="#88aaff"] + n5v1 [label="(n5v1) inspect(|_| { my_card_inner.set(1 + my_card_inner.get()) })", shape=invhouse, fillcolor="#88aaff"] + n6v1 [label="(n6v1) map(|(_src, ((), dst))| dst)", shape=invhouse, fillcolor="#88aaff"] + n7v1 [label="(n7v1) tee()", shape=house, fillcolor="#ffff88"] + n8v1 [label="(n8v1) source_stream(pairs_recv)", shape=invhouse, fillcolor="#88aaff"] + n9v1 [label="(n9v1) for_each(|x| println!(\"Reached: {}\", x))", shape=house, fillcolor="#ffff88"] + n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n1v1 -> n2v1 n3v1 -> n1v1 [label="0"] + n6v1 -> n7v1 n5v1 -> n6v1 n4v1 -> n5v1 n2v1 -> n4v1 [label="0"] - n7v1 -> n4v1 [label="1"] - n6v1 -> n9v1 [label="0"] - n6v1 -> n8v1 [label="1"] - n9v1 -> n1v1 [label="1"] + n8v1 -> n4v1 [label="1"] + n7v1 -> n10v1 [label="0"] + n7v1 -> n9v1 [label="1"] + n10v1 -> n1v1 [label="1"] subgraph "cluster n1v1" { fillcolor="#dddddd" style=filled label = "sg_1v1\nstratum 0" - n9v1 + n10v1 n3v1 n1v1 n2v1 - n7v1 + n8v1 n4v1 n5v1 n6v1 - n8v1 + n7v1 + n9v1 subgraph "cluster_sg_1v1_var_my_join_tee" { label="var my_join_tee" n4v1 n5v1 n6v1 + n7v1 } subgraph "cluster_sg_1v1_var_reached_vertices" { label="var reached_vertices" diff --git a/hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_mermaid.snap index 4945fa4ffe30..08e8e4c5e19e 100644 --- a/hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_mermaid.snap +++ b/hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_mermaid.snap @@ -11,35 +11,39 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) union()"/]:::pullClass 2v1[\"(2v1) map(|v| (v, ()))"/]:::pullClass 3v1[\"(3v1) source_iter(vec![0])"/]:::pullClass -4v1[\"(4v1) join()"/]:::pullClass -5v1[\"(5v1) map(|(_src, ((), dst))| dst)"/]:::pullClass -6v1[/"(6v1) tee()"\]:::pushClass -7v1[\"(7v1) source_stream(pairs_recv)"/]:::pullClass -8v1[/"(8v1) for_each(|x| println!("Reached: {}", x))"\]:::pushClass -9v1["(9v1) handoff"]:::otherClass +4v1[\"(4v1) join::<'static, 'static>()"/]:::pullClass +5v1[\"(5v1) inspect(|_| { my_card_inner.set(1 + my_card_inner.get()) })"/]:::pullClass +6v1[\"(6v1) map(|(_src, ((), dst))| dst)"/]:::pullClass +7v1[/"(7v1) tee()"\]:::pushClass +8v1[\"(8v1) source_stream(pairs_recv)"/]:::pullClass +9v1[/"(9v1) for_each(|x| println!("Reached: {}", x))"\]:::pushClass +10v1["(10v1) handoff"]:::otherClass 1v1-->2v1 3v1-->|0|1v1 +6v1-->7v1 5v1-->6v1 4v1-->5v1 2v1-->|0|4v1 -7v1-->|1|4v1 -6v1-->|0|9v1 -6v1-->|1|8v1 -9v1-->|1|1v1 +8v1-->|1|4v1 +7v1-->|0|10v1 +7v1-->|1|9v1 +10v1-->|1|1v1 subgraph sg_1v1 ["sg_1v1 stratum 0"] - 9v1 + 10v1 3v1 1v1 2v1 - 7v1 + 8v1 4v1 5v1 6v1 - 8v1 + 7v1 + 9v1 subgraph sg_1v1_var_my_join_tee ["var my_join_tee"] 4v1 5v1 6v1 + 7v1 end subgraph sg_1v1_var_reached_vertices ["var reached_vertices"] 1v1 diff --git a/hydroflow/tests/surface_codegen.rs b/hydroflow/tests/surface_codegen.rs index 957053e629c2..ca47d63cd337 100644 --- a/hydroflow/tests/surface_codegen.rs +++ b/hydroflow/tests/surface_codegen.rs @@ -574,11 +574,18 @@ pub fn test_surface_syntax_reachability_generated() { // An edge in the input data = a pair of `usize` vertex IDs. let (pairs_send, pairs_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); + let my_card = std::rc::Rc::new(std::cell::Cell::new(0)); + let my_card_inner = std::rc::Rc::clone(&my_card); + let mut df: Hydroflow = hydroflow_syntax! { reached_vertices = union() -> map(|v| (v, ())); source_iter(vec![0]) -> [0]reached_vertices; - my_join_tee = join() -> map(|(_src, ((), dst))| dst) -> tee(); + my_join_tee = join::<'static, 'static>() + -> inspect(|_| { + my_card_inner.set(1 + my_card_inner.get()) + }) + -> map(|(_src, ((), dst))| dst) -> tee(); reached_vertices -> [0]my_join_tee; source_stream(pairs_recv) -> [1]my_join_tee; @@ -588,22 +595,34 @@ pub fn test_surface_syntax_reachability_generated() { assert_graphvis_snapshots!(df); df.run_available(); + println!("{} {} {}", line!(), df.current_tick(), my_card.get()); + pairs_send.send((0, 1)).unwrap(); df.run_available(); + println!("{} {} {}", line!(), df.current_tick(), my_card.get()); + pairs_send.send((2, 4)).unwrap(); pairs_send.send((3, 4)).unwrap(); df.run_available(); + println!("{} {} {}", line!(), df.current_tick(), my_card.get()); + pairs_send.send((1, 2)).unwrap(); df.run_available(); + println!("{} {} {}", line!(), df.current_tick(), my_card.get()); + pairs_send.send((0, 3)).unwrap(); df.run_available(); + println!("{} {} {}", line!(), df.current_tick(), my_card.get()); + pairs_send.send((0, 3)).unwrap(); df.run_available(); + println!("{} {} {}", line!(), df.current_tick(), my_card.get()); + // Reached: 1 // Reached: 2 // Reached: 4