From addba764189d0d2503c40bd4617c84a02201bc8d Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Tue, 3 Oct 2023 00:45:17 -0700 Subject: [PATCH] feat(hydroflow): prototype a functional surface syntax using staging --- Cargo.lock | 40 + Cargo.toml | 4 + hydroflow/examples/rga/adjacency.rs | 2 +- hydroflow/examples/rga/datalog.rs | 2 +- hydroflow/examples/rga/datalog_agg.rs | 2 +- hydroflow/examples/rga/minimal.rs | 2 +- hydroflow/examples/shopping/flows/bp_flow.rs | 2 +- .../shopping/flows/client_state_flow.rs | 2 +- .../examples/shopping/flows/listener_flow.rs | 2 +- .../examples/shopping/flows/orig_flow.rs | 2 +- .../shopping/flows/push_group_flow.rs | 2 +- .../shopping/flows/rep_server_flow.rs | 2 +- .../shopping/flows/server_state_flow.rs | 2 +- .../examples/shopping/flows/ssiv_flow.rs | 2 +- hydroflow/src/scheduled/graph.rs | 22 +- hydroflow/src/scheduled/graph_ext.rs | 2 +- hydroflow/src/scheduled/net/mod.rs | 2 +- hydroflow/src/scheduled/net/network_vertex.rs | 2 +- hydroflow/src/scheduled/query.rs | 26 +- hydroflow/src/util/cli.rs | 2 +- hydroflow_plus/Cargo.toml | 24 + hydroflow_plus/src/lib.rs | 314 ++++++++ hydroflow_plus/src/quoting.rs | 256 +++++++ hydroflow_plus_example_flow/Cargo.toml | 13 + hydroflow_plus_example_flow/src/lib.rs | 44 ++ hydroflow_plus_example_runtime/Cargo.toml | 10 + hydroflow_plus_example_runtime/src/main.rs | 15 + hydroflow_plus_macro/Cargo.toml | 18 + hydroflow_plus_macro/src/lib.rs | 711 ++++++++++++++++++ website_playground/src/lib.rs | 6 +- 30 files changed, 1492 insertions(+), 43 deletions(-) create mode 100644 hydroflow_plus/Cargo.toml create mode 100644 hydroflow_plus/src/lib.rs create mode 100644 hydroflow_plus/src/quoting.rs create mode 100644 hydroflow_plus_example_flow/Cargo.toml create mode 100644 hydroflow_plus_example_flow/src/lib.rs create mode 100644 hydroflow_plus_example_runtime/Cargo.toml create mode 100644 hydroflow_plus_example_runtime/src/main.rs create mode 100644 hydroflow_plus_macro/Cargo.toml create mode 100644 hydroflow_plus_macro/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 650277cea8ee..c5d02b0ffd12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1461,6 +1461,46 @@ dependencies = [ "syn 2.0.14", ] +[[package]] +name = "hydroflow_plus" +version = "0.4.0" +dependencies = [ + "hydroflow", + "hydroflow_lang", + "hydroflow_plus_macro", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.14", +] + +[[package]] +name = "hydroflow_plus_example_flow" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "regex", +] + +[[package]] +name = "hydroflow_plus_example_runtime" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "hydroflow_plus_example_flow", + "regex", +] + +[[package]] +name = "hydroflow_plus_macro" +version = "0.4.0" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.14", +] + [[package]] name = "iana-time-zone" version = "0.1.56" diff --git a/Cargo.toml b/Cargo.toml index 0929d37cd473..74f35f12435c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,10 @@ members = [ "hydroflow_datalog_core", "hydroflow_lang", "hydroflow_macro", + "hydroflow_plus", + "hydroflow_plus_macro", + "hydroflow_plus_example_flow", + "hydroflow_plus_example_runtime", "lattices", "multiplatform_test", "pusherator", diff --git a/hydroflow/examples/rga/adjacency.rs b/hydroflow/examples/rga/adjacency.rs index 546e0b524ce5..5f8d0e74a4ae 100644 --- a/hydroflow/examples/rga/adjacency.rs +++ b/hydroflow/examples/rga/adjacency.rs @@ -11,7 +11,7 @@ pub(crate) fn rga_adjacency( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { insertAfter = source_stream(input_recv) -> tee(); diff --git a/hydroflow/examples/rga/datalog.rs b/hydroflow/examples/rga/datalog.rs index 362d9b7ccc79..787a6df93f1a 100644 --- a/hydroflow/examples/rga/datalog.rs +++ b/hydroflow/examples/rga/datalog.rs @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { edges = source_stream(input_recv) -> tee(); insertAfter = edges -> map(|(c, p): (Token, Timestamp) | (c.ts, p)) -> tee(); diff --git a/hydroflow/examples/rga/datalog_agg.rs b/hydroflow/examples/rga/datalog_agg.rs index 4739012c08c8..6b7109e03deb 100644 --- a/hydroflow/examples/rga/datalog_agg.rs +++ b/hydroflow/examples/rga/datalog_agg.rs @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog_agg( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { edges = source_stream(input_recv) -> tee(); insertAfter = edges -> map(|(c, p): (Token, Timestamp)| (c.ts, p)) -> tee(); diff --git a/hydroflow/examples/rga/minimal.rs b/hydroflow/examples/rga/minimal.rs index e78032a1bfff..23b3583fa5c1 100644 --- a/hydroflow/examples/rga/minimal.rs +++ b/hydroflow/examples/rga/minimal.rs @@ -9,7 +9,7 @@ pub(crate) fn rga_minimal( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, _list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { insertAfter = source_stream(input_recv); diff --git a/hydroflow/examples/shopping/flows/bp_flow.rs b/hydroflow/examples/shopping/flows/bp_flow.rs index 93e8e8a6f08f..960dfffca803 100644 --- a/hydroflow/examples/shopping/flows/bp_flow.rs +++ b/hydroflow/examples/shopping/flows/bp_flow.rs @@ -16,7 +16,7 @@ pub(crate) async fn bp_flow( shopping_bp: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/client_state_flow.rs b/hydroflow/examples/shopping/flows/client_state_flow.rs index 0d12ed44ed49..8d4cdb511b91 100644 --- a/hydroflow/examples/shopping/flows/client_state_flow.rs +++ b/hydroflow/examples/shopping/flows/client_state_flow.rs @@ -18,7 +18,7 @@ pub(crate) async fn client_state_flow( out: SplitSink, (Bytes, SocketAddr)>, local_addr: SocketAddr, remote_addr: SocketAddr, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/listener_flow.rs b/hydroflow/examples/shopping/flows/listener_flow.rs index ff4114bbee64..0ab38af0b8cc 100644 --- a/hydroflow/examples/shopping/flows/listener_flow.rs +++ b/hydroflow/examples/shopping/flows/listener_flow.rs @@ -11,7 +11,7 @@ pub(crate) async fn listener_flow( tuple_input: UdpStream, bp_input: UdpStream, ssiv_input: UdpStream, -) -> Hydroflow { +) -> Hydroflow<'static> { // Simply print what we receive. hydroflow_syntax! { source_stream_serde(tuple_input) diff --git a/hydroflow/examples/shopping/flows/orig_flow.rs b/hydroflow/examples/shopping/flows/orig_flow.rs index d5803399619a..c92f080c8138 100644 --- a/hydroflow/examples/shopping/flows/orig_flow.rs +++ b/hydroflow/examples/shopping/flows/orig_flow.rs @@ -14,7 +14,7 @@ pub(crate) async fn orig_flow( shopping: impl Iterator + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // This is the straightforward single-transducer sequential case. diff --git a/hydroflow/examples/shopping/flows/push_group_flow.rs b/hydroflow/examples/shopping/flows/push_group_flow.rs index 4f22e3a0c96c..210f567a8ddc 100644 --- a/hydroflow/examples/shopping/flows/push_group_flow.rs +++ b/hydroflow/examples/shopping/flows/push_group_flow.rs @@ -16,7 +16,7 @@ pub(crate) async fn push_group_flow( shopping_ssiv: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/rep_server_flow.rs b/hydroflow/examples/shopping/flows/rep_server_flow.rs index 26b8e446e11d..8b62825b7a6b 100644 --- a/hydroflow/examples/shopping/flows/rep_server_flow.rs +++ b/hydroflow/examples/shopping/flows/rep_server_flow.rs @@ -20,7 +20,7 @@ pub(crate) async fn rep_server_flow( remote_addr: SocketAddr, gossip_addr: SocketAddr, server_addrs: impl Iterator + 'static, -) -> Hydroflow { +) -> Hydroflow<'static> { let (broadcast_out, broadcast_in, _) = hydroflow::util::bind_udp_bytes(gossip_addr).await; let client_class = client_class_iter(); let ssiv_merge = diff --git a/hydroflow/examples/shopping/flows/server_state_flow.rs b/hydroflow/examples/shopping/flows/server_state_flow.rs index d20416b654de..9b1bcde86405 100644 --- a/hydroflow/examples/shopping/flows/server_state_flow.rs +++ b/hydroflow/examples/shopping/flows/server_state_flow.rs @@ -18,7 +18,7 @@ pub(crate) async fn server_state_flow( out: SplitSink, (Bytes, SocketAddr)>, local_addr: SocketAddr, remote_addr: SocketAddr, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/ssiv_flow.rs b/hydroflow/examples/shopping/flows/ssiv_flow.rs index 154d224c2cc2..34ddfefc5163 100644 --- a/hydroflow/examples/shopping/flows/ssiv_flow.rs +++ b/hydroflow/examples/shopping/flows/ssiv_flow.rs @@ -16,7 +16,7 @@ pub(crate) async fn ssiv_flow( shopping_ssiv: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 47afe68eb0db..e4745c1a1951 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -24,8 +24,8 @@ use super::{HandoffId, SubgraphId}; use crate::Never; /// A Hydroflow graph. Owns, schedules, and runs the compiled subgraphs. -pub struct Hydroflow { - pub(super) subgraphs: Vec, +pub struct Hydroflow<'a> { + pub(super) subgraphs: Vec>, pub(super) context: Context, handoffs: Vec, @@ -44,7 +44,7 @@ pub struct Hydroflow { /// See [`Self::diagnostics()`]. diagnostics: Option>>, } -impl Default for Hydroflow { +impl<'a> Default for Hydroflow<'a> { fn default() -> Self { let stratum_queues = vec![Default::default()]; // Always initialize stratum #0. let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel(); @@ -78,7 +78,7 @@ impl Default for Hydroflow { } } } -impl Hydroflow { +impl<'a> Hydroflow<'a> { /// Create a new empty Hydroflow graph. pub fn new() -> Self { Default::default() @@ -507,7 +507,7 @@ impl Hydroflow { Name: Into>, R: 'static + PortList, W: 'static + PortList, - F: 'static + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>), + F: 'a + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>), { let sg_id = SubgraphId(self.subgraphs.len()); @@ -675,7 +675,7 @@ impl Hydroflow { } } -impl Hydroflow { +impl<'a> Hydroflow<'a> { /// Alias for [`Context::spawn_task`]. pub fn spawn_task(&mut self, future: Fut) where @@ -695,7 +695,7 @@ impl Hydroflow { } } -impl Drop for Hydroflow { +impl<'a> Drop for Hydroflow<'a> { fn drop(&mut self) { self.abort_tasks(); } @@ -740,14 +740,14 @@ impl HandoffData { /// /// Used internally by the [Hydroflow] struct to represent the dataflow graph /// structure and scheduled state. -pub(super) struct SubgraphData { +pub(super) struct SubgraphData<'a> { /// A friendly name for diagnostics. #[allow(dead_code)] // TODO(mingwei): remove attr once used. pub(super) name: Cow<'static, str>, /// This subgraph's stratum number. pub(super) stratum: usize, /// The actual execution code of the subgraph. - subgraph: Box, + subgraph: Box, #[allow(dead_code)] preds: Vec, succs: Vec, @@ -761,11 +761,11 @@ pub(super) struct SubgraphData { /// Keep track of the last tick that this subgraph was run in last_tick_run_in: Option, } -impl SubgraphData { +impl<'a> SubgraphData<'a> { pub fn new( name: Cow<'static, str>, stratum: usize, - subgraph: impl 'static + Subgraph, + subgraph: impl Subgraph + 'a, preds: Vec, succs: Vec, is_scheduled: bool, diff --git a/hydroflow/src/scheduled/graph_ext.rs b/hydroflow/src/scheduled/graph_ext.rs index db8b48726e0e..f1245516a153 100644 --- a/hydroflow/src/scheduled/graph_ext.rs +++ b/hydroflow/src/scheduled/graph_ext.rs @@ -125,7 +125,7 @@ pub trait GraphExt { W: 'static + Handoff + CanReceive; } -impl GraphExt for Hydroflow { +impl<'a> GraphExt for Hydroflow<'a> { subgraph_ext!(impl add_subgraph_sink, (recv_port: R), ()); subgraph_ext!( impl add_subgraph_2sink, diff --git a/hydroflow/src/scheduled/net/mod.rs b/hydroflow/src/scheduled/net/mod.rs index ae1fbde936fc..07309c81446f 100644 --- a/hydroflow/src/scheduled/net/mod.rs +++ b/hydroflow/src/scheduled/net/mod.rs @@ -96,7 +96,7 @@ impl Message { } } -impl Hydroflow { +impl<'a> Hydroflow<'a> { fn register_read_tcp_stream(&mut self, reader: OwnedReadHalf) -> RecvPort> { let reader = FramedRead::new(reader, LengthDelimitedCodec::new()); let (send_port, recv_port) = self.make_edge("tcp ingress handoff"); diff --git a/hydroflow/src/scheduled/net/network_vertex.rs b/hydroflow/src/scheduled/net/network_vertex.rs index 508437a2df51..13077e6290e8 100644 --- a/hydroflow/src/scheduled/net/network_vertex.rs +++ b/hydroflow/src/scheduled/net/network_vertex.rs @@ -18,7 +18,7 @@ pub type Address = String; // These methods can't be wrapped up in a trait because async methods are not // allowed in traits (yet). -impl Hydroflow { +impl<'a> Hydroflow<'a> { // TODO(justin): document these, but they're derivatives of inbound_tcp_vertex_internal. pub async fn inbound_tcp_vertex_port(&mut self, port: u16) -> RecvPort> where diff --git a/hydroflow/src/scheduled/query.rs b/hydroflow/src/scheduled/query.rs index 04da7d34704a..af9b0d6cc4c0 100644 --- a/hydroflow/src/scheduled/query.rs +++ b/hydroflow/src/scheduled/query.rs @@ -15,16 +15,16 @@ use crate::scheduled::handoff::VecHandoff; const QUERY_EDGE_NAME: Cow<'static, str> = Cow::Borrowed("query handoff"); #[derive(Default)] -pub struct Query { - df: Rc>, +pub struct Query<'a> { + df: Rc>>, } -impl Query { +impl<'a> Query<'a> { pub fn new() -> Self { Default::default() } - pub fn source(&mut self, f: F) -> Operator + pub fn source(&mut self, f: F) -> Operator<'a, T> where T: 'static, F: 'static + FnMut(&Context, &SendCtx>), @@ -40,7 +40,7 @@ impl Query { } } - pub fn concat(&mut self, ops: Vec>) -> Operator + pub fn concat(&mut self, ops: Vec>) -> Operator<'a, T> where T: 'static, { @@ -69,19 +69,19 @@ impl Query { } } -pub struct Operator +pub struct Operator<'a, T> where T: 'static, { - df: Rc>, + df: Rc>>, recv_port: RecvPort>, } -impl Operator +impl<'a, T> Operator<'a, T> where T: 'static, { - pub fn map(self, mut f: F) -> Operator + pub fn map(self, mut f: F) -> Operator<'a, U> where F: 'static + Fn(T) -> U, U: 'static, @@ -101,7 +101,7 @@ where } #[must_use] - pub fn filter(self, mut f: F) -> Operator + pub fn filter(self, mut f: F) -> Operator<'a, T> where F: 'static + Fn(&T) -> bool, { @@ -125,7 +125,7 @@ where } #[must_use] - pub fn concat(self, other: Operator) -> Operator { + pub fn concat(self, other: Operator<'a, T>) -> Operator<'a, T> { // TODO(justin): this is very slow. let mut df = self.df.borrow_mut(); @@ -163,8 +163,8 @@ where } } -impl Operator { - pub fn tee(self, n: usize) -> Vec> +impl<'a, T: Clone> Operator<'a, T> { + pub fn tee(self, n: usize) -> Vec> where T: 'static, { diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index 90928acd8a00..c5f359fa0af6 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -6,7 +6,7 @@ pub use hydroflow_cli_integration::*; use crate::scheduled::graph::Hydroflow; -pub async fn launch_flow(mut flow: Hydroflow) { +pub async fn launch_flow(mut flow: Hydroflow<'_>) { let stop = tokio::sync::oneshot::channel(); tokio::task::spawn_blocking(|| { let mut line = String::new(); diff --git a/hydroflow_plus/Cargo.toml b/hydroflow_plus/Cargo.toml new file mode 100644 index 000000000000..7759ca522de6 --- /dev/null +++ b/hydroflow_plus/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "hydroflow_plus" +publish = true +version = "0.4.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/hydroflow_plus/" +description = "Functional programming API for hydroflow" + +[lib] +path = "src/lib.rs" + +[features] +default = [] +diagnostics = [ "hydroflow_lang/diagnostics" ] + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +hydroflow = { path = "../hydroflow", version = "^0.4.0" } +hydroflow_lang = { path = "../hydroflow_lang", version = "^0.4.0" } +hydroflow_plus_macro = { path = "../hydroflow_plus_macro", version = "^0.4.0" } diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs new file mode 100644 index 000000000000..3a4adf99496b --- /dev/null +++ b/hydroflow_plus/src/lib.rs @@ -0,0 +1,314 @@ +use std::cell::RefCell; +use std::marker::PhantomData; + +use hydroflow::futures::stream::Stream; +use hydroflow::scheduled::context::Context; +use hydroflow_lang::graph::{partition_graph, propegate_flow_props, FlatGraphBuilder}; +use proc_macro2::{Span, TokenStream}; +use quote::quote; +use quoting::{FreeVariable, IntoQuotedMut, IntoQuotedOnce, ToFreeVariableTokens}; +use syn::parse_quote; + +pub mod internal { + pub use proc_macro2::TokenStream; + pub use quote::quote; + pub use syn; +} + +pub mod quoting; + +pub use hydroflow; +pub use hydroflow_plus_macro::{flow, q, qtype, quse, quse_type}; +pub use quoting::RuntimeData; + +thread_local! { + static HYDROFLOW_NEXT_ID: RefCell = RefCell::new(0); + static HYDROFLOW_BUILDER: RefCell> = RefCell::new(None); +} + +pub fn hydroflow_build(f: impl Fn()) -> TokenStream { + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus::hydroflow }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident::hydroflow } + } + }; + + HYDROFLOW_NEXT_ID.with(|next_id| { + *next_id.borrow_mut() = 0; + HYDROFLOW_BUILDER.with(|builder| { + *builder.borrow_mut() = Some(FlatGraphBuilder::new()); + f(); + + let (flat_graph, _, _) = builder.borrow_mut().take().unwrap().build(); + let mut partitioned_graph = + partition_graph(flat_graph).expect("Failed to partition (cycle detected)."); + + let mut diagnostics = Vec::new(); + // Propgeate flow properties throughout the graph. + // TODO(mingwei): Should this be done at a flat graph stage instead? + let _ = propegate_flow_props::propegate_flow_props( + &mut partitioned_graph, + &mut diagnostics, + ); + + partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics) + }) + }) +} + +#[derive(Clone)] +pub struct RuntimeContext<'a> { + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl Copy for RuntimeContext<'_> {} + +impl<'a> ToFreeVariableTokens for RuntimeContext<'a> { + fn to_tokens(&self) -> (Option, Option) { + (None, Some(quote!(&context))) + } +} + +impl<'a> FreeVariable<&'a Context> for RuntimeContext<'a> {} + +pub struct HfGraph<'a> { + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl<'a> HfGraph<'a> { + #[allow(clippy::new_without_default)] + pub fn new() -> HfGraph<'a> { + HfGraph { + _phantom: PhantomData, + } + } + + pub fn runtime_context(&self) -> RuntimeContext<'a> { + RuntimeContext { + _phantom: PhantomData, + } + } + + pub fn source_stream + Unpin>( + &self, + e: impl IntoQuotedOnce<'a, E>, + ) -> HfNode<'a, T> { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let ident = syn::Ident::new(&format!("source_{}", next_id), Span::call_site()); + let e = e.to_quoted(); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = source_stream(#e) -> tee(); + }); + }); + + HfNode { + ident, + _phantom: PhantomData, + } + } + + pub fn source_iter>( + &self, + e: impl IntoQuotedOnce<'a, E>, + ) -> HfNode<'a, T> { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let ident = syn::Ident::new(&format!("source_{}", next_id), Span::call_site()); + let e = e.to_quoted(); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = source_iter(#e) -> tee(); + }); + }); + + HfNode { + ident, + _phantom: PhantomData, + } + } +} + +pub struct HfNode<'a, T> { + ident: syn::Ident, + _phantom: PhantomData<&'a mut &'a T>, +} + +impl<'a, T> HfNode<'a, T> { + pub fn source_iter>(e: impl IntoQuotedOnce<'a, E>) -> HfNode<'a, T> { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let ident = syn::Ident::new(&format!("source_{}", next_id), Span::call_site()); + let e = e.to_quoted(); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = source_iter(#e) -> tee(); + }); + }); + + HfNode { + ident, + _phantom: PhantomData, + } + } + + pub fn map U + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfNode<'a, U> { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("map_{}", next_id), Span::call_site()); + let f = f.to_quoted(); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> map(#f) -> tee(); + }); + }); + + HfNode { + ident, + _phantom: PhantomData, + } + } + + pub fn filter bool + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfNode<'a, T> { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("filter_{}", next_id), Span::call_site()); + let f = f.to_quoted(); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> filter(#f) -> tee(); + }); + }); + + HfNode { + ident, + _phantom: PhantomData, + } + } + + pub fn for_each(&self, f: impl IntoQuotedMut<'a, F>) { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("for_each_{}", next_id), Span::call_site()); + let f = f.to_quoted(); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> for_each(#f); + }); + }); + } +} + +impl<'a, K, V1> HfNode<'a, (K, V1)> { + pub fn join(&self, n: &HfNode<(K, V2)>) -> HfNode<(K, (V1, V2))> { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let self_ident = &self.ident; + let other_ident = &n.ident; + let ident = syn::Ident::new(&format!("for_each_{}", next_id), Span::call_site()); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = join() -> tee(); + }); + + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #self_ident -> [0]#ident; + }); + + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #other_ident -> [1]#ident; + }); + }); + + HfNode { + ident, + _phantom: PhantomData, + } + } +} diff --git a/hydroflow_plus/src/quoting.rs b/hydroflow_plus/src/quoting.rs new file mode 100644 index 000000000000..89d1ad93a534 --- /dev/null +++ b/hydroflow_plus/src/quoting.rs @@ -0,0 +1,256 @@ +use std::marker::PhantomData; +use std::mem::MaybeUninit; + +use proc_macro2::{Span, TokenStream}; +use quote::{quote, ToTokens}; + +type FreeVariables = Vec<(String, (Option, Option))>; + +pub trait IntoQuotedOnce<'a, T>: FnOnce(&mut String, &mut FreeVariables, bool) -> T + 'a +where + Self: Sized, +{ + fn to_quoted(self) -> QuotedExpr { + let mut str = String::new(); + let mut free_variables = Vec::new(); + // this is an uninit value so we can't drop it + std::mem::forget(self(&mut str, &mut free_variables, false)); + QuotedExpr::create(&str, free_variables) + } +} + +impl<'a, T, F: FnOnce(&mut String, &mut FreeVariables, bool) -> T + 'a> IntoQuotedOnce<'a, T> + for F +{ +} + +pub trait IntoQuotedMut<'a, T>: FnMut(&mut String, &mut FreeVariables, bool) -> T + 'a +where + Self: Sized, +{ + fn to_quoted(mut self) -> QuotedExpr { + let mut str = String::new(); + let mut free_variables = Vec::new(); + // this is an uninit value so we can't drop it + std::mem::forget(self(&mut str, &mut free_variables, false)); + QuotedExpr::create(&str, free_variables) + } +} + +impl<'a, T, F: FnMut(&mut String, &mut FreeVariables, bool) -> T + 'a> IntoQuotedMut<'a, T> for F {} + +pub struct QuotedExpr { + expr: syn::Expr, + free_variables: FreeVariables, + _phantom: PhantomData, +} + +impl QuotedExpr { + pub fn create(expr: &str, free_variables: FreeVariables) -> QuotedExpr { + let expr = syn::parse_str(expr).unwrap(); + QuotedExpr { + expr, + free_variables, + _phantom: PhantomData, + } + } +} + +impl ToTokens for QuotedExpr { + fn to_tokens(&self, tokens: &mut TokenStream) { + let instantiated_free_variables = self.free_variables.iter().flat_map(|(ident, value)| { + let ident = syn::Ident::new(ident, Span::call_site()); + value.0.iter().map(|prelude| quote!(#prelude)).chain( + value + .1 + .iter() + .map(move |value| quote!(let #ident = #value;)), + ) + }); + + let expr = &self.expr; + tokens.extend(quote!({ + #(#instantiated_free_variables)* + #expr + })); + } +} + +pub trait ParseFromLiteral { + fn parse_from_literal(literal: &syn::Expr) -> Self; +} + +impl ParseFromLiteral for u32 { + fn parse_from_literal(literal: &syn::Expr) -> Self { + match literal { + syn::Expr::Lit(syn::ExprLit { + lit: syn::Lit::Int(lit_int), + .. + }) => lit_int.base10_parse().unwrap(), + _ => panic!("Expected literal"), + } + } +} + +pub trait ToFreeVariableTokens { + fn to_tokens(&self) -> (Option, Option); +} + +pub trait ToGlobalFreeVariableTokens { + fn to_tokens(&self) -> (Option, Option); +} + +impl T> ToFreeVariableTokens for F { + fn to_tokens(&self) -> (Option, Option) { + let value = self(); + value.to_tokens() + } +} + +impl T> FreeVariable<()> for F {} + +pub trait FreeVariable +where + Self: Sized, +{ + fn uninitialized(self) -> O { + #[allow(clippy::uninit_assumed_init)] + unsafe { + MaybeUninit::uninit().assume_init() + } + } +} + +impl ToFreeVariableTokens for u32 { + fn to_tokens(&self) -> (Option, Option) { + (None, Some(quote!(#self))) + } +} + +impl FreeVariable for u32 {} + +impl ToGlobalFreeVariableTokens for u32 { + fn to_tokens(&self) -> (Option, Option) { + (None, Some(quote!(#self))) + } +} + +pub struct RuntimeData { + ident: &'static str, + _phantom: PhantomData, +} + +impl Copy for RuntimeData {} + +impl Clone for RuntimeData { + fn clone(&self) -> Self { + RuntimeData { + ident: self.ident.clone(), + _phantom: PhantomData, + } + } +} + +impl RuntimeData { + pub fn new(ident: &'static str) -> RuntimeData { + RuntimeData { + ident, + _phantom: PhantomData, + } + } +} + +impl ToFreeVariableTokens for RuntimeData { + fn to_tokens(&self) -> (Option, Option) { + let ident = syn::Ident::new(self.ident, Span::call_site()); + (None, Some(quote!(#ident))) + } +} + +impl FreeVariable for RuntimeData {} + +pub struct Import { + parent: Option

, + path: &'static str, + _phantom: PhantomData, +} + +impl Copy for Import {} +impl Clone for Import { + fn clone(&self) -> Self { + Import { + parent: self.parent, + path: self.path, + _phantom: PhantomData, + } + } +} + +pub fn create_import(path: &'static str, _unused_type_check: T) -> Import { + Import { + parent: None, + path, + _phantom: PhantomData, + } +} + +impl Import { + pub fn extend( + &self, + path: &'static str, + _unused_type_check: T2, + ) -> Import> { + Import { + parent: Some(*self), + path, + _phantom: PhantomData, + } + } +} + +impl ToFreeVariableTokens for Import { + fn to_tokens(&self) -> (Option, Option) { + if let Some(parent) = &self.parent { + let (prelude, value) = parent.to_tokens(); + let parsed = syn::parse_str::(self.path).unwrap(); + (prelude, Some(quote!(#value::#parsed))) + } else { + let parsed = syn::parse_str::(self.path).unwrap(); + (Some(quote!(use ::#parsed;)), None) + } + } +} + +impl FreeVariable for Import {} + +impl ToGlobalFreeVariableTokens for Import { + fn to_tokens(&self) -> (Option, Option) { + if let Some(parent) = &self.parent { + let (prelude, value) = parent.to_tokens(); + let parsed = syn::parse_str::(self.path).unwrap(); + (prelude, Some(quote!(#value::#parsed))) + } else { + let parsed = syn::parse_str::(self.path).unwrap(); + (Some(quote!(use ::#parsed;)), None) + } + } +} + +pub struct Type { + definition: String, +} + +impl Type { + pub fn new(def: &str) -> Type { + Type { + definition: def.to_string(), + } + } +} + +impl ToGlobalFreeVariableTokens for Type { + fn to_tokens(&self) -> (Option, Option) { + let parsed: syn::Item = syn::parse_str(&self.definition).unwrap(); + (Some(quote!(#parsed)), None) + } +} diff --git a/hydroflow_plus_example_flow/Cargo.toml b/hydroflow_plus_example_flow/Cargo.toml new file mode 100644 index 000000000000..0460bfd97803 --- /dev/null +++ b/hydroflow_plus_example_flow/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "hydroflow_plus_example_flow" +publish = false +version = "0.0.0" +edition = "2021" + +[lib] +proc-macro = true +path = "src/lib.rs" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.4.0" } +regex = "1" diff --git a/hydroflow_plus_example_flow/src/lib.rs b/hydroflow_plus_example_flow/src/lib.rs new file mode 100644 index 000000000000..a6f52e1ff1eb --- /dev/null +++ b/hydroflow_plus_example_flow/src/lib.rs @@ -0,0 +1,44 @@ +use hydroflow_plus::*; + +quse_type!(::regex::Regex); +quse_type!(::hydroflow_plus::hydroflow::tokio_stream::wrappers::UnboundedReceiverStream); +qtype! { + struct Test { + pub v: String + } +} + +fn filter_by_regex<'a, S: Copy + AsRef + 'a>( + graph: &HfGraph<'a>, + input: HfNode<'a, String>, + pattern: RuntimeData, +) -> HfNode<'a, String> { + let ctx = graph.runtime_context(); + + input.filter(q!({ + let regex = Regex::new(pattern.as_ref()).unwrap(); + move |x| { + dbg!(ctx.current_tick()); + let constructed_test = Test { v: x.clone() }; + dbg!(constructed_test.v); + regex.is_match(x) + } + })) +} + +#[hydroflow_plus::flow(&'static str)] +pub fn my_example_flow<'a, S: Copy + AsRef + 'a>( + graph: &HfGraph<'a>, + input_stream: RuntimeData>, + number_of_foreach: u32, + regex: RuntimeData, + text: RuntimeData<&'a str>, +) { + let source = graph.source_stream(q!(input_stream)); + + let mapped = filter_by_regex(graph, source, regex); + + for _ in 0..number_of_foreach { + mapped.for_each(q!(move |x| println!("passed regex {} {}", text, x))); + } +} diff --git a/hydroflow_plus_example_runtime/Cargo.toml b/hydroflow_plus_example_runtime/Cargo.toml new file mode 100644 index 000000000000..e5055577735e --- /dev/null +++ b/hydroflow_plus_example_runtime/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "hydroflow_plus_example_runtime" +publish = false +version = "0.0.0" +edition = "2021" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.4.0" } +hydroflow_plus_example_flow = { path = "../hydroflow_plus_example_flow" } +regex = "1" diff --git a/hydroflow_plus_example_runtime/src/main.rs b/hydroflow_plus_example_runtime/src/main.rs new file mode 100644 index 000000000000..04a629714dce --- /dev/null +++ b/hydroflow_plus_example_runtime/src/main.rs @@ -0,0 +1,15 @@ +use hydroflow_plus_example_flow::my_example_flow; + +fn main() { + let (send, recv) = hydroflow_plus::hydroflow::util::unbounded_channel::(); + let regex = std::env::args() + .nth(1) + .expect("Expected regex as first argument") + .parse::() + .expect("Expected regex to be a string"); + let test_string = "test".to_string(); + let mut flow = my_example_flow!(recv, 1, ®ex, &test_string); + send.send("abc".to_string()).unwrap(); + send.send("def".to_string()).unwrap(); + flow.run_tick(); +} diff --git a/hydroflow_plus_macro/Cargo.toml b/hydroflow_plus_macro/Cargo.toml new file mode 100644 index 000000000000..f7daf190f8b4 --- /dev/null +++ b/hydroflow_plus_macro/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "hydroflow_plus_macro" +publish = true +version = "0.4.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/hydroflow_plus_macro/" +description = "Helper macros for the hydroflow_plus crate" + +[lib] +proc-macro = true +path = "src/lib.rs" + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits", "visit" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" diff --git a/hydroflow_plus_macro/src/lib.rs b/hydroflow_plus_macro/src/lib.rs new file mode 100644 index 000000000000..c490b4940802 --- /dev/null +++ b/hydroflow_plus_macro/src/lib.rs @@ -0,0 +1,711 @@ +use std::collections::HashSet; + +use proc_macro2::{Punct, Spacing, Span, TokenStream}; +use quote::{quote, quote_spanned, ToTokens}; +use syn::punctuated::Punctuated; +use syn::spanned::Spanned; +use syn::visit::Visit; +use syn::{ + parse_macro_input, AngleBracketedGenericArguments, GenericParam, MacroDelimiter, PathArguments, + Token, Type, +}; + +#[derive(Debug)] +struct ScopeStack { + scopes: Vec<(HashSet, HashSet)>, +} + +impl ScopeStack { + fn new() -> ScopeStack { + ScopeStack { + scopes: vec![(HashSet::new(), HashSet::new())], + } + } + + fn push(&mut self) { + self.scopes.push((HashSet::new(), HashSet::new())); + } + + fn pop(&mut self) { + self.scopes.pop(); + } + + fn insert_term(&mut self, ident: syn::Ident) { + self.scopes + .last_mut() + .expect("Scope stack should not be empty") + .0 + .insert(ident.to_string()); + } + + fn insert_type(&mut self, ident: syn::Ident) { + self.scopes + .last_mut() + .expect("Scope stack should not be empty") + .1 + .insert(ident.to_string()); + } + + fn contains_term(&self, ident: &syn::Ident) -> bool { + let ident = ident.to_string(); + self.scopes + .iter() + .rev() + .any(|scope| scope.0.contains(&ident)) + } + + fn contains_type(&self, ident: &syn::Ident) -> bool { + let ident = ident.to_string(); + self.scopes + .iter() + .rev() + .any(|scope| scope.1.contains(&ident)) + } +} + +struct FreeVariableVisitor { + free_variables: Vec, + current_scope: ScopeStack, +} + +fn is_prelude(ident: &syn::Ident) -> bool { + let ident_str = ident.to_string(); + let prelude = vec![ + "str", + "i8", + "u8", + "i16", + "u16", + "i32", + "u32", + "i64", + "u64", + "i128", + "u128", + "isize", + "usize", + "Copy", + "Send", + "Sized", + "Sync", + "Unpin", + "Drop", + "Fn", + "FnMut", + "FnOnce", + "drop", + "Box", + "ToOwned", + "Clone", + "PartialEq", + "PartialOrd", + "Eq", + "Ord", + "AsRef", + "AsMut", + "Into", + "From", + "Default", + "Iterator", + "Extend", + "IntoIterator", + "DoubleEndedIterator", + "ExactSizeIterator", + "Option", + "Some", + "None", + "Result", + "Ok", + "Err", + "String", + "ToString", + "Vec", + ] + .into_iter() + .collect::>(); + + prelude.contains(&ident_str.as_str()) +} + +impl<'ast> Visit<'ast> for FreeVariableVisitor { + fn visit_expr_closure(&mut self, i: &'ast syn::ExprClosure) { + self.current_scope.push(); + i.inputs.iter().for_each(|input| match input { + syn::Pat::Ident(pat_ident) => self.current_scope.insert_term(pat_ident.ident.clone()), + syn::Pat::Type(pat_type) => match pat_type.pat.as_ref() { + syn::Pat::Ident(pat_ident) => { + self.current_scope.insert_term(pat_ident.ident.clone()) + } + _ => panic!("Closure parameters must be identifiers"), + }, + _ => panic!("Closure parameters must be identifiers"), + }); + + syn::visit::visit_expr_closure(self, i); + + self.current_scope.pop(); + } + + fn visit_item_fn(&mut self, i: &'ast syn::ItemFn) { + self.current_scope.push(); + syn::visit::visit_item_fn(self, i); + self.current_scope.pop(); + } + + fn visit_generic_param(&mut self, i: &'ast GenericParam) { + match i { + syn::GenericParam::Type(type_param) => { + self.current_scope.insert_type(type_param.ident.clone()); + } + syn::GenericParam::Lifetime(lifetime_param) => { + self.current_scope + .insert_type(lifetime_param.lifetime.ident.clone()); + } + syn::GenericParam::Const(const_param) => { + self.current_scope.insert_type(const_param.ident.clone()); + } + } + } + + fn visit_block(&mut self, i: &'ast syn::Block) { + self.current_scope.push(); + syn::visit::visit_block(self, i); + self.current_scope.pop(); + } + + fn visit_local(&mut self, i: &'ast syn::Local) { + i.init.iter().for_each(|init| { + syn::visit::visit_local_init(self, init); + }); + + match &i.pat { + syn::Pat::Ident(pat_ident) => { + self.current_scope.insert_term(pat_ident.ident.clone()); + } + _ => panic!("Local variables must be identifiers"), + } + } + + fn visit_ident(&mut self, i: &'ast proc_macro2::Ident) { + if !self.current_scope.contains_term(i) { + self.free_variables.push(i.clone()); + } + } + + fn visit_lifetime(&mut self, i: &'ast syn::Lifetime) { + if !self.current_scope.contains_type(&i.ident) { + self.free_variables.push(i.ident.clone()); + } + } + + fn visit_expr_path(&mut self, i: &'ast syn::ExprPath) { + if i.path.leading_colon.is_none() && !is_prelude(&i.path.segments.first().unwrap().ident) { + let node = i.path.segments.first().unwrap(); + if i.path.segments.len() == 1 { + if !self.current_scope.contains_term(&node.ident) { + self.free_variables.push(node.ident.clone()); + } + } else if !self.current_scope.contains_type(&node.ident) { + self.free_variables.push(node.ident.clone()); + } + + self.visit_path_arguments(&node.arguments); + } + } + + fn visit_type_path(&mut self, i: &'ast syn::TypePath) { + if i.path.leading_colon.is_none() && !is_prelude(&i.path.segments.first().unwrap().ident) { + let node = i.path.segments.first().unwrap(); + if !self.current_scope.contains_type(&node.ident) { + self.free_variables.push(node.ident.clone()); + } + + self.visit_path_arguments(&node.arguments); + } + } + + fn visit_expr_method_call(&mut self, i: &'ast syn::ExprMethodCall) { + syn::visit::visit_expr(self, &i.receiver); + } + + fn visit_expr_struct(&mut self, node: &'ast syn::ExprStruct) { + for it in &node.attrs { + self.visit_attribute(it); + } + if let Some(it) = &node.qself { + self.visit_qself(it); + } + self.visit_path(&node.path); + for el in Punctuated::pairs(&node.fields) { + let it = el.value(); + self.visit_expr(&it.expr); + } + if let Some(it) = &node.rest { + self.visit_expr(it); + } + } + + fn visit_expr_field(&mut self, i: &'ast syn::ExprField) { + self.visit_expr(&i.base); + } + + fn visit_macro(&mut self, i: &'ast syn::Macro) { + // TODO(shadaj): emit a warning if our guess at parsing fails + match i.delimiter { + MacroDelimiter::Paren(_binding_0) => i + .parse_body_with( + syn::punctuated::Punctuated::::parse_terminated, + ) + .ok() + .iter() + .flatten() + .for_each(|expr| { + self.visit_expr(expr); + }), + MacroDelimiter::Brace(_binding_0) => i + .parse_body_with(syn::Block::parse_within) + .ok() + .iter() + .flatten() + .for_each(|stmt| { + self.visit_stmt(stmt); + }), + MacroDelimiter::Bracket(_binding_0) => i + .parse_body_with( + syn::punctuated::Punctuated::::parse_terminated, + ) + .ok() + .iter() + .flatten() + .for_each(|expr| { + self.visit_expr(expr); + }), + } + } +} + +#[proc_macro] +pub fn q(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let hydroflow_plus_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_plus_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let expr = syn::parse_macro_input!(input as syn::Expr); + let mut visitor = FreeVariableVisitor { + free_variables: Vec::new(), + current_scope: ScopeStack::new(), + }; + visitor.visit_expr(&expr); + + let free_variables = visitor.free_variables.iter().map(|i| { + let ident = i.clone(); + let ident_str = ident.to_string(); + quote!((#ident_str.to_string(), ::#root::quoting::ToFreeVariableTokens::to_tokens(&#ident))) + }); + + let cloned_free_variables = visitor.free_variables.iter().map(|i| { + let mut i_without_span = i.clone(); + i_without_span.set_span(Span::call_site()); + quote!( + #[allow(non_upper_case_globals, non_snake_case)] + let #i_without_span = #i_without_span; + ) + }); + + let unitialized_free_variables = visitor.free_variables.iter().map(|i| { + let mut i_without_span = i.clone(); + i_without_span.set_span(Span::call_site()); + quote!( + #[allow(unused, non_upper_case_globals, non_snake_case)] + let #i = ::#root::quoting::FreeVariable::uninitialized(#i_without_span) + ) + }); + + let free_variables_vec = quote!(vec![#(#free_variables),*]); + + let expr_string = expr.clone().into_token_stream().to_string(); + proc_macro::TokenStream::from(quote!({ + #(#cloned_free_variables;)* + move |set_str: &mut String, set_vec: &mut Vec<(String, (Option<#root::internal::TokenStream>, Option<#root::internal::TokenStream>))>, run: bool| { + *set_str = #expr_string.to_string(); + *set_vec = #free_variables_vec; + + if !run { + unsafe { + return ::std::mem::MaybeUninit::uninit().assume_init(); + } + } + + #[allow(unreachable_code)] + { + #(#unitialized_free_variables;)* + #expr + } + } + })) +} + +fn gen_use_paths( + root: TokenStream, + is_rooted: bool, + mut prefix: Vec, + tree: &syn::UseTree, + global: bool, + into: &mut Vec, +) { + match &tree { + syn::UseTree::Path(path) => { + prefix.push(path.ident.clone()); + gen_use_paths(root, is_rooted, prefix, &path.tree, global, into); + } + syn::UseTree::Name(name) => { + let name_ident = name.ident.clone(); + let mut name_ident_unspanned = name_ident.clone(); + name_ident_unspanned.set_span(Span::call_site()); + let prefix_unspanned = prefix + .iter() + .map(|i| { + let mut i = i.clone(); + i.set_span(Span::call_site()); + i + }) + .collect::>(); + + if is_rooted { + let full_path = quote!(#(#prefix::)*#name_ident).to_string(); + + if global { + into.push(quote! { + use ::#(#prefix_unspanned::)*#name_ident_unspanned; + #[allow(non_upper_case_globals, non_snake_case)] + fn #name_ident() -> #root::quoting::Import<(), u32> { + #root::quoting::create_import( + #full_path, + { + let __quse_local = (); + { + use ::#(#prefix::)*#name_ident as __quse_local; + __quse_local + } + } + ) + } + }); + } else { + into.push(quote! { + use ::#(#prefix_unspanned::)*#name_ident_unspanned; + #[allow(non_upper_case_globals, non_snake_case)] + let #name_ident = #root::quoting::create_import( + #full_path, + { + let __quse_local = (); + { + use ::#(#prefix::)*#name_ident as __quse_local; + __quse_local + } + } + ); + }); + } + } else if !prefix.is_empty() { + let first = prefix.first().unwrap(); + let prefix_suffix = prefix.iter().skip(1); + let suffix_full_path = quote!(#(#prefix_suffix::)*#name_ident).to_string(); + + if global { + panic!(); + } else { + into.push(quote! { + use #(#prefix_unspanned::)*#name_ident_unspanned; + #[allow(non_upper_case_globals, non_snake_case)] + let #name_ident = #first.extend( + #suffix_full_path, + { + let __quse_local = (); + { + use #(#prefix::)*#name_ident as __quse_local; + __quse_local + } + } + ); + }); + } + } else if global { + panic!(); + } else { + into.push(quote! { + #[allow(non_upper_case_globals, non_snake_case)] + let #name_ident = #root::quoting::Import::clone(&#name_ident); + }); + } + } + _ => todo!(), + } +} + +#[proc_macro] +pub fn quse(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let hydroflow_plus_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_plus_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let input_tokens = proc_macro2::TokenStream::from(input); + let import: syn::ItemUse = syn::parse_quote!(use #input_tokens;); + let mut all_paths_emitted = vec![]; + gen_use_paths( + root, + import.leading_colon.is_some(), + vec![], + &import.tree, + false, + &mut all_paths_emitted, + ); + + quote! { + #(#all_paths_emitted;)* + } + .into() +} + +#[proc_macro] +pub fn quse_type(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let hydroflow_plus_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_plus_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let input_tokens = proc_macro2::TokenStream::from(input); + let import: syn::ItemUse = syn::parse_quote!(use #input_tokens;); + let mut all_paths_emitted = vec![]; + gen_use_paths( + root, + import.leading_colon.is_some(), + vec![], + &import.tree, + true, + &mut all_paths_emitted, + ); + + quote! { + #(#all_paths_emitted)* + } + .into() +} + +#[proc_macro] +pub fn qtype(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let hydroflow_plus_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_plus_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let defn: syn::Item = parse_macro_input!(input as syn::Item); + let name = match &defn { + syn::Item::Struct(s) => &s.ident, + syn::Item::Enum(e) => &e.ident, + _ => panic!("qtype must be used on a struct or enum"), + }; + + let definition_string = defn.to_token_stream().to_string(); + + quote! { + #defn + + #[allow(non_upper_case_globals, non_snake_case)] + fn #name() -> #root::quoting::Type { + #root::quoting::Type::new(#definition_string) + } + } + .into() +} + +#[proc_macro_attribute] +pub fn flow( + attr: proc_macro::TokenStream, + input: proc_macro::TokenStream, +) -> proc_macro::TokenStream { + let hydroflow_plus_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_plus_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let attr_params = + syn::parse_macro_input!(attr with Punctuated::parse_terminated); + + let input = syn::parse_macro_input!(input as syn::ItemFn); + let input_name = &input.sig.ident; + + let input_generics = &input.sig.generics; + + let mut runtime_data_params = Vec::new(); + let mut runtime_data_args = Vec::new(); + + let param_parsing = input.sig.inputs.iter().skip(1).enumerate().flat_map(|(i, input)| { + match input { + syn::FnArg::Receiver(_) => panic!("Flow functions cannot take self"), + syn::FnArg::Typed(pat_type) => { + let runtime_tpe = match pat_type.ty.as_ref() { + Type::Path(path) => { + if path.path.segments.len() == 1 && path.path.segments[0].ident == "RuntimeData" { + match &path.path.segments[0].arguments { + PathArguments::AngleBracketed(AngleBracketedGenericArguments { + args, + .. + }) => Some(args[0].clone()), + _ => None, + } + } else { + None + } + } + _ => None, + }; + + let pat = pat_type.pat.clone(); + let ty = pat_type.ty.clone(); + + if let Some(runtime_tpe) = runtime_tpe { + let mut visitor = FreeVariableVisitor { + free_variables: Vec::new(), + current_scope: ScopeStack::new(), + }; + + visitor.current_scope.insert_type(syn::Ident::new("RuntimeData", Span::call_site())); + + visitor.visit_generics(input_generics); + visitor.visit_generic_argument(&runtime_tpe); + + let mut out = vec![]; + + visitor.free_variables.iter().for_each(|i| { + out.push(quote! { + if let Some(prelude) = ::#root::quoting::ToFreeVariableTokens::to_tokens(&#i).0 { + runtime_data_prelude.push(prelude); + } + }); + }); + + runtime_data_params.push(quote! { + #pat: #runtime_tpe + }); + runtime_data_args.push(quote! { + ##pat + }); + + out.push(quote_spanned! {input.span()=> + let #pat: &#root::internal::syn::Expr = &input_parsed[#i]; + }); + + out + } else { + vec![quote_spanned! {input.span()=> + let #pat: #ty = #root::quoting::ParseFromLiteral::parse_from_literal(&input_parsed[#i]); + }] + } + } + } + }); + + let params_to_pass = input.sig.inputs.iter().skip(1).map(|input| match input { + syn::FnArg::Receiver(_) => panic!("Flow functions cannot take self"), + syn::FnArg::Typed(pat_type) => { + let is_runtime = match pat_type.ty.as_ref() { + Type::Path(path) => { + path.path.segments.len() == 1 && path.path.segments[0].ident == "RuntimeData" + } + _ => false, + }; + + if is_runtime { + let pat_ident = match pat_type.pat.as_ref() { + syn::Pat::Ident(pat_ident) => pat_ident, + _ => panic!("RuntimeData must be an identifier"), + }; + let pat_str = pat_ident.ident.to_string(); + quote!(#root::quoting::RuntimeData::new(#pat_str)) + } else { + let pat = pat_type.pat.clone(); + quote!(#pat) + } + } + }); + + let expected_arg_count = input.sig.inputs.len() - 1; + + let pound = Punct::new('#', Spacing::Alone); + let passed_generics = if attr_params.is_empty() { + quote!() + } else { + quote!(::<#attr_params>) + }; + + let first_generic = input_generics + .params + .iter() + .find(|g| matches!(g, GenericParam::Lifetime(_))) + .cloned(); + + let hf_generics = first_generic.map(|g| quote!(<#g>)); + + proc_macro::TokenStream::from(quote_spanned! {input.span()=> + #[proc_macro] + pub fn #input_name(input: ::proc_macro::TokenStream) -> ::proc_macro::TokenStream { + #[allow(unused)] + let input_parsed = #root::internal::syn::parse::Parser::parse( + #root::internal::syn::punctuated::Punctuated::<#root::internal::syn::Expr, #root::internal::syn::Token![,]>::parse_terminated, + input + ).unwrap(); + + if input_parsed.len() != #expected_arg_count { + panic!("Expected {} arguments, got {}", #expected_arg_count, input_parsed.len()); + } + + #[allow(unused_mut)] + let mut runtime_data_prelude = ::std::vec::Vec::<#root::internal::TokenStream>::new(); + + #(#param_parsing)* + + #input + let dataflow_core = #root::hydroflow_build(|| { + #input_name #passed_generics(&#root::HfGraph::new(), #(#params_to_pass),*); + }); + + ::proc_macro::TokenStream::from(::#root::internal::quote!({ + #pound (#pound runtime_data_prelude)* + fn create_flow #input_generics( + #(#runtime_data_params),* + ) -> #root::hydroflow::scheduled::graph::Hydroflow #hf_generics { + #pound dataflow_core + } + create_flow(#(#runtime_data_args),*) + })) + } + }) +} diff --git a/website_playground/src/lib.rs b/website_playground/src/lib.rs index bbf61f43e9d6..d83c4ca311a8 100644 --- a/website_playground/src/lib.rs +++ b/website_playground/src/lib.rs @@ -197,13 +197,13 @@ pub fn compile_datalog(program: String) -> JsValue { serde_wasm_bindgen::to_value(&out).unwrap() } -struct HydroflowInstance { - hydroflow: Hydroflow, +struct HydroflowInstance<'a, In, Out> { + hydroflow: Hydroflow<'a>, input: tokio::sync::mpsc::UnboundedSender, output: tokio::sync::mpsc::UnboundedReceiver, } -type DatalogBooleanDemoInstance = HydroflowInstance<(i32,), (i32,)>; +type DatalogBooleanDemoInstance = HydroflowInstance<'static, (i32,), (i32,)>; thread_local! { static DATALOG_BOOLEAN_DEMO_INSTANCES: RefCell> =