diff --git a/README.md b/README.md index 788cfe4..434b681 100644 --- a/README.md +++ b/README.md @@ -11,3 +11,8 @@ Once the command completes, you can cd into the project and test the template. $ cd $ cargo test ``` + +## Project Structure +Hydroflow+ uses Stageleft (https://hydro.run/docs/hydroflow_plus/stageleft), and thus requires a special project structure consisting of the main logic (in `flow`) and a auto-generated helper (in `flow_macro`). Other than keeping dependencies in sync between `flow` and `flow_macro`, you should not need to modify `flow_macro` directly. + +The template includes two sample programs, `first_ten` and `first_ten_distributed`. `first_ten` demonstrates how to use Hydroflow+ to create dataflow programs for a single machine, and can be launched by running `cargo run -p flow --bin first_ten`. `first_ten_distributed` demonstrates how to use Hydroflow+ to create dataflow programs for a distributed system, and can be launched by running `cargo run -p flow --example first_ten_distributed`. Note the use of `--example` here because `src/bin/first_ten_distributed.rs` contains the binary that will be launched on each node, whereas `examples/first_ten_distributed.rs` contains a deployment script for connecting the nodes together. diff --git a/flow/examples/first_ten_distributed.rs b/flow/examples/first_ten_distributed.rs new file mode 100644 index 0000000..ea8d274 --- /dev/null +++ b/flow/examples/first_ten_distributed.rs @@ -0,0 +1,26 @@ +use hydro_deploy::{Deployment, HydroflowCrate}; +use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; + +#[tokio::main] +async fn main() { + let mut deployment = Deployment::new(); + let localhost = deployment.Localhost(); + + let builder = hydroflow_plus::GraphBuilder::new(); + flow::first_ten_distributed::first_ten_distributed( + &builder, + &CLIDeployNodeBuilder::new(|| { + deployment.add_service( + HydroflowCrate::new(".", localhost.clone()) + .bin("first_ten_distributed") + .profile("dev"), + ) + }), + ); + + deployment.deploy().await.unwrap(); + + deployment.start().await.unwrap(); + + tokio::signal::ctrl_c().await.unwrap() +} diff --git a/flow/src/bin/first_ten.rs b/flow/src/bin/first_ten.rs new file mode 100644 index 0000000..1a6e166 --- /dev/null +++ b/flow/src/bin/first_ten.rs @@ -0,0 +1,4 @@ +#[tokio::main] +async fn main() { + flow::first_ten::first_ten_runtime!().run_async().await; +} diff --git a/flow/src/bin/first_ten_distributed.rs b/flow/src/bin/first_ten_distributed.rs new file mode 100644 index 0000000..e58169d --- /dev/null +++ b/flow/src/bin/first_ten_distributed.rs @@ -0,0 +1,9 @@ +#[tokio::main] +async fn main() { + let ports = hydroflow_plus::util::cli::init().await; + + hydroflow_plus::util::cli::launch_flow( + flow::first_ten_distributed::first_ten_distributed_runtime!(&ports), + ) + .await; +} diff --git a/flow/src/bin/my_dataflow.rs b/flow/src/bin/my_dataflow.rs deleted file mode 100644 index afc7a49..0000000 --- a/flow/src/bin/my_dataflow.rs +++ /dev/null @@ -1,9 +0,0 @@ -extern crate alloc; - -// cannot use hydroflow::main because connect_local_blocking causes a deadlock -#[tokio::main] -async fn main() { - let ports = hydroflow_plus::util::cli::init().await; - - hydroflow_plus::util::cli::launch_flow(flow::partitioned_char_counter_runtime!(&ports)).await; -} diff --git a/flow/src/first_ten.rs b/flow/src/first_ten.rs new file mode 100644 index 0000000..efabd87 --- /dev/null +++ b/flow/src/first_ten.rs @@ -0,0 +1,21 @@ +use hydroflow_plus::node::*; +use hydroflow_plus::*; +use stageleft::*; + +pub fn first_ten<'a, D: LocalDeploy<'a>>( + graph: &'a GraphBuilder<'a, D>, + node_builder: &impl NodeBuilder<'a, D>, +) { + let node = graph.node(node_builder); + + let numbers = node.source_iter(q!(0..10)); + numbers.for_each(q!(|n| println!("{}", n))); +} + +#[stageleft::entry] +pub fn first_ten_runtime<'a>( + graph: &'a GraphBuilder<'a, SingleGraph>, +) -> impl Quoted<'a, Hydroflow<'a>> { + first_ten(graph, &()); + graph.build_single() +} diff --git a/flow/src/first_ten_distributed.rs b/flow/src/first_ten_distributed.rs new file mode 100644 index 0000000..26e150a --- /dev/null +++ b/flow/src/first_ten_distributed.rs @@ -0,0 +1,67 @@ +use hydroflow_plus::node::*; +use hydroflow_plus::*; +use stageleft::*; + +pub fn first_ten_distributed<'a, D: Deploy<'a>>( + graph: &'a GraphBuilder<'a, D>, + node_builder: &impl NodeBuilder<'a, D>, +) -> D::Node { + let node = graph.node(node_builder); + let second_node = graph.node(node_builder); + + let numbers = node.source_iter(q!(0..10)); + numbers + .send_bincode(&second_node) + .for_each(q!(|n| println!("{}", n))); + + second_node +} + +use hydroflow_plus::util::cli::HydroCLI; +use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; + +#[stageleft::entry] +pub fn first_ten_distributed_runtime<'a>( + graph: &'a GraphBuilder<'a, CLIRuntime>, + cli: RuntimeData<&'a HydroCLI>, +) -> impl Quoted<'a, Hydroflow<'a>> { + let _ = first_ten_distributed(graph, &cli); + graph.build(q!(cli.meta.subgraph_id)) +} + +#[stageleft::runtime] +#[cfg(test)] +mod tests { + use hydro_deploy::{Deployment, HydroflowCrate}; + use hydroflow_plus::futures::StreamExt; + use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper}; + + #[tokio::test] + async fn first_ten_distributed() { + let mut deployment = Deployment::new(); + let localhost = deployment.Localhost(); + + let builder = hydroflow_plus::GraphBuilder::new(); + let second_node = super::first_ten_distributed( + &builder, + &CLIDeployNodeBuilder::new(|| { + deployment.add_service( + HydroflowCrate::new(".", localhost.clone()) + .bin("first_ten_distributed") + .profile("dev"), + ) + }), + ); + + deployment.deploy().await.unwrap(); + + let second_node_stdout = second_node.stdout().await; + + deployment.start().await.unwrap(); + + assert_eq!( + second_node_stdout.take(10).collect::>().await, + vec!["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"] + ); + } +} diff --git a/flow/src/lib.rs b/flow/src/lib.rs index 53693d7..8aa2800 100644 --- a/flow/src/lib.rs +++ b/flow/src/lib.rs @@ -1,103 +1,5 @@ stageleft::stageleft_crate!(flow_macro); -use hydroflow_plus::node::{Deploy, HfNode, NodeBuilder, ClusterBuilder, HfCluster}; -use hydroflow_plus::GraphBuilder; -use stageleft::{q, Quoted, RuntimeData}; +pub mod first_ten; -use hydroflow_plus::scheduled::graph::Hydroflow; -use hydroflow_plus::util::cli::HydroCLI; -use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; - -pub fn partitioned_char_counter<'a, D: Deploy<'a>>( - graph: &'a GraphBuilder<'a, D>, - node_builder: &impl NodeBuilder<'a, D>, - cluster_builder: &impl ClusterBuilder<'a, D>, -) -> (D::Node, D::Cluster) { - let node = graph.node(node_builder); - let cluster = graph.cluster(cluster_builder); - - let words = node - .source_iter(q!(vec!["abc", "abc", "xyz"])) - .map(q!(|s| s.to_string())); - - let all_ids_vec = cluster.ids(); - let words_partitioned = words.enumerate().map(q!({ - let cluster_size = all_ids_vec.len(); - move |(i, w)| ((i % cluster_size) as u32, w) - })); - - words_partitioned - .demux_bincode(&cluster) - .batched() - .fold(q!(|| 0), q!(|count, string: String| *count += string.len())) - .inspect(q!(|count| println!("partition count: {}", count))) - .send_bincode_tagged(&node) - .persist() - .map(q!(|(_mid, count)| count)) - .fold(q!(|| 0), q!(|total, count| *total += count)) - .for_each(q!(|data| println!("total: {}", data))); - - (node, cluster) -} - -#[stageleft::entry] -pub fn partitioned_char_counter_runtime<'a>( - graph: &'a GraphBuilder<'a, CLIRuntime>, - cli: RuntimeData<&'a HydroCLI>, -) -> impl Quoted<'a, Hydroflow<'a>> { - let _ = partitioned_char_counter(graph, &cli, &cli); - graph.build(q!(cli.meta.subgraph_id)) -} - -#[stageleft::runtime] -#[cfg(test)] -mod tests { - use std::cell::RefCell; - - use hydro_deploy::{Deployment, HydroflowCrate}; - use hydroflow_plus::futures::StreamExt; - use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper, CLIDeployClusterBuilder}; - - #[tokio::test] - async fn partitioned_char_counter() { - let mut deployment = Deployment::new(); - let localhost = deployment.Localhost(); - - let deployment_cell = RefCell::new(deployment); - let builder = hydroflow_plus::GraphBuilder::new(); - let (leader, _) = super::partitioned_char_counter( - &builder, - &CLIDeployNodeBuilder::new(|| { - deployment_cell.borrow_mut().add_service( - HydroflowCrate::new(".", localhost.clone()) - .bin("my_dataflow") - .profile("dev"), - ) - }), - &CLIDeployClusterBuilder::new(|| { - (0..2).map(|_| { - deployment_cell.borrow_mut().add_service( - HydroflowCrate::new(".", localhost.clone()) - .bin("my_dataflow") - .profile("dev"), - ) - }).collect() - }), - ); - - let mut deployment = deployment_cell.into_inner(); - deployment.deploy().await.unwrap(); - - let mut leader_stdout = leader.stdout().await; - - deployment.start().await.unwrap(); - - while let Some(line) = leader_stdout.next().await { - if line == "total: 9" { - return; - } - } - - panic!("did not find total: 9"); - } -} +pub mod first_ten_distributed;