Skip to content

Commit

Permalink
feat(hydroflow): prototype a functional surface syntax using staging
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Oct 3, 2023
1 parent fc7d27d commit addba76
Show file tree
Hide file tree
Showing 30 changed files with 1,492 additions and 43 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ members = [
"hydroflow_datalog_core",
"hydroflow_lang",
"hydroflow_macro",
"hydroflow_plus",
"hydroflow_plus_macro",
"hydroflow_plus_example_flow",
"hydroflow_plus_example_runtime",
"lattices",
"multiplatform_test",
"pusherator",
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/rga/adjacency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) fn rga_adjacency(
input_recv: UnboundedReceiverStream<(Token, Timestamp)>,
rga_send: UnboundedSender<(Token, Timestamp)>,
list_send: UnboundedSender<(Timestamp, Timestamp)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
hydroflow_syntax! {
insertAfter = source_stream(input_recv) -> tee();

Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/rga/datalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog(
input_recv: UnboundedReceiverStream<(Token, Timestamp)>,
rga_send: UnboundedSender<(Token, Timestamp)>,
list_send: UnboundedSender<(Timestamp, Timestamp)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
hydroflow_syntax! {
edges = source_stream(input_recv) -> tee();
insertAfter = edges -> map(|(c, p): (Token, Timestamp) | (c.ts, p)) -> tee();
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/rga/datalog_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog_agg(
input_recv: UnboundedReceiverStream<(Token, Timestamp)>,
rga_send: UnboundedSender<(Token, Timestamp)>,
list_send: UnboundedSender<(Timestamp, Timestamp)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
hydroflow_syntax! {
edges = source_stream(input_recv) -> tee();
insertAfter = edges -> map(|(c, p): (Token, Timestamp)| (c.ts, p)) -> tee();
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/rga/minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) fn rga_minimal(
input_recv: UnboundedReceiverStream<(Token, Timestamp)>,
rga_send: UnboundedSender<(Token, Timestamp)>,
_list_send: UnboundedSender<(Timestamp, Timestamp)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
hydroflow_syntax! {
insertAfter = source_stream(input_recv);

Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/bp_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) async fn bp_flow(
shopping_bp: impl Iterator<Item = (usize, BoundedPrefix<Request>)> + 'static,
out_addr: SocketAddr,
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// First define some shorthand for the merge and bot of this lattice
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/client_state_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) async fn client_state_flow(
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
local_addr: SocketAddr,
remote_addr: SocketAddr,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// First define some shorthand for the merge and bot of this lattice
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/listener_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) async fn listener_flow(
tuple_input: UdpStream,
bp_input: UdpStream,
ssiv_input: UdpStream,
) -> Hydroflow {
) -> Hydroflow<'static> {
// Simply print what we receive.
hydroflow_syntax! {
source_stream_serde(tuple_input)
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/orig_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub(crate) async fn orig_flow(
shopping: impl Iterator<Item = (usize, LineItem)> + 'static,
out_addr: SocketAddr,
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// This is the straightforward single-transducer sequential case.
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/push_group_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) async fn push_group_flow(
shopping_ssiv: impl Iterator<Item = (usize, SealedSetOfIndexedValues<Request>)> + 'static,
out_addr: SocketAddr,
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// First define some shorthand for the merge and bot of this lattice
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/rep_server_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) async fn rep_server_flow(
remote_addr: SocketAddr,
gossip_addr: SocketAddr,
server_addrs: impl Iterator<Item = SocketAddr> + 'static,
) -> Hydroflow {
) -> Hydroflow<'static> {
let (broadcast_out, broadcast_in, _) = hydroflow::util::bind_udp_bytes(gossip_addr).await;
let client_class = client_class_iter();
let ssiv_merge =
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/server_state_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) async fn server_state_flow(
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
local_addr: SocketAddr,
remote_addr: SocketAddr,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// First define some shorthand for the merge and bot of this lattice
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/ssiv_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) async fn ssiv_flow(
shopping_ssiv: impl Iterator<Item = (usize, SealedSetOfIndexedValues<Request>)> + 'static,
out_addr: SocketAddr,
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// First define some shorthand for the merge and bot of this lattice
Expand Down
22 changes: 11 additions & 11 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use super::{HandoffId, SubgraphId};
use crate::Never;

/// A Hydroflow graph. Owns, schedules, and runs the compiled subgraphs.
pub struct Hydroflow {
pub(super) subgraphs: Vec<SubgraphData>,
pub struct Hydroflow<'a> {
pub(super) subgraphs: Vec<SubgraphData<'a>>,
pub(super) context: Context,
handoffs: Vec<HandoffData>,

Expand All @@ -44,7 +44,7 @@ pub struct Hydroflow {
/// See [`Self::diagnostics()`].
diagnostics: Option<Vec<Diagnostic<SerdeSpan>>>,
}
impl Default for Hydroflow {
impl<'a> Default for Hydroflow<'a> {
fn default() -> Self {
let stratum_queues = vec![Default::default()]; // Always initialize stratum #0.
let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Default for Hydroflow {
}
}
}
impl Hydroflow {
impl<'a> Hydroflow<'a> {
/// Create a new empty Hydroflow graph.
pub fn new() -> Self {
Default::default()
Expand Down Expand Up @@ -507,7 +507,7 @@ impl Hydroflow {
Name: Into<Cow<'static, str>>,
R: 'static + PortList<RECV>,
W: 'static + PortList<SEND>,
F: 'static + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>),
F: 'a + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>),
{
let sg_id = SubgraphId(self.subgraphs.len());

Expand Down Expand Up @@ -675,7 +675,7 @@ impl Hydroflow {
}
}

impl Hydroflow {
impl<'a> Hydroflow<'a> {
/// Alias for [`Context::spawn_task`].
pub fn spawn_task<Fut>(&mut self, future: Fut)
where
Expand All @@ -695,7 +695,7 @@ impl Hydroflow {
}
}

impl Drop for Hydroflow {
impl<'a> Drop for Hydroflow<'a> {
fn drop(&mut self) {
self.abort_tasks();
}
Expand Down Expand Up @@ -740,14 +740,14 @@ impl HandoffData {
///
/// Used internally by the [Hydroflow] struct to represent the dataflow graph
/// structure and scheduled state.
pub(super) struct SubgraphData {
pub(super) struct SubgraphData<'a> {
/// A friendly name for diagnostics.
#[allow(dead_code)] // TODO(mingwei): remove attr once used.
pub(super) name: Cow<'static, str>,
/// This subgraph's stratum number.
pub(super) stratum: usize,
/// The actual execution code of the subgraph.
subgraph: Box<dyn Subgraph>,
subgraph: Box<dyn Subgraph + 'a>,
#[allow(dead_code)]
preds: Vec<HandoffId>,
succs: Vec<HandoffId>,
Expand All @@ -761,11 +761,11 @@ pub(super) struct SubgraphData {
/// Keep track of the last tick that this subgraph was run in
last_tick_run_in: Option<usize>,
}
impl SubgraphData {
impl<'a> SubgraphData<'a> {
pub fn new(
name: Cow<'static, str>,
stratum: usize,
subgraph: impl 'static + Subgraph,
subgraph: impl Subgraph + 'a,
preds: Vec<HandoffId>,
succs: Vec<HandoffId>,
is_scheduled: bool,
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/scheduled/graph_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub trait GraphExt {
W: 'static + Handoff + CanReceive<T>;
}

impl GraphExt for Hydroflow {
impl<'a> GraphExt for Hydroflow<'a> {
subgraph_ext!(impl add_subgraph_sink, (recv_port: R), ());
subgraph_ext!(
impl add_subgraph_2sink,
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/scheduled/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Message {
}
}

impl Hydroflow {
impl<'a> Hydroflow<'a> {
fn register_read_tcp_stream(&mut self, reader: OwnedReadHalf) -> RecvPort<VecHandoff<Message>> {
let reader = FramedRead::new(reader, LengthDelimitedCodec::new());
let (send_port, recv_port) = self.make_edge("tcp ingress handoff");
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/scheduled/net/network_vertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub type Address = String;
// These methods can't be wrapped up in a trait because async methods are not
// allowed in traits (yet).

impl Hydroflow {
impl<'a> Hydroflow<'a> {
// TODO(justin): document these, but they're derivatives of inbound_tcp_vertex_internal.
pub async fn inbound_tcp_vertex_port<T>(&mut self, port: u16) -> RecvPort<VecHandoff<T>>
where
Expand Down
26 changes: 13 additions & 13 deletions hydroflow/src/scheduled/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ use crate::scheduled::handoff::VecHandoff;
const QUERY_EDGE_NAME: Cow<'static, str> = Cow::Borrowed("query handoff");

#[derive(Default)]
pub struct Query {
df: Rc<RefCell<Hydroflow>>,
pub struct Query<'a> {
df: Rc<RefCell<Hydroflow<'a>>>,
}

impl Query {
impl<'a> Query<'a> {
pub fn new() -> Self {
Default::default()
}

pub fn source<F, T>(&mut self, f: F) -> Operator<T>
pub fn source<F, T>(&mut self, f: F) -> Operator<'a, T>
where
T: 'static,
F: 'static + FnMut(&Context, &SendCtx<VecHandoff<T>>),
Expand All @@ -40,7 +40,7 @@ impl Query {
}
}

pub fn concat<T>(&mut self, ops: Vec<Operator<T>>) -> Operator<T>
pub fn concat<T>(&mut self, ops: Vec<Operator<T>>) -> Operator<'a, T>
where
T: 'static,
{
Expand Down Expand Up @@ -69,19 +69,19 @@ impl Query {
}
}

pub struct Operator<T>
pub struct Operator<'a, T>
where
T: 'static,
{
df: Rc<RefCell<Hydroflow>>,
df: Rc<RefCell<Hydroflow<'a>>>,
recv_port: RecvPort<VecHandoff<T>>,
}

impl<T> Operator<T>
impl<'a, T> Operator<'a, T>
where
T: 'static,
{
pub fn map<U, F>(self, mut f: F) -> Operator<U>
pub fn map<U, F>(self, mut f: F) -> Operator<'a, U>
where
F: 'static + Fn(T) -> U,
U: 'static,
Expand All @@ -101,7 +101,7 @@ where
}

#[must_use]
pub fn filter<F>(self, mut f: F) -> Operator<T>
pub fn filter<F>(self, mut f: F) -> Operator<'a, T>
where
F: 'static + Fn(&T) -> bool,
{
Expand All @@ -125,7 +125,7 @@ where
}

#[must_use]
pub fn concat(self, other: Operator<T>) -> Operator<T> {
pub fn concat(self, other: Operator<'a, T>) -> Operator<'a, T> {
// TODO(justin): this is very slow.

let mut df = self.df.borrow_mut();
Expand Down Expand Up @@ -163,8 +163,8 @@ where
}
}

impl<T: Clone> Operator<T> {
pub fn tee(self, n: usize) -> Vec<Operator<T>>
impl<'a, T: Clone> Operator<'a, T> {
pub fn tee(self, n: usize) -> Vec<Operator<'a, T>>
where
T: 'static,
{
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use hydroflow_cli_integration::*;

use crate::scheduled::graph::Hydroflow;

pub async fn launch_flow(mut flow: Hydroflow) {
pub async fn launch_flow(mut flow: Hydroflow<'_>) {
let stop = tokio::sync::oneshot::channel();
tokio::task::spawn_blocking(|| {
let mut line = String::new();
Expand Down
24 changes: 24 additions & 0 deletions hydroflow_plus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "hydroflow_plus"
publish = true
version = "0.4.0"
edition = "2021"
license = "Apache-2.0"
documentation = "https://docs.rs/hydroflow_plus/"
description = "Functional programming API for hydroflow"

[lib]
path = "src/lib.rs"

[features]
default = []
diagnostics = [ "hydroflow_lang/diagnostics" ]

[dependencies]
quote = "1.0.0"
syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] }
proc-macro2 = "1.0.57"
proc-macro-crate = "1.1.0"
hydroflow = { path = "../hydroflow", version = "^0.4.0" }
hydroflow_lang = { path = "../hydroflow_lang", version = "^0.4.0" }
hydroflow_plus_macro = { path = "../hydroflow_plus_macro", version = "^0.4.0" }
Loading

0 comments on commit addba76

Please sign in to comment.