Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
Align template with docs
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Dec 27, 2023
1 parent c3fde3f commit 282c9c0
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 109 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ Once the command completes, you can cd into the project and test the template.
$ cd <myproject>
$ cargo test
```

## Project Structure
Hydroflow+ uses Stageleft (https://hydro.run/docs/hydroflow_plus/stageleft), and thus requires a special project structure consisting of the main logic (in `flow`) and a auto-generated helper (in `flow_macro`). Other than keeping dependencies in sync between `flow` and `flow_macro`, you should not need to modify `flow_macro` directly.

The template includes two sample programs, `first_ten` and `first_ten_distributed`. `first_ten` demonstrates how to use Hydroflow+ to create dataflow programs for a single machine, and can be launched by running `cargo run -p flow --bin first_ten`. `first_ten_distributed` demonstrates how to use Hydroflow+ to create dataflow programs for a distributed system, and can be launched by running `cargo run -p flow --example first_ten_distributed`. Note the use of `--example` here because `src/bin/first_ten_distributed.rs` contains the binary that will be launched on each node, whereas `examples/first_ten_distributed.rs` contains a deployment script for connecting the nodes together.
26 changes: 26 additions & 0 deletions flow/examples/first_ten_distributed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus_cli_integration::CLIDeployNodeBuilder;

#[tokio::main]
async fn main() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::GraphBuilder::new();
flow::first_ten_distributed::first_ten_distributed(
&builder,
&CLIDeployNodeBuilder::new(|| {
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("first_ten_distributed")
.profile("dev"),
)
}),
);

deployment.deploy().await.unwrap();

deployment.start().await.unwrap();

tokio::signal::ctrl_c().await.unwrap()
}
4 changes: 4 additions & 0 deletions flow/src/bin/first_ten.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#[tokio::main]
async fn main() {
flow::first_ten::first_ten_runtime!().run_async().await;
}
9 changes: 9 additions & 0 deletions flow/src/bin/first_ten_distributed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#[tokio::main]
async fn main() {
let ports = hydroflow_plus::util::cli::init().await;

hydroflow_plus::util::cli::launch_flow(
flow::first_ten_distributed::first_ten_distributed_runtime!(&ports),
)
.await;
}
9 changes: 0 additions & 9 deletions flow/src/bin/my_dataflow.rs

This file was deleted.

21 changes: 21 additions & 0 deletions flow/src/first_ten.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use hydroflow_plus::node::*;
use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten<'a, D: LocalDeploy<'a>>(
graph: &'a GraphBuilder<'a, D>,
node_builder: &impl NodeBuilder<'a, D>,
) {
let node = graph.node(node_builder);

let numbers = node.source_iter(q!(0..10));
numbers.for_each(q!(|n| println!("{}", n)));
}

#[stageleft::entry]
pub fn first_ten_runtime<'a>(
graph: &'a GraphBuilder<'a, SingleGraph>,
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten(graph, &());
graph.build_single()
}
67 changes: 67 additions & 0 deletions flow/src/first_ten_distributed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use hydroflow_plus::node::*;
use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten_distributed<'a, D: Deploy<'a>>(
graph: &'a GraphBuilder<'a, D>,
node_builder: &impl NodeBuilder<'a, D>,
) -> D::Node {
let node = graph.node(node_builder);
let second_node = graph.node(node_builder);

let numbers = node.source_iter(q!(0..10));
numbers
.send_bincode(&second_node)
.for_each(q!(|n| println!("{}", n)));

second_node
}

use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
pub fn first_ten_distributed_runtime<'a>(
graph: &'a GraphBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = first_ten_distributed(graph, &cli);
graph.build(q!(cli.meta.subgraph_id))
}

#[stageleft::runtime]
#[cfg(test)]
mod tests {
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus::futures::StreamExt;
use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper};

#[tokio::test]
async fn first_ten_distributed() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::GraphBuilder::new();
let second_node = super::first_ten_distributed(
&builder,
&CLIDeployNodeBuilder::new(|| {
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("first_ten_distributed")
.profile("dev"),
)
}),
);

deployment.deploy().await.unwrap();

let second_node_stdout = second_node.stdout().await;

deployment.start().await.unwrap();

assert_eq!(
second_node_stdout.take(10).collect::<Vec<_>>().await,
vec!["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]
);
}
}
102 changes: 2 additions & 100 deletions flow/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,103 +1,5 @@
stageleft::stageleft_crate!(flow_macro);

use hydroflow_plus::node::{Deploy, HfNode, NodeBuilder, ClusterBuilder, HfCluster};
use hydroflow_plus::GraphBuilder;
use stageleft::{q, Quoted, RuntimeData};
pub mod first_ten;

use hydroflow_plus::scheduled::graph::Hydroflow;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

pub fn partitioned_char_counter<'a, D: Deploy<'a>>(
graph: &'a GraphBuilder<'a, D>,
node_builder: &impl NodeBuilder<'a, D>,
cluster_builder: &impl ClusterBuilder<'a, D>,
) -> (D::Node, D::Cluster) {
let node = graph.node(node_builder);
let cluster = graph.cluster(cluster_builder);

let words = node
.source_iter(q!(vec!["abc", "abc", "xyz"]))
.map(q!(|s| s.to_string()));

let all_ids_vec = cluster.ids();
let words_partitioned = words.enumerate().map(q!({
let cluster_size = all_ids_vec.len();
move |(i, w)| ((i % cluster_size) as u32, w)
}));

words_partitioned
.demux_bincode(&cluster)
.batched()
.fold(q!(|| 0), q!(|count, string: String| *count += string.len()))
.inspect(q!(|count| println!("partition count: {}", count)))
.send_bincode_tagged(&node)
.persist()
.map(q!(|(_mid, count)| count))
.fold(q!(|| 0), q!(|total, count| *total += count))
.for_each(q!(|data| println!("total: {}", data)));

(node, cluster)
}

#[stageleft::entry]
pub fn partitioned_char_counter_runtime<'a>(
graph: &'a GraphBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = partitioned_char_counter(graph, &cli, &cli);
graph.build(q!(cli.meta.subgraph_id))
}

#[stageleft::runtime]
#[cfg(test)]
mod tests {
use std::cell::RefCell;

use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus::futures::StreamExt;
use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper, CLIDeployClusterBuilder};

#[tokio::test]
async fn partitioned_char_counter() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let deployment_cell = RefCell::new(deployment);
let builder = hydroflow_plus::GraphBuilder::new();
let (leader, _) = super::partitioned_char_counter(
&builder,
&CLIDeployNodeBuilder::new(|| {
deployment_cell.borrow_mut().add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("my_dataflow")
.profile("dev"),
)
}),
&CLIDeployClusterBuilder::new(|| {
(0..2).map(|_| {
deployment_cell.borrow_mut().add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("my_dataflow")
.profile("dev"),
)
}).collect()
}),
);

let mut deployment = deployment_cell.into_inner();
deployment.deploy().await.unwrap();

let mut leader_stdout = leader.stdout().await;

deployment.start().await.unwrap();

while let Some(line) = leader_stdout.next().await {
if line == "total: 9" {
return;
}
}

panic!("did not find total: 9");
}
}
pub mod first_ten_distributed;

0 comments on commit 282c9c0

Please sign in to comment.