Skip to content

Commit

Permalink
example(hydroflow): shopping cart working (#690)
Browse files Browse the repository at this point in the history
Refs: #666
  • Loading branch information
jhellerstein authored and MingweiSamuel committed May 30, 2023
1 parent e1f043c commit 9029539
Show file tree
Hide file tree
Showing 23 changed files with 648 additions and 991 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/rust.git
/*.dot
__pycache__/
**/.DS_Store

# Profiling related outputs of the perf binary and cargo-flamegraph
perf.data
Expand Down
11 changes: 6 additions & 5 deletions hydroflow/examples/lamport_clock/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::protocol::EchoMsg;
use crate::{GraphType, Opts};
use std::net::SocketAddr;

use chrono::prelude::*;
use hydroflow::hydroflow_syntax;
use hydroflow::lattices::ord::Max;
use hydroflow::lattices::Merge;
use hydroflow::lattices::{Max, Merge};
use hydroflow::util::{UdpSink, UdpStream};
use std::net::SocketAddr;

use crate::protocol::EchoMsg;
use crate::{GraphType, Opts};

pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
// server_addr is required for client
Expand Down
3 changes: 1 addition & 2 deletions hydroflow/examples/lamport_clock/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use hydroflow::lattices::ord::Max;

use hydroflow::lattices::Max;
use serde::{Deserialize, Serialize};

#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
Expand Down
9 changes: 5 additions & 4 deletions hydroflow/examples/lamport_clock/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::protocol::EchoMsg;
use std::net::SocketAddr;

use chrono::prelude::*;
use hydroflow::hydroflow_syntax;
use hydroflow::lattices::ord::Max;
use hydroflow::lattices::Merge;
use hydroflow::lattices::{Max, Merge};
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{UdpSink, UdpStream};
use std::net::SocketAddr;

use crate::protocol::EchoMsg;

pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream) {
let bot: Max<usize> = Max(0);
Expand Down
Binary file removed hydroflow/examples/shopping/.DS_Store
Binary file not shown.
24 changes: 23 additions & 1 deletion hydroflow/examples/shopping/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
To run the example:
This directory contains the code for the shopping example from the paper. It includes a driver program in `driver.rs` that generates data from 3 client "sessions" defined in `test_data.rs`. The driver process shuts itself down after a short time since this is just an example.

Code for the BP and SSIV lattices is in `lattices.rs`. Basic types for the shopping scenario are defined in `structs.rs`. The code for the various Hydroflow examples is in `flows/`.

To run the driver on an example from the paper, choose one of the following numbered options from the paper:

1. the original flow (`flows/orig_flow.rs`)
2. the bounded prefix (bp) lattice (`flows/bp_flow.rs`)
3. the sealed set of indexed values (ssiv) lattice (`flows/ssiv_flow.rs`)
4. the sealed set of indexed values (ssiv) lattice with group_by pushed through join (`flows/push_group_flow.rs`)
5. decoupled across a network with state at the server (`flows/server_state_flow.rs`)
6. decoupled across a network with state at the client (`flows/client_state_flow.rs`)
7. decoupled across a network with state at a triply-replicated server (`flows/rep_server_flow.rs`)

Then run the driver program, passing the number of your option to the `--opt` flag. For example:
```
cargo run -p hydroflow --example shopping -- --opt 5
```

Adding the `--graph <graph_type>` flag to the end of the command line above will print out a node-and-edge diagram of the program. Supported values for `<graph_type>` are [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html).

For options 1-4, the driver runs a single Hydro transducer (thread) that handles client requests.

For options 5-6, the driver runs two Hydro transducers, one for each side of the network communication.

For option 7, the driver runs four Hydro transducers: one client proxy and 3 server replicas.

Under all options, the driver runs an additional independent Hydro transducer (thread) to receive the output of the flow and print it to the console. The code for this transducer is in `flows/listener_flow.rs`.
213 changes: 182 additions & 31 deletions hydroflow/examples/shopping/driver.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,54 @@
use std::iter::Iterator;
use std::net::SocketAddr;

use hydroflow::util::ipv4_resolve;

use crate::flows::bp_flow::bp_flow;
use crate::flows::client_state_flow::client_state_flow;
use crate::flows::listener_flow::listener_flow;
use crate::flows::orig_flow::orig_flow;
use crate::flows::push_group_flow::push_group_flow;
use crate::flows::rep_server_flow::rep_server_flow;
use crate::flows::server_state_flow::server_state_flow;
use crate::flows::ssiv_flow::ssiv_flow;
use crate::test_data::{all_clients_iter, client100_vec, client1_vec, client2_vec};
use crate::test_data::{client100_vec, client1_vec, client2_vec};
use crate::wrappers::{bp_wrap, ssiv_wrap, tuple_wrap};
use crate::{GraphType, Opts};

use hydroflow::util::ipv4_resolve;
/// Spawns a background thread running the listener flow, which gets the
/// output of a shopping flow and prints it on the console.
///
/// A UDP socket is bound for each of the three addresses (tuple, BP and SSIV
/// output). Only the second element of each tuple returned by
/// `bind_udp_bytes` is kept, and all three are handed to `listener_flow`.
/// The sockets are bound before the thread is spawned.
///
/// The thread's `JoinHandle` is discarded, so the listener is never joined.
/// Building the Tokio runtime is `unwrap`ped, so a build failure panics on
/// the listener thread.
async fn spawn_listener(
    tuple_listener_addr: SocketAddr,
    bp_listener_addr: SocketAddr,
    ssiv_listener_addr: SocketAddr,
) {
    // Bind in a fixed order: tuple, then BP, then SSIV.
    let (_, tuple_rx, _) = hydroflow::util::bind_udp_bytes(tuple_listener_addr).await;
    let (_, bp_rx, _) = hydroflow::util::bind_udp_bytes(bp_listener_addr).await;
    let (_, ssiv_rx, _) = hydroflow::util::bind_udp_bytes(ssiv_listener_addr).await;

    // Body of the listener thread: a single-threaded Tokio runtime driving the
    // listener flow inside a `LocalSet`.
    let listener_thread = move || {
        let rt = hydroflow::tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let local_set = hydroflow::tokio::task::LocalSet::new();
        local_set.block_on(&rt, async {
            let mut flow = listener_flow(tuple_rx, bp_rx, ssiv_rx).await;
            flow.run_async().await;
        });
    };

    // Print out what each flow sends over the network, on its own thread.
    std::thread::spawn(listener_thread);
}

// driver program to demonstrate the various shopping cart implementations
pub(crate) async fn run_driver(opts: Opts) {
// port for outputting results
// XXX set up a listening thread on this port that dumps the results to stdout?
let out_addr = ipv4_resolve("localhost:23459").unwrap();
// the address for the output; the listener will listen to this address
let out_addr = ipv4_resolve("localhost:0").unwrap();
let (out, _, _) = hydroflow::util::bind_udp_bytes(out_addr).await;

// address for a separate server thread when we need one
let server_addr = ipv4_resolve("localhost:23456").unwrap();

// define the shopping workload from the test data
let shopping = tuple_wrap(all_clients_iter());
let client1 = tuple_wrap(client1_vec().into_iter());
let client2 = tuple_wrap(client2_vec().into_iter());
let client100 = tuple_wrap(client100_vec().into_iter());
let shopping = client1.chain(client2).chain(client100);

// define a BoundedPrefix version of the shopping workload from the test data.
// each client gets a separate BoundedPrefix
Expand All @@ -39,21 +64,153 @@ pub(crate) async fn run_driver(opts: Opts) {
let client100_ssiv = ssiv_wrap(client100_vec().into_iter()).map(|r| (100usize, r));
let shopping_ssiv = client1_ssiv.chain(client2_ssiv).chain(client100_ssiv);

// set up a listener to get the output of the flows and print to stdout
let tuple_listener_addr = ipv4_resolve("localhost:23470").unwrap();
let bp_listener_addr = ipv4_resolve("localhost:23471").unwrap();
let ssiv_listener_addr = ipv4_resolve("localhost:23472").unwrap();
spawn_listener(tuple_listener_addr, bp_listener_addr, ssiv_listener_addr).await;

// run the chosen dataflow
let mut hf = match opts.opt {
1 => orig_flow(shopping, out_addr, out).await,
2 => bp_flow(shopping_bp, out_addr, out).await,
3 => ssiv_flow(shopping_ssiv, out_addr, out).await,
4 => push_group_flow(shopping_ssiv, out_addr, out).await,
5 => server_state_flow(shopping_ssiv, out_addr, out, server_addr).await,
6 => client_state_flow(shopping_ssiv, out_addr, out, server_addr).await,
1 => orig_flow(shopping, tuple_listener_addr, out).await,
2 => bp_flow(shopping_bp, bp_listener_addr, out).await,
3 => ssiv_flow(shopping_ssiv, ssiv_listener_addr, out).await,
4 => push_group_flow(shopping_ssiv, ssiv_listener_addr, out).await,
opt @ (5 | 6) => {
// address for a server thread
let server_addr = ipv4_resolve("localhost:23456").unwrap();
// addresses for a client proxy thread
let client_addr = ipv4_resolve("localhost:23457").unwrap();
let client_out_addr = ipv4_resolve("localhost:23460").unwrap();
let (client_out, _, _) = hydroflow::util::bind_udp_bytes(client_out_addr).await;

// shopping input is handled by the client proxy transducer
// so the server transducer should get an empty iterator as its first argument
let empty_ssiv = std::iter::empty();

// Spawn server
std::thread::spawn(move || {
let runtime = hydroflow::tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let local = hydroflow::tokio::task::LocalSet::new();
local.block_on(&runtime, async {
let mut hf = match opt {
5 => {
server_state_flow(
empty_ssiv,
ssiv_listener_addr,
out,
server_addr,
client_addr,
)
.await
}
6 => {
client_state_flow(
empty_ssiv,
ssiv_listener_addr,
out,
server_addr,
client_addr,
)
.await
}
_ => unreachable!(),
};
hf.run_async().await;
});
});

// Run client proxy in this thread
match opt {
5 => {
server_state_flow(
shopping_ssiv,
client_out_addr,
client_out,
client_addr,
server_addr,
)
.await
}
6 => {
client_state_flow(
shopping_ssiv,
client_out_addr,
client_out,
client_addr,
server_addr,
)
.await
}
_ => unreachable!(),
}
}
7 => {
// define the server addresses
let addr1 = ipv4_resolve("localhost:23456").unwrap();
let addr2 = ipv4_resolve("localhost:23457").unwrap();
let addr3 = ipv4_resolve("localhost:23458").unwrap();
let server_addrs = vec![addr1, addr2, addr3].into_iter();
rep_server_flow(shopping_ssiv, out_addr, out, server_addrs).await
let addr1 = ipv4_resolve("localhost:23430").unwrap();
let addr2 = ipv4_resolve("localhost:23431").unwrap();
let addr3 = ipv4_resolve("localhost:23432").unwrap();
let server_addrs = vec![addr1, addr2, addr3];

// define the server addresses for gossip
let gossip_addr1 = ipv4_resolve("localhost:23440").unwrap();
let gossip_addr2 = ipv4_resolve("localhost:23441").unwrap();
let gossip_addr3 = ipv4_resolve("localhost:23442").unwrap();
let gossip_addrs = vec![gossip_addr1, gossip_addr2, gossip_addr3];

// address for a client proxy thread
let client_addr = ipv4_resolve("localhost:23457").unwrap();
let client_out_addr = ipv4_resolve("localhost:23460").unwrap();
let (client_out, _, _) = hydroflow::util::bind_udp_bytes(client_out_addr).await;

// Spawn 3 server replicas asynchronously
for pair in server_addrs.iter().zip(gossip_addrs.iter()) {
let (&addr, &gossip_addr) = pair;

let out_addr = ipv4_resolve("localhost:0").unwrap();
let (out, _, _) = hydroflow::util::bind_udp_bytes(out_addr).await;
let gossip_addrs = gossip_addrs.clone();

// shopping input is handled by the client proxy transducer
// so the server transducers should get an empty iterator as first argument
let empty_ssiv = std::iter::empty();

// Spawn server
std::thread::spawn(move || {
let runtime = hydroflow::tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let local = hydroflow::tokio::task::LocalSet::new();
local.block_on(&runtime, async {
let mut hf = rep_server_flow(
empty_ssiv,
ssiv_listener_addr,
out,
addr,
ssiv_listener_addr,
gossip_addr,
gossip_addrs.into_iter(),
)
.await;
hf.run_async().await;
});
});
}
// Run client proxy in this thread
rep_server_flow(
shopping_ssiv,
client_out_addr,
client_out,
client_addr,
server_addrs[0],
ipv4_resolve("localhost:23443").unwrap(),
gossip_addrs.into_iter(),
)
.await
}
_ => panic!("Invalid opt number"),
};
Expand All @@ -76,15 +233,9 @@ pub(crate) async fn run_driver(opts: Opts) {
}
}

// fire up the Hydroflow transducers, one per thread
match opts.opt {
// the first 4 options each require only a single machine representing the server
1 | 2 | 3 | 4 => hf.run_available(),
// for the next 2 options, this thread is a proxy for the client,
// and we need to spawn another thread for the server
5 | 6 => hf.run_available(),
// the next option requires 3 threads, all of which are server replicas
7 => hf.run_available(),
_ => panic!("Invalid opt number"),
};
// Run the client for 1 second; should be long enough to get all the results
let _timeout =
hydroflow::tokio::time::timeout(std::time::Duration::from_secs(1), hf.run_async())
.await
.unwrap_err();
}
21 changes: 12 additions & 9 deletions hydroflow/examples/shopping/flows/bp_flow.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::lattices::BoundedPrefix;
use crate::structs::Request;
use crate::test_data::client_class_iter;
use hydroflow::lattices::Merge;
use hydroflow::{hydroflow_syntax, scheduled::graph::Hydroflow};
use std::net::SocketAddr;

use bytes::Bytes;
use futures::stream::SplitSink;
use std::net::SocketAddr;
use tokio_util::{codec::LengthDelimitedCodec, udp::UdpFramed};
use hydroflow::hydroflow_syntax;
use hydroflow::lattices::Merge;
use hydroflow::scheduled::graph::Hydroflow;
use tokio_util::codec::LengthDelimitedCodec;
use tokio_util::udp::UdpFramed;

use crate::lattices::BoundedPrefix;
use crate::structs::Request;
use crate::test_data::client_class_iter;

pub(crate) async fn bp_flow(
shopping_bp: impl Iterator<Item = (usize, BoundedPrefix<Request>)> + 'static,
Expand All @@ -18,7 +21,7 @@ pub(crate) async fn bp_flow(

// First define some shorthand for the merge and bot of this lattice
let bp_merge = <BoundedPrefix<Request> as Merge<BoundedPrefix<Request>>>::merge;
const bp_bot: fn() -> BoundedPrefix<Request> = Default::default;
const BP_BOT: fn() -> BoundedPrefix<Request> = Default::default;

// This is the BP case for a server with interleaved requests from clients.
// For each Request in "shopping_bp" we look up its "client_class" (basic or prime)
Expand All @@ -30,7 +33,7 @@ pub(crate) async fn bp_flow(
source_iter(client_class) -> [1]lookup_class;
lookup_class = join()
-> map(|(client, (li, class))| ((client, class), li))
-> group_by(bp_bot, bp_merge)
-> group_by(BP_BOT, bp_merge)
-> map(|m| (m, out_addr)) -> dest_sink_serde(out);
}
}
Loading

0 comments on commit 9029539

Please sign in to comment.