From c916a8311a0c8d102d0bb9cf01719c8e19f4396c Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Tue, 10 Oct 2023 23:24:42 -0700 Subject: [PATCH] feat(hydroflow): prototype a functional surface syntax using staging --- .vscode/settings.json | 5 +- Cargo.lock | 173 +++++++- Cargo.toml | 8 + hydroflow/examples/rga/adjacency.rs | 2 +- hydroflow/examples/rga/datalog.rs | 2 +- hydroflow/examples/rga/datalog_agg.rs | 2 +- hydroflow/examples/rga/minimal.rs | 2 +- hydroflow/examples/shopping/flows/bp_flow.rs | 2 +- .../shopping/flows/client_state_flow.rs | 2 +- .../examples/shopping/flows/listener_flow.rs | 2 +- .../examples/shopping/flows/orig_flow.rs | 2 +- .../shopping/flows/push_group_flow.rs | 2 +- .../shopping/flows/rep_server_flow.rs | 2 +- .../shopping/flows/server_state_flow.rs | 2 +- .../examples/shopping/flows/ssiv_flow.rs | 2 +- hydroflow/src/scheduled/graph.rs | 22 +- hydroflow/src/scheduled/graph_ext.rs | 2 +- hydroflow/src/scheduled/net/mod.rs | 2 +- hydroflow/src/scheduled/net/network_vertex.rs | 2 +- hydroflow/src/scheduled/query.rs | 26 +- hydroflow/src/util/cli.rs | 2 +- .../surface_source_interval_badarg.stderr | 2 +- hydroflow_plus/Cargo.toml | 24 + hydroflow_plus/src/lib.rs | 320 ++++++++++++++ hydroflow_plus_kvs/Cargo.toml | 22 + hydroflow_plus_kvs/build.rs | 5 + hydroflow_plus_kvs_flow/Cargo.toml | 14 + hydroflow_plus_kvs_flow/src/lib.rs | 62 +++ hydroflow_plus_kvs_macro/Cargo.toml | 19 + hydroflow_plus_kvs_macro/build.rs | 8 + hydroflow_plus_kvs_macro/src/lib.rs | 6 + hydroflow_plus_kvs_server/Cargo.toml | 11 + hydroflow_plus_kvs_server/src/main.rs | 13 + stageleft/Cargo.toml | 18 + stageleft/src/lib.rs | 146 +++++++ stageleft/src/runtime_support.rs | 112 +++++ stageleft_macro/Cargo.toml | 20 + stageleft_macro/src/free_variable/mod.rs | 231 ++++++++++ stageleft_macro/src/free_variable/prelude.rs | 72 +++ stageleft_macro/src/lib.rs | 412 ++++++++++++++++++ stageleft_tool/Cargo.toml | 20 + stageleft_tool/src/lib.rs | 202 +++++++++ topolotree/.gitignore | 2 + website_playground/src/lib.rs | 6 +- 44 files changed, 1953 insertions(+), 60 deletions(-) create mode 100644 hydroflow_plus/Cargo.toml create mode 100644 hydroflow_plus/src/lib.rs create mode 100644 hydroflow_plus_kvs/Cargo.toml create mode 100644 hydroflow_plus_kvs/build.rs create mode 100644 hydroflow_plus_kvs_flow/Cargo.toml create mode 100644 hydroflow_plus_kvs_flow/src/lib.rs create mode 100644 hydroflow_plus_kvs_macro/Cargo.toml create mode 100644 hydroflow_plus_kvs_macro/build.rs create mode 100644 hydroflow_plus_kvs_macro/src/lib.rs create mode 100644 hydroflow_plus_kvs_server/Cargo.toml create mode 100644 hydroflow_plus_kvs_server/src/main.rs create mode 100644 stageleft/Cargo.toml create mode 100644 stageleft/src/lib.rs create mode 100644 stageleft/src/runtime_support.rs create mode 100644 stageleft_macro/Cargo.toml create mode 100644 stageleft_macro/src/free_variable/mod.rs create mode 100644 stageleft_macro/src/free_variable/prelude.rs create mode 100644 stageleft_macro/src/lib.rs create mode 100644 stageleft_tool/Cargo.toml create mode 100644 stageleft_tool/src/lib.rs create mode 100644 topolotree/.gitignore diff --git a/.vscode/settings.json b/.vscode/settings.json index 09bd82cadeee..d462ae195686 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,5 +8,8 @@ "INSTA_FORCE_PASS": "1" } } - ] + ], + "files.watcherExclude": { + "**/target": true + } } diff --git a/Cargo.lock b/Cargo.lock index d6df5a5689a0..73a46a883bd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -149,7 +149,7 @@ dependencies = [ "polling", "rustix", "slab", - "socket2", + "socket2 0.4.9", "waker-fn", ] @@ -907,9 +907,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.10.6" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", @@ -1242,6 +1242,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "home" version = "0.5.5" @@ -1464,6 +1470,63 @@ dependencies = [ "syn 2.0.14", ] +[[package]] +name = "hydroflow_plus" +version = "0.5.0" +dependencies = [ + "hydroflow", + "hydroflow_lang", + "proc-macro-crate", + "proc-macro2", + "quote", + "stageleft", + "syn 2.0.14", +] + +[[package]] +name = "hydroflow_plus_kvs" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "hydroflow_plus_kvs_macro", + "regex", + "serde", + "stageleft", + "stageleft_tool", +] + +[[package]] +name = "hydroflow_plus_kvs_flow" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "regex", + "serde", + "stageleft", +] + +[[package]] +name = "hydroflow_plus_kvs_macro" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "hydroflow_plus_kvs_flow", + "regex", + "serde", + "stageleft", + "stageleft_tool", +] + +[[package]] +name = "hydroflow_plus_kvs_runtime" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "hydroflow_plus_kvs", + "regex", + "serde", +] + [[package]] name = "iana-time-zone" version = "0.1.56" @@ -1647,9 +1710,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.141" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libloading" @@ -2058,9 +2121,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "pin-utils" @@ -2553,7 +2616,7 @@ dependencies = [ "serde", "serde_json", "syn 1.0.109", - "syn-inline-mod", + "syn-inline-mod 0.5.0", "tempfile", "tree-sitter", "tree-sitter-cli", @@ -2723,6 +2786,30 @@ dependencies = [ "digest", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest", +] + +[[package]] +name = "sha256" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7895c8ae88588ccead14ff438b939b0c569cd619116f14b4d13fdff7b8333386" +dependencies = [ + "async-trait", + "bytes", + "hex", + "sha2", + "tokio", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -2804,6 +2891,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "socket2" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ssh2" version = "0.9.3" @@ -2816,6 +2913,42 @@ dependencies = [ "parking_lot 0.11.2", ] +[[package]] +name = "stageleft" +version = "0.5.0" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "stageleft_macro", + "syn 2.0.14", +] + +[[package]] +name = "stageleft_macro" +version = "0.5.0" +dependencies = [ + "lazy_static", + "proc-macro-crate", + "proc-macro2", + "quote", + "sha256", + "syn 2.0.14", +] + +[[package]] +name = "stageleft_tool" +version = "0.5.0" +dependencies = [ + "lazy_static", + "proc-macro-crate", + "proc-macro2", + "quote", + "sha256", + "syn 2.0.14", + "syn-inline-mod 0.6.0", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -2866,6 +2999,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "syn-inline-mod" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fa6dca1fdb7b2ed46dd534a326725419d4fb10f23d8c85a8b2860e5eb25d0f9" +dependencies = [ + "proc-macro2", + "syn 2.0.14", +] + [[package]] name = "synstructure" version = "0.12.6" @@ -3062,11 +3205,11 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.27.0" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ - "autocfg", + "backtrace", "bytes", "libc", "mio", @@ -3074,16 +3217,16 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.4", "tokio-macros", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] name = "tokio-macros" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 0929d37cd473..04b5b07f87be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,10 +18,18 @@ members = [ "hydroflow_datalog_core", "hydroflow_lang", "hydroflow_macro", + "hydroflow_plus", + "hydroflow_plus_kvs_flow", + "hydroflow_plus_kvs_macro", + "hydroflow_plus_kvs", + "hydroflow_plus_kvs_server", "lattices", "multiplatform_test", "pusherator", "relalg", + "stageleft", + "stageleft_macro", + "stageleft_tool", "variadics", "website_playground", ] diff --git a/hydroflow/examples/rga/adjacency.rs b/hydroflow/examples/rga/adjacency.rs index 546e0b524ce5..5f8d0e74a4ae 100644 --- a/hydroflow/examples/rga/adjacency.rs +++ b/hydroflow/examples/rga/adjacency.rs @@ -11,7 +11,7 @@ pub(crate) fn rga_adjacency( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { insertAfter = source_stream(input_recv) -> tee(); diff --git a/hydroflow/examples/rga/datalog.rs b/hydroflow/examples/rga/datalog.rs index 362d9b7ccc79..787a6df93f1a 100644 --- a/hydroflow/examples/rga/datalog.rs +++ b/hydroflow/examples/rga/datalog.rs @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { edges = source_stream(input_recv) -> tee(); insertAfter = edges -> map(|(c, p): (Token, Timestamp) | (c.ts, p)) -> tee(); diff --git a/hydroflow/examples/rga/datalog_agg.rs b/hydroflow/examples/rga/datalog_agg.rs index 4739012c08c8..6b7109e03deb 100644 --- a/hydroflow/examples/rga/datalog_agg.rs +++ b/hydroflow/examples/rga/datalog_agg.rs @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog_agg( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { edges = source_stream(input_recv) -> tee(); insertAfter = edges -> map(|(c, p): (Token, Timestamp)| (c.ts, p)) -> tee(); diff --git a/hydroflow/examples/rga/minimal.rs b/hydroflow/examples/rga/minimal.rs index e78032a1bfff..23b3583fa5c1 100644 --- a/hydroflow/examples/rga/minimal.rs +++ b/hydroflow/examples/rga/minimal.rs @@ -9,7 +9,7 @@ pub(crate) fn rga_minimal( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, _list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { insertAfter = source_stream(input_recv); diff --git a/hydroflow/examples/shopping/flows/bp_flow.rs b/hydroflow/examples/shopping/flows/bp_flow.rs index 93e8e8a6f08f..960dfffca803 100644 --- a/hydroflow/examples/shopping/flows/bp_flow.rs +++ b/hydroflow/examples/shopping/flows/bp_flow.rs @@ -16,7 +16,7 @@ pub(crate) async fn bp_flow( shopping_bp: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/client_state_flow.rs b/hydroflow/examples/shopping/flows/client_state_flow.rs index 0d12ed44ed49..8d4cdb511b91 100644 --- a/hydroflow/examples/shopping/flows/client_state_flow.rs +++ b/hydroflow/examples/shopping/flows/client_state_flow.rs @@ -18,7 +18,7 @@ pub(crate) async fn client_state_flow( out: SplitSink, (Bytes, SocketAddr)>, local_addr: SocketAddr, remote_addr: SocketAddr, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/listener_flow.rs b/hydroflow/examples/shopping/flows/listener_flow.rs index ff4114bbee64..0ab38af0b8cc 100644 --- a/hydroflow/examples/shopping/flows/listener_flow.rs +++ b/hydroflow/examples/shopping/flows/listener_flow.rs @@ -11,7 +11,7 @@ pub(crate) async fn listener_flow( tuple_input: UdpStream, bp_input: UdpStream, ssiv_input: UdpStream, -) -> Hydroflow { +) -> Hydroflow<'static> { // Simply print what we receive. hydroflow_syntax! { source_stream_serde(tuple_input) diff --git a/hydroflow/examples/shopping/flows/orig_flow.rs b/hydroflow/examples/shopping/flows/orig_flow.rs index d5803399619a..c92f080c8138 100644 --- a/hydroflow/examples/shopping/flows/orig_flow.rs +++ b/hydroflow/examples/shopping/flows/orig_flow.rs @@ -14,7 +14,7 @@ pub(crate) async fn orig_flow( shopping: impl Iterator + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // This is the straightforward single-transducer sequential case. diff --git a/hydroflow/examples/shopping/flows/push_group_flow.rs b/hydroflow/examples/shopping/flows/push_group_flow.rs index 4f22e3a0c96c..210f567a8ddc 100644 --- a/hydroflow/examples/shopping/flows/push_group_flow.rs +++ b/hydroflow/examples/shopping/flows/push_group_flow.rs @@ -16,7 +16,7 @@ pub(crate) async fn push_group_flow( shopping_ssiv: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/rep_server_flow.rs b/hydroflow/examples/shopping/flows/rep_server_flow.rs index 26b8e446e11d..8b62825b7a6b 100644 --- a/hydroflow/examples/shopping/flows/rep_server_flow.rs +++ b/hydroflow/examples/shopping/flows/rep_server_flow.rs @@ -20,7 +20,7 @@ pub(crate) async fn rep_server_flow( remote_addr: SocketAddr, gossip_addr: SocketAddr, server_addrs: impl Iterator + 'static, -) -> Hydroflow { +) -> Hydroflow<'static> { let (broadcast_out, broadcast_in, _) = hydroflow::util::bind_udp_bytes(gossip_addr).await; let client_class = client_class_iter(); let ssiv_merge = diff --git a/hydroflow/examples/shopping/flows/server_state_flow.rs b/hydroflow/examples/shopping/flows/server_state_flow.rs index d20416b654de..9b1bcde86405 100644 --- a/hydroflow/examples/shopping/flows/server_state_flow.rs +++ b/hydroflow/examples/shopping/flows/server_state_flow.rs @@ -18,7 +18,7 @@ pub(crate) async fn server_state_flow( out: SplitSink, (Bytes, SocketAddr)>, local_addr: SocketAddr, remote_addr: SocketAddr, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/ssiv_flow.rs b/hydroflow/examples/shopping/flows/ssiv_flow.rs index 154d224c2cc2..34ddfefc5163 100644 --- a/hydroflow/examples/shopping/flows/ssiv_flow.rs +++ b/hydroflow/examples/shopping/flows/ssiv_flow.rs @@ -16,7 +16,7 @@ pub(crate) async fn ssiv_flow( shopping_ssiv: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 47afe68eb0db..e4745c1a1951 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -24,8 +24,8 @@ use super::{HandoffId, SubgraphId}; use crate::Never; /// A Hydroflow graph. Owns, schedules, and runs the compiled subgraphs. -pub struct Hydroflow { - pub(super) subgraphs: Vec, +pub struct Hydroflow<'a> { + pub(super) subgraphs: Vec>, pub(super) context: Context, handoffs: Vec, @@ -44,7 +44,7 @@ pub struct Hydroflow { /// See [`Self::diagnostics()`]. diagnostics: Option>>, } -impl Default for Hydroflow { +impl<'a> Default for Hydroflow<'a> { fn default() -> Self { let stratum_queues = vec![Default::default()]; // Always initialize stratum #0. let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel(); @@ -78,7 +78,7 @@ impl Default for Hydroflow { } } } -impl Hydroflow { +impl<'a> Hydroflow<'a> { /// Create a new empty Hydroflow graph. pub fn new() -> Self { Default::default() @@ -507,7 +507,7 @@ impl Hydroflow { Name: Into>, R: 'static + PortList, W: 'static + PortList, - F: 'static + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>), + F: 'a + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>), { let sg_id = SubgraphId(self.subgraphs.len()); @@ -675,7 +675,7 @@ impl Hydroflow { } } -impl Hydroflow { +impl<'a> Hydroflow<'a> { /// Alias for [`Context::spawn_task`]. pub fn spawn_task(&mut self, future: Fut) where @@ -695,7 +695,7 @@ impl Hydroflow { } } -impl Drop for Hydroflow { +impl<'a> Drop for Hydroflow<'a> { fn drop(&mut self) { self.abort_tasks(); } @@ -740,14 +740,14 @@ impl HandoffData { /// /// Used internally by the [Hydroflow] struct to represent the dataflow graph /// structure and scheduled state. -pub(super) struct SubgraphData { +pub(super) struct SubgraphData<'a> { /// A friendly name for diagnostics. #[allow(dead_code)] // TODO(mingwei): remove attr once used. pub(super) name: Cow<'static, str>, /// This subgraph's stratum number. pub(super) stratum: usize, /// The actual execution code of the subgraph. - subgraph: Box, + subgraph: Box, #[allow(dead_code)] preds: Vec, succs: Vec, @@ -761,11 +761,11 @@ pub(super) struct SubgraphData { /// Keep track of the last tick that this subgraph was run in last_tick_run_in: Option, } -impl SubgraphData { +impl<'a> SubgraphData<'a> { pub fn new( name: Cow<'static, str>, stratum: usize, - subgraph: impl 'static + Subgraph, + subgraph: impl Subgraph + 'a, preds: Vec, succs: Vec, is_scheduled: bool, diff --git a/hydroflow/src/scheduled/graph_ext.rs b/hydroflow/src/scheduled/graph_ext.rs index db8b48726e0e..f1245516a153 100644 --- a/hydroflow/src/scheduled/graph_ext.rs +++ b/hydroflow/src/scheduled/graph_ext.rs @@ -125,7 +125,7 @@ pub trait GraphExt { W: 'static + Handoff + CanReceive; } -impl GraphExt for Hydroflow { +impl<'a> GraphExt for Hydroflow<'a> { subgraph_ext!(impl add_subgraph_sink, (recv_port: R), ()); subgraph_ext!( impl add_subgraph_2sink, diff --git a/hydroflow/src/scheduled/net/mod.rs b/hydroflow/src/scheduled/net/mod.rs index ae1fbde936fc..07309c81446f 100644 --- a/hydroflow/src/scheduled/net/mod.rs +++ b/hydroflow/src/scheduled/net/mod.rs @@ -96,7 +96,7 @@ impl Message { } } -impl Hydroflow { +impl<'a> Hydroflow<'a> { fn register_read_tcp_stream(&mut self, reader: OwnedReadHalf) -> RecvPort> { let reader = FramedRead::new(reader, LengthDelimitedCodec::new()); let (send_port, recv_port) = self.make_edge("tcp ingress handoff"); diff --git a/hydroflow/src/scheduled/net/network_vertex.rs b/hydroflow/src/scheduled/net/network_vertex.rs index 508437a2df51..13077e6290e8 100644 --- a/hydroflow/src/scheduled/net/network_vertex.rs +++ b/hydroflow/src/scheduled/net/network_vertex.rs @@ -18,7 +18,7 @@ pub type Address = String; // These methods can't be wrapped up in a trait because async methods are not // allowed in traits (yet). -impl Hydroflow { +impl<'a> Hydroflow<'a> { // TODO(justin): document these, but they're derivatives of inbound_tcp_vertex_internal. pub async fn inbound_tcp_vertex_port(&mut self, port: u16) -> RecvPort> where diff --git a/hydroflow/src/scheduled/query.rs b/hydroflow/src/scheduled/query.rs index 04da7d34704a..af9b0d6cc4c0 100644 --- a/hydroflow/src/scheduled/query.rs +++ b/hydroflow/src/scheduled/query.rs @@ -15,16 +15,16 @@ use crate::scheduled::handoff::VecHandoff; const QUERY_EDGE_NAME: Cow<'static, str> = Cow::Borrowed("query handoff"); #[derive(Default)] -pub struct Query { - df: Rc>, +pub struct Query<'a> { + df: Rc>>, } -impl Query { +impl<'a> Query<'a> { pub fn new() -> Self { Default::default() } - pub fn source(&mut self, f: F) -> Operator + pub fn source(&mut self, f: F) -> Operator<'a, T> where T: 'static, F: 'static + FnMut(&Context, &SendCtx>), @@ -40,7 +40,7 @@ impl Query { } } - pub fn concat(&mut self, ops: Vec>) -> Operator + pub fn concat(&mut self, ops: Vec>) -> Operator<'a, T> where T: 'static, { @@ -69,19 +69,19 @@ impl Query { } } -pub struct Operator +pub struct Operator<'a, T> where T: 'static, { - df: Rc>, + df: Rc>>, recv_port: RecvPort>, } -impl Operator +impl<'a, T> Operator<'a, T> where T: 'static, { - pub fn map(self, mut f: F) -> Operator + pub fn map(self, mut f: F) -> Operator<'a, U> where F: 'static + Fn(T) -> U, U: 'static, @@ -101,7 +101,7 @@ where } #[must_use] - pub fn filter(self, mut f: F) -> Operator + pub fn filter(self, mut f: F) -> Operator<'a, T> where F: 'static + Fn(&T) -> bool, { @@ -125,7 +125,7 @@ where } #[must_use] - pub fn concat(self, other: Operator) -> Operator { + pub fn concat(self, other: Operator<'a, T>) -> Operator<'a, T> { // TODO(justin): this is very slow. let mut df = self.df.borrow_mut(); @@ -163,8 +163,8 @@ where } } -impl Operator { - pub fn tee(self, n: usize) -> Vec> +impl<'a, T: Clone> Operator<'a, T> { + pub fn tee(self, n: usize) -> Vec> where T: 'static, { diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index 90928acd8a00..c5f359fa0af6 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -6,7 +6,7 @@ pub use hydroflow_cli_integration::*; use crate::scheduled::graph::Hydroflow; -pub async fn launch_flow(mut flow: Hydroflow) { +pub async fn launch_flow(mut flow: Hydroflow<'_>) { let stop = tokio::sync::oneshot::channel(); tokio::task::spawn_blocking(|| { let mut line = String::new(); diff --git a/hydroflow/tests/compile-fail/surface_source_interval_badarg.stderr b/hydroflow/tests/compile-fail/surface_source_interval_badarg.stderr index 1a767a402930..08a9d452bd3d 100644 --- a/hydroflow/tests/compile-fail/surface_source_interval_badarg.stderr +++ b/hydroflow/tests/compile-fail/surface_source_interval_badarg.stderr @@ -10,7 +10,7 @@ error[E0308]: mismatched types | |_____- arguments to this function are incorrect | note: function defined here - --> $CARGO/tokio-1.27.0/src/time/interval.rs + --> $CARGO/tokio-1.32.0/src/time/interval.rs | | pub fn interval(period: Duration) -> Interval { | ^^^^^^^^ diff --git a/hydroflow_plus/Cargo.toml b/hydroflow_plus/Cargo.toml new file mode 100644 index 000000000000..98bcd9b5c3d9 --- /dev/null +++ b/hydroflow_plus/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "hydroflow_plus" +publish = true +version = "0.5.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/hydroflow_plus/" +description = "Functional programming API for hydroflow" + +[lib] +path = "src/lib.rs" + +[features] +default = [] +diagnostics = [ "hydroflow_lang/diagnostics" ] + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +hydroflow = { path = "../hydroflow", version = "^0.5.0" } +hydroflow_lang = { path = "../hydroflow_lang", version = "^0.5.0" } +stageleft = { path = "../stageleft", version = "^0.5.0" } diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs new file mode 100644 index 000000000000..5483bbdb38b1 --- /dev/null +++ b/hydroflow_plus/src/lib.rs @@ -0,0 +1,320 @@ +use std::cell::RefCell; +use std::marker::PhantomData; + +pub use hydroflow; +use hydroflow::futures::stream::Stream; +use hydroflow::scheduled::context::Context; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow_lang::graph::{partition_graph, propegate_flow_props, FlatGraphBuilder}; +use proc_macro2::{Span, TokenStream}; +use quote::quote; +use stageleft::runtime_support::FreeVariable; +use stageleft::{IntoQuotedMut, IntoQuotedOnce, Quoted, QuotedContext}; +use syn::parse_quote; + +#[derive(Clone)] +pub struct RuntimeContext<'a> { + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl Copy for RuntimeContext<'_> {} + +impl<'a> FreeVariable<&'a Context> for RuntimeContext<'a> { + fn to_tokens(&self) -> (Option, Option) { + (None, Some(quote!(&context))) + } +} + +pub struct HfBuilt<'a> { + tokens: TokenStream, + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl<'a> Quoted> for HfBuilt<'a> { + fn splice(self) -> TokenStream { + self.tokens + } +} + +pub struct HfBuilder<'a> { + next_id: RefCell, + pub(crate) builder: RefCell>, + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl<'a> QuotedContext for HfBuilder<'a> { + fn create() -> Self { + HfBuilder::new() + } +} + +impl<'a> HfBuilder<'a> { + #[allow(clippy::new_without_default)] + pub fn new() -> HfBuilder<'a> { + HfBuilder { + next_id: RefCell::new(0), + builder: RefCell::new(Some(FlatGraphBuilder::new())), + _phantom: PhantomData, + } + } + + pub fn build(&self) -> HfBuilt<'a> { + let builder = self.builder.borrow_mut().take().unwrap(); + + let (flat_graph, _, _) = builder.build(); + let mut partitioned_graph = + partition_graph(flat_graph).expect("Failed to partition (cycle detected)."); + + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus::hydroflow }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident::hydroflow } + } + }; + + let mut diagnostics = Vec::new(); + // Propgeate flow properties throughout the graph. + // TODO(mingwei): Should this be done at a flat graph stage instead? + let _ = + propegate_flow_props::propegate_flow_props(&mut partitioned_graph, &mut diagnostics); + + let tokens = partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics); + + HfBuilt { + tokens, + _phantom: PhantomData, + } + } + + pub fn runtime_context(&self) -> RuntimeContext<'a> { + RuntimeContext { + _phantom: PhantomData, + } + } + + pub fn source_stream + Unpin>( + &'a self, + e: impl IntoQuotedOnce<'a, E>, + ) -> HfStream<'a, T> { + let next_id = { + let mut next_id = self.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("source_{}", next_id), Span::call_site()); + let e = e.splice(); + + self.builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = source_stream(#e) -> tee(); + }); + + HfStream { + ident, + graph: self, + _phantom: PhantomData, + } + } + + pub fn source_iter>( + &'a self, + e: impl IntoQuotedOnce<'a, E>, + ) -> HfStream<'a, T> { + let next_id = { + let mut next_id = self.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("source_{}", next_id), Span::call_site()); + let e = e.splice(); + + self.builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = source_iter(#e) -> tee(); + }); + + HfStream { + ident, + graph: self, + _phantom: PhantomData, + } + } +} + +pub struct HfStream<'a, T> { + ident: syn::Ident, + graph: &'a HfBuilder<'a>, + _phantom: PhantomData<&'a mut &'a T>, +} + +impl<'a, T> HfStream<'a, T> { + pub fn map U + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, U> { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("map_{}", next_id), Span::call_site()); + let f = f.splice(); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> map(#f) -> tee(); + }); + + HfStream { + ident, + graph: self.graph, + _phantom: PhantomData, + } + } + + pub fn filter bool + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, T> { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("filter_{}", next_id), Span::call_site()); + let f = f.splice(); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> filter(#f) -> tee(); + }); + + HfStream { + ident, + graph: self.graph, + _phantom: PhantomData, + } + } + + pub fn filter_map Option + 'a>( + &self, + f: impl IntoQuotedMut<'a, F>, + ) -> HfStream<'a, U> { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("filter_{}", next_id), Span::call_site()); + let f = f.splice(); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> filter_map(#f) -> tee(); + }); + + HfStream { + ident, + graph: self.graph, + _phantom: PhantomData, + } + } + + pub fn for_each(&self, f: impl IntoQuotedMut<'a, F>) { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("for_each_{}", next_id), Span::call_site()); + let f = f.splice(); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> for_each(#f); + }); + } +} + +impl<'a, K, V1> HfStream<'a, (K, V1)> { + pub fn join(&'a self, n: &HfStream<(K, V2)>) -> HfStream<(K, (V1, V2))> { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let other_ident = &n.ident; + let ident = syn::Ident::new(&format!("for_each_{}", next_id), Span::call_site()); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = join() -> tee(); + }); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #self_ident -> [0]#ident; + }); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #other_ident -> [1]#ident; + }); + + HfStream { + ident, + graph: self.graph, + _phantom: PhantomData, + } + } +} diff --git a/hydroflow_plus_kvs/Cargo.toml b/hydroflow_plus_kvs/Cargo.toml new file mode 100644 index 000000000000..7b9deb4836b2 --- /dev/null +++ b/hydroflow_plus_kvs/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "hydroflow_plus_kvs" +publish = false +version = "0.0.0" +edition = "2021" + +[lib] +path = "../hydroflow_plus_kvs_flow/src/lib.rs" + +[features] +default = ["final"] +final = [] + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } +stageleft = { path = "../stageleft", version = "^0.5.0" } +regex = "1" +serde = "1" +hydroflow_plus_kvs_macro = { path = "../hydroflow_plus_kvs_macro" } + +[build-dependencies] +stageleft_tool = { path = "../stageleft_tool", version = "^0.5.0" } diff --git a/hydroflow_plus_kvs/build.rs b/hydroflow_plus_kvs/build.rs new file mode 100644 index 000000000000..70fd49e747ec --- /dev/null +++ b/hydroflow_plus_kvs/build.rs @@ -0,0 +1,5 @@ +use std::path::Path; + +fn main() { + stageleft_tool::gen_final(Path::new("../hydroflow_plus_kvs_flow")); +} diff --git a/hydroflow_plus_kvs_flow/Cargo.toml b/hydroflow_plus_kvs_flow/Cargo.toml new file mode 100644 index 000000000000..89edd119343b --- /dev/null +++ b/hydroflow_plus_kvs_flow/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "hydroflow_plus_kvs_flow" +publish = false +version = "0.0.0" +edition = "2021" + +[lib] +path = "src/lib.rs" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } +stageleft = { path = "../stageleft", version = "^0.5.0" } +regex = "1" +serde = "1" diff --git a/hydroflow_plus_kvs_flow/src/lib.rs b/hydroflow_plus_kvs_flow/src/lib.rs new file mode 100644 index 000000000000..b5fb5c35a0a8 --- /dev/null +++ b/hydroflow_plus_kvs_flow/src/lib.rs @@ -0,0 +1,62 @@ +#![cfg_attr(feature = "final", allow(unused))] + +#[cfg(feature = "final")] +#[doc(hidden)] +pub(crate) use hydroflow_plus_kvs_macro as __macro; + +#[cfg(feature = "final")] +#[doc(hidden)] +pub mod __flow { + include!(concat!(env!("OUT_DIR"), "/lib_pub.rs")); +} + +use hydroflow_plus::hydroflow::bytes::Bytes; +use hydroflow_plus::hydroflow::scheduled::graph::Hydroflow; +use hydroflow_plus::hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; +use hydroflow_plus::hydroflow::util; +use hydroflow_plus::HfBuilder; +use serde::{Deserialize, Serialize}; +use stageleft::{q, Quoted, RuntimeData}; + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +pub enum KVSMessage { + Put { key: String, value: String }, + Get { key: String }, + Response { key: String, value: String }, +} + +#[stageleft::entry] +pub fn my_kvs<'a>( + graph: &'a HfBuilder<'a>, + enable_debug: bool, + input_stream: RuntimeData>, +) -> impl Quoted> { + let input_bytes = graph.source_stream(q!(input_stream)); + + let inbound_channel = input_bytes.map(q!(|bytes| util::deserialize_from_bytes::( + bytes + ) + .unwrap())); + + let gets = inbound_channel.filter_map(q!(|msg| match msg { + KVSMessage::Get { key } => Some(key), + _ => None, + })); + + let puts = inbound_channel.filter_map(q!(|msg| match msg { + KVSMessage::Put { key, value } => Some((key, value)), + _ => None, + })); + + if enable_debug { + puts.for_each(q!(|msg| { + println!("Got a Put {:?}", msg); + })); + + gets.for_each(q!(|msg| { + println!("Got a Get {:?}", msg); + })); + } + + graph.build() +} diff --git a/hydroflow_plus_kvs_macro/Cargo.toml b/hydroflow_plus_kvs_macro/Cargo.toml new file mode 100644 index 000000000000..ea6f40b22b61 --- /dev/null +++ b/hydroflow_plus_kvs_macro/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "hydroflow_plus_kvs_macro" +publish = false +version = "0.0.0" +edition = "2021" + +[lib] +proc-macro = true +path = "src/lib.rs" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } +stageleft = { path = "../stageleft", version = "^0.5.0" } +regex = "1" +serde = "1" +hydroflow_plus_kvs_flow = { path = "../hydroflow_plus_kvs_flow" } + +[build-dependencies] +stageleft_tool = { path = "../stageleft_tool", version = "^0.5.0" } diff --git a/hydroflow_plus_kvs_macro/build.rs b/hydroflow_plus_kvs_macro/build.rs new file mode 100644 index 000000000000..458f983bde66 --- /dev/null +++ b/hydroflow_plus_kvs_macro/build.rs @@ -0,0 +1,8 @@ +use std::path::Path; + +fn main() { + stageleft_tool::gen_macro( + Path::new("../hydroflow_plus_kvs_flow"), + "hydroflow_plus_kvs", + ); +} diff --git a/hydroflow_plus_kvs_macro/src/lib.rs b/hydroflow_plus_kvs_macro/src/lib.rs new file mode 100644 index 000000000000..24dd860c0b2b --- /dev/null +++ b/hydroflow_plus_kvs_macro/src/lib.rs @@ -0,0 +1,6 @@ +#![allow(unused)] + +#[doc(hidden)] +pub(crate) use hydroflow_plus_kvs_flow as __flow; + +include!(concat!(env!("OUT_DIR"), "/lib.rs")); diff --git a/hydroflow_plus_kvs_server/Cargo.toml b/hydroflow_plus_kvs_server/Cargo.toml new file mode 100644 index 000000000000..338acc59abfd --- /dev/null +++ b/hydroflow_plus_kvs_server/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "hydroflow_plus_kvs_runtime" +publish = false +version = "0.0.0" +edition = "2021" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } +regex = "1" +serde = "1" +hydroflow_plus_kvs = { path = "../hydroflow_plus_kvs" } diff --git a/hydroflow_plus_kvs_server/src/main.rs b/hydroflow_plus_kvs_server/src/main.rs new file mode 100644 index 000000000000..52cf951da944 --- /dev/null +++ b/hydroflow_plus_kvs_server/src/main.rs @@ -0,0 +1,13 @@ +use hydroflow_plus::hydroflow::bytes::Bytes; +use hydroflow_plus_kvs::my_kvs; + +fn main() { + let test = hydroflow_plus_kvs::KVSMessage::Get { + key: "lol".to_string(), + }; + let _blah: hydroflow_plus_kvs::__flow::KVSMessage = test; + + let (_send, recv) = hydroflow_plus::hydroflow::util::unbounded_channel::(); + let mut flow = my_kvs!(true, recv); + flow.run_tick(); +} diff --git a/stageleft/Cargo.toml b/stageleft/Cargo.toml new file mode 100644 index 000000000000..66b4d333f030 --- /dev/null +++ b/stageleft/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "stageleft" +publish = true +version = "0.5.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/stageleft/" +description = "Type-safe staged programming for Rust" + +[lib] +path = "src/lib.rs" + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +stageleft_macro = { path = "../stageleft_macro", version = "^0.5.0" } diff --git a/stageleft/src/lib.rs b/stageleft/src/lib.rs new file mode 100644 index 000000000000..25c1605c520b --- /dev/null +++ b/stageleft/src/lib.rs @@ -0,0 +1,146 @@ +use std::marker::PhantomData; + +use proc_macro2::{Span, TokenStream}; +use quote::quote; + +pub mod internal { + pub use proc_macro2::TokenStream; + pub use quote::quote; + pub use {proc_macro2, syn}; +} + +pub use stageleft_macro::{entry, q, quse_fn}; + +pub mod runtime_support; +use runtime_support::{FreeVariable, CURRENT_FINAL_CRATE}; + +pub trait QuotedContext { + fn create() -> Self; +} + +pub trait Quoted: Sized { + fn splice(self) -> TokenStream; +} + +type FreeVariables = Vec<(String, (Option, Option))>; + +pub trait IntoQuotedOnce<'a, T>: + FnOnce(&mut String, &mut String, &mut FreeVariables, bool) -> T + 'a +where + Self: Sized, +{ +} + +impl<'a, T, F: FnOnce(&mut String, &mut String, &mut FreeVariables, bool) -> T + 'a> + IntoQuotedOnce<'a, T> for F +{ +} + +impl<'a, T, F: FnOnce(&mut String, &mut String, &mut FreeVariables, bool) -> T + 'a> Quoted + for F +{ + fn splice(self) -> TokenStream { + let mut module_path = String::new(); + let mut expr_str = String::new(); + let mut free_variables = Vec::new(); + // this is an uninit value so we can't drop it + std::mem::forget(self( + &mut module_path, + &mut expr_str, + &mut free_variables, + false, + )); + + let instantiated_free_variables = free_variables.iter().flat_map(|(ident, value)| { + let ident = syn::Ident::new(ident, Span::call_site()); + value.0.iter().map(|prelude| quote!(#prelude)).chain( + value + .1 + .iter() + .map(move |value| quote!(let #ident = #value;)), + ) + }); + + let final_crate_name = CURRENT_FINAL_CRATE.with(|f| *f.borrow()).unwrap(); + let final_crate = proc_macro_crate::crate_name(final_crate_name) + .unwrap_or_else(|_| panic!("{final_crate_name} should be present in `Cargo.toml`")); + let final_crate_root = match final_crate { + proc_macro_crate::FoundCrate::Itself => syn::parse_str(final_crate_name).unwrap(), + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let module_path: syn::Path = syn::parse_str(&module_path).unwrap(); + let mut module_path = module_path + .segments + .iter() + .skip(1) + .cloned() + .collect::>(); + module_path.insert( + 0, + syn::PathSegment { + ident: syn::Ident::new("__flow", Span::call_site()), + arguments: syn::PathArguments::None, + }, + ); + let module_path = syn::Path { + leading_colon: None, + segments: syn::punctuated::Punctuated::from_iter(module_path.into_iter()), + }; + + let expr: syn::Expr = syn::parse_str(&expr_str).unwrap(); + quote!({ + use ::#final_crate_root::#module_path::*; + #(#instantiated_free_variables)* + #expr + }) + } +} + +pub trait IntoQuotedMut<'a, T>: + FnMut(&mut String, &mut String, &mut FreeVariables, bool) -> T + 'a +where + Self: Sized, +{ +} + +impl<'a, T, F: FnMut(&mut String, &mut String, &mut FreeVariables, bool) -> T + 'a> + IntoQuotedMut<'a, T> for F +{ +} + +pub struct RuntimeData { + ident: &'static str, + _phantom: PhantomData, +} + +impl Copy for RuntimeData {} + +impl Clone for RuntimeData { + fn clone(&self) -> Self { + // TODO(shadaj): mark this as cloned so we clone it in the splice + RuntimeData { + ident: self.ident.clone(), + _phantom: PhantomData, + } + } +} + +impl RuntimeData { + pub fn new(ident: &'static str) -> RuntimeData { + RuntimeData { + ident, + _phantom: PhantomData, + } + } +} + +impl FreeVariable for RuntimeData { + fn to_tokens(&self) -> (Option, Option) { + let ident = syn::Ident::new(self.ident, Span::call_site()); + (None, Some(quote!(#ident))) + } +} diff --git a/stageleft/src/runtime_support.rs b/stageleft/src/runtime_support.rs new file mode 100644 index 000000000000..b27f1d2826ad --- /dev/null +++ b/stageleft/src/runtime_support.rs @@ -0,0 +1,112 @@ +use std::cell::RefCell; +use std::marker::PhantomData; +use std::mem::MaybeUninit; + +use proc_macro2::{Span, TokenStream}; +use quote::quote; + +thread_local!(pub static CURRENT_FINAL_CRATE: RefCell> = RefCell::new(None)); + +pub trait ParseFromLiteral { + fn parse_from_literal(literal: &syn::Expr) -> Self; +} + +impl ParseFromLiteral for bool { + fn parse_from_literal(literal: &syn::Expr) -> Self { + match literal { + syn::Expr::Lit(syn::ExprLit { + lit: syn::Lit::Bool(lit_bool), + .. + }) => lit_bool.value(), + _ => panic!("Expected literal"), + } + } +} + +impl ParseFromLiteral for u32 { + fn parse_from_literal(literal: &syn::Expr) -> Self { + match literal { + syn::Expr::Lit(syn::ExprLit { + lit: syn::Lit::Int(lit_int), + .. + }) => lit_int.base10_parse().unwrap(), + _ => panic!("Expected literal"), + } + } +} + +pub trait FreeVariable +where + Self: Sized, +{ + fn to_tokens(&self) -> (Option, Option); + + fn uninitialized(self) -> O { + #[allow(clippy::uninit_assumed_init)] + unsafe { + MaybeUninit::uninit().assume_init() + } + } +} + +impl FreeVariable for u32 { + fn to_tokens(&self) -> (Option, Option) { + (None, Some(quote!(#self))) + } +} + +pub struct Import { + module_path: &'static str, + path: &'static str, + as_name: &'static str, + _phantom: PhantomData, +} + +impl Copy for Import {} +impl Clone for Import { + fn clone(&self) -> Self { + Import { + module_path: self.module_path, + path: self.path, + as_name: self.as_name, + _phantom: PhantomData, + } + } +} + +pub fn create_import( + module_path: &'static str, + path: &'static str, + as_name: &'static str, + _unused_type_check: T, +) -> Import { + Import { + module_path, + path, + as_name, + _phantom: PhantomData, + } +} + +impl FreeVariable for Import { + fn to_tokens(&self) -> (Option, Option) { + let final_crate_name = CURRENT_FINAL_CRATE.with(|f| *f.borrow()).unwrap(); + let final_crate = proc_macro_crate::crate_name(final_crate_name) + .unwrap_or_else(|_| panic!("{final_crate_name} should be present in `Cargo.toml`")); + let final_crate_root = match final_crate { + proc_macro_crate::FoundCrate::Itself => syn::parse_str(final_crate_name).unwrap(), + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let module_path = syn::parse_str::(self.module_path).unwrap(); + let parsed = syn::parse_str::(self.path).unwrap(); + let as_ident = syn::Ident::new(self.as_name, proc_macro2::Span::call_site()); + ( + Some(quote!(use #final_crate_root::#module_path::#parsed as #as_ident;)), + None, + ) + } +} diff --git a/stageleft_macro/Cargo.toml b/stageleft_macro/Cargo.toml new file mode 100644 index 000000000000..adf604d29e0b --- /dev/null +++ b/stageleft_macro/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "stageleft_macro" +publish = true +version = "0.5.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/stageleft_macro/" +description = "Helper macros for the stageleft crate" + +[lib] +proc-macro = true +path = "src/lib.rs" + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits", "visit" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +lazy_static = "1.4.0" +sha256 = "1.4.0" diff --git a/stageleft_macro/src/free_variable/mod.rs b/stageleft_macro/src/free_variable/mod.rs new file mode 100644 index 000000000000..48b42526a652 --- /dev/null +++ b/stageleft_macro/src/free_variable/mod.rs @@ -0,0 +1,231 @@ +use std::collections::HashSet; + +mod prelude; +use prelude::is_prelude; +use syn::punctuated::Punctuated; +use syn::visit::Visit; +use syn::MacroDelimiter; + +#[derive(Debug)] +pub struct ScopeStack { + scopes: Vec<(HashSet, HashSet)>, +} + +impl Default for ScopeStack { + fn default() -> Self { + ScopeStack { + scopes: vec![(HashSet::new(), HashSet::new())], + } + } +} + +impl ScopeStack { + pub fn push(&mut self) { + self.scopes.push((HashSet::new(), HashSet::new())); + } + + pub fn pop(&mut self) { + self.scopes.pop(); + } + + pub fn insert_term(&mut self, ident: syn::Ident) { + self.scopes + .last_mut() + .expect("Scope stack should not be empty") + .0 + .insert(ident.to_string()); + } + + pub fn insert_type(&mut self, ident: syn::Ident) { + self.scopes + .last_mut() + .expect("Scope stack should not be empty") + .1 + .insert(ident.to_string()); + } + + pub fn contains_term(&self, ident: &syn::Ident) -> bool { + let ident = ident.to_string(); + self.scopes + .iter() + .rev() + .any(|scope| scope.0.contains(&ident)) + } + + pub fn contains_type(&self, ident: &syn::Ident) -> bool { + let ident = ident.to_string(); + self.scopes + .iter() + .rev() + .any(|scope| scope.1.contains(&ident)) + } +} + +#[derive(Default)] +pub struct FreeVariableVisitor { + pub free_variables: Vec, + pub current_scope: ScopeStack, +} + +impl<'ast> Visit<'ast> for FreeVariableVisitor { + fn visit_expr_closure(&mut self, i: &'ast syn::ExprClosure) { + self.current_scope.push(); + i.inputs.iter().for_each(|input| match input { + syn::Pat::Ident(pat_ident) => self.current_scope.insert_term(pat_ident.ident.clone()), + syn::Pat::Type(pat_type) => match pat_type.pat.as_ref() { + syn::Pat::Ident(pat_ident) => { + self.current_scope.insert_term(pat_ident.ident.clone()) + } + _ => panic!("Closure parameters must be identifiers"), + }, + _ => panic!("Closure parameters must be identifiers"), + }); + + syn::visit::visit_expr_closure(self, i); + + self.current_scope.pop(); + } + + fn visit_item_fn(&mut self, i: &'ast syn::ItemFn) { + self.current_scope.push(); + syn::visit::visit_item_fn(self, i); + self.current_scope.pop(); + } + + fn visit_generic_param(&mut self, i: &'ast syn::GenericParam) { + match i { + syn::GenericParam::Type(type_param) => { + self.current_scope.insert_type(type_param.ident.clone()); + } + syn::GenericParam::Lifetime(lifetime_param) => { + self.current_scope + .insert_type(lifetime_param.lifetime.ident.clone()); + } + syn::GenericParam::Const(const_param) => { + self.current_scope.insert_type(const_param.ident.clone()); + } + } + } + + fn visit_block(&mut self, i: &'ast syn::Block) { + self.current_scope.push(); + syn::visit::visit_block(self, i); + self.current_scope.pop(); + } + + fn visit_local(&mut self, i: &'ast syn::Local) { + i.init.iter().for_each(|init| { + syn::visit::visit_local_init(self, init); + }); + + match &i.pat { + syn::Pat::Ident(pat_ident) => { + self.current_scope.insert_term(pat_ident.ident.clone()); + } + _ => panic!("Local variables must be identifiers"), + } + } + + fn visit_ident(&mut self, i: &'ast proc_macro2::Ident) { + if !self.current_scope.contains_term(i) { + self.free_variables.push(i.clone()); + } + } + + fn visit_lifetime(&mut self, i: &'ast syn::Lifetime) { + if !self.current_scope.contains_type(&i.ident) { + self.free_variables.push(i.ident.clone()); + } + } + + fn visit_path(&mut self, i: &'ast syn::Path) { + if i.leading_colon.is_none() && !is_prelude(&i.segments.first().unwrap().ident) { + let node = i.segments.first().unwrap(); + if i.segments.len() == 1 && !self.current_scope.contains_term(&node.ident) { + self.free_variables.push(node.ident.clone()); + } + } + + for node in i.segments.iter() { + self.visit_path_arguments(&node.arguments); + } + } + + fn visit_arm(&mut self, i: &'ast syn::Arm) { + self.current_scope.push(); + syn::visit::visit_arm(self, i); + self.current_scope.pop(); + } + + fn visit_field_pat(&mut self, i: &'ast syn::FieldPat) { + for it in &i.attrs { + self.visit_attribute(it); + } + self.visit_pat(&i.pat); + } + + fn visit_pat_ident(&mut self, i: &'ast syn::PatIdent) { + self.current_scope.insert_term(i.ident.clone()); + } + + fn visit_expr_method_call(&mut self, i: &'ast syn::ExprMethodCall) { + syn::visit::visit_expr(self, &i.receiver); + } + + fn visit_type(&mut self, _: &'ast syn::Type) {} + + fn visit_expr_struct(&mut self, node: &'ast syn::ExprStruct) { + for it in &node.attrs { + self.visit_attribute(it); + } + if let Some(it) = &node.qself { + self.visit_qself(it); + } + self.visit_path(&node.path); + for el in Punctuated::pairs(&node.fields) { + let it = el.value(); + self.visit_expr(&it.expr); + } + if let Some(it) = &node.rest { + self.visit_expr(it); + } + } + + fn visit_expr_field(&mut self, i: &'ast syn::ExprField) { + self.visit_expr(&i.base); + } + + fn visit_macro(&mut self, i: &'ast syn::Macro) { + // TODO(shadaj): emit a warning if our guess at parsing fails + match i.delimiter { + MacroDelimiter::Paren(_binding_0) => i + .parse_body_with( + syn::punctuated::Punctuated::::parse_terminated, + ) + .ok() + .iter() + .flatten() + .for_each(|expr| { + self.visit_expr(expr); + }), + MacroDelimiter::Brace(_binding_0) => i + .parse_body_with(syn::Block::parse_within) + .ok() + .iter() + .flatten() + .for_each(|stmt| { + self.visit_stmt(stmt); + }), + MacroDelimiter::Bracket(_binding_0) => i + .parse_body_with( + syn::punctuated::Punctuated::::parse_terminated, + ) + .ok() + .iter() + .flatten() + .for_each(|expr| { + self.visit_expr(expr); + }), + } + } +} diff --git a/stageleft_macro/src/free_variable/prelude.rs b/stageleft_macro/src/free_variable/prelude.rs new file mode 100644 index 000000000000..0a16bb42f072 --- /dev/null +++ b/stageleft_macro/src/free_variable/prelude.rs @@ -0,0 +1,72 @@ +use std::collections::HashSet; + +use lazy_static::lazy_static; + +lazy_static! { + static ref PRELUDE: HashSet<&'static str> = { + vec![ + // https://doc.rust-lang.org/core/ + "bool", + "char", + "f32", + "f64", + "i8", + "i16", + "i32", + "i64", + "i128", + "isize", + "str", + "u8", + "u16", + "u32", + "u64", + "u128", + "usize", + // https://doc.rust-lang.org/std/prelude/index.html + "Copy", + "Send", + "Sized", + "Sync", + "Unpin", + "Drop", + "Fn", + "FnMut", + "FnOnce", + "drop", + "Box", + "ToOwned", + "Clone", + "PartialEq", + "PartialOrd", + "Eq", + "Ord", + "AsRef", + "AsMut", + "Into", + "From", + "Default", + "Iterator", + "Extend", + "IntoIterator", + "DoubleEndedIterator", + "ExactSizeIterator", + "Option", + "Some", + "None", + "Result", + "Ok", + "Err", + "String", + "ToString", + "Vec", + ] + .into_iter() + .collect::>() + }; +} + +pub fn is_prelude(ident: &syn::Ident) -> bool { + let ident_str = ident.to_string(); + PRELUDE.contains(&ident_str.as_str()) +} diff --git a/stageleft_macro/src/lib.rs b/stageleft_macro/src/lib.rs new file mode 100644 index 000000000000..d1507a468f80 --- /dev/null +++ b/stageleft_macro/src/lib.rs @@ -0,0 +1,412 @@ +use proc_macro2::{Punct, Spacing, Span, TokenStream}; +use quote::{quote, quote_spanned, ToTokens}; +use syn::punctuated::Punctuated; +use syn::spanned::Spanned; +use syn::visit::Visit; +use syn::{AngleBracketedGenericArguments, PathArguments, Token, Type}; + +mod free_variable; +use free_variable::*; + +#[proc_macro] +pub fn q(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let stageleft_crate = proc_macro_crate::crate_name("stageleft") + .expect("stageleft should be present in `Cargo.toml`"); + let root = match stageleft_crate { + proc_macro_crate::FoundCrate::Itself => quote! { stageleft }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let expr = syn::parse_macro_input!(input as syn::Expr); + let mut visitor = FreeVariableVisitor::default(); + visitor.visit_expr(&expr); + + let free_variables = visitor.free_variables.iter().map(|i| { + let ident = i.clone(); + let ident_str = ident.to_string(); + quote!((#ident_str.to_string(), ::#root::runtime_support::FreeVariable::to_tokens(&#ident))) + }); + + let cloned_free_variables = visitor.free_variables.iter().map(|i| { + let mut i_without_span = i.clone(); + i_without_span.set_span(Span::call_site()); + quote!( + #[allow(non_upper_case_globals, non_snake_case)] + let #i_without_span = #i_without_span; + ) + }); + + let unitialized_free_variables = visitor.free_variables.iter().map(|i| { + let mut i_without_span = i.clone(); + i_without_span.set_span(Span::call_site()); + quote!( + #[allow(unused, non_upper_case_globals, non_snake_case)] + let #i = ::#root::runtime_support::FreeVariable::uninitialized(#i_without_span) + ) + }); + + let free_variables_vec = quote!(vec![#(#free_variables),*]); + + let expr_string = expr.clone().into_token_stream().to_string(); + proc_macro::TokenStream::from(quote!({ + #(#cloned_free_variables;)* + move |set_mod: &mut String, set_str: &mut String, set_vec: &mut Vec<(String, (Option<#root::internal::TokenStream>, Option<#root::internal::TokenStream>))>, run: bool| { + *set_mod = module_path!().to_string(); + *set_str = #expr_string.to_string(); + *set_vec = #free_variables_vec; + + if !run { + unsafe { + return ::std::mem::MaybeUninit::uninit().assume_init(); + } + } + + #[allow(unreachable_code)] + { + #(#unitialized_free_variables;)* + #expr + } + } + })) +} + +fn gen_use_paths( + root: TokenStream, + is_rooted: bool, + mut prefix: Vec, + tree: &syn::UseTree, + into: &mut Vec, +) { + match &tree { + syn::UseTree::Path(path) => { + prefix.push(path.ident.clone()); + gen_use_paths(root, is_rooted, prefix, &path.tree, into); + } + syn::UseTree::Group(group) => { + for tree in &group.items { + gen_use_paths(root.clone(), is_rooted, prefix.clone(), tree, into); + } + } + syn::UseTree::Name(name) => { + let name_ident = name.ident.clone(); + let mut name_ident_unspanned = name_ident.clone(); + name_ident_unspanned.set_span(Span::call_site()); + let prefix_unspanned = prefix + .iter() + .map(|i| { + let mut i = i.clone(); + i.set_span(Span::call_site()); + i + }) + .collect::>(); + + if is_rooted { + let full_path = quote!(#(#prefix::)*#name_ident).to_string(); + + into.push(quote! { + #[allow(non_upper_case_globals, non_snake_case)] + let #name_ident_unspanned = #root::runtime_support::create_import( + #full_path, + { + let __quse_local = (); + { + use ::#(#prefix_unspanned::)*#name_ident_unspanned as __quse_local; + __quse_local + } + } + ); + }); + } else if !prefix.is_empty() { + let first = prefix.first().unwrap(); + let prefix_suffix = prefix.iter().skip(1); + let suffix_full_path = quote!(#(#prefix_suffix::)*#name_ident).to_string(); + + into.push(quote! { + #[allow(non_upper_case_globals, non_snake_case)] + let #name_ident_unspanned = #first.extend( + #suffix_full_path, + { + let __quse_local = (); + { + use #(#prefix_unspanned::)*#name_ident_unspanned as __quse_local; + __quse_local + } + } + ); + }); + } else { + into.push(quote! { + #[allow(non_upper_case_globals, non_snake_case)] + let #name_ident = #root::runtime_support::Import::clone(&#name_ident); + }); + } + } + _ => todo!(), + } +} + +#[proc_macro] +pub fn quse_fn(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let stageleft_crate = proc_macro_crate::crate_name("stageleft") + .expect("stageleft should be present in `Cargo.toml`"); + let root = match stageleft_crate { + proc_macro_crate::FoundCrate::Itself => quote! { stageleft }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let input_tokens = proc_macro2::TokenStream::from(input); + let import: syn::ItemUse = syn::parse_quote!(use #input_tokens;); + let mut all_paths_emitted = vec![]; + gen_use_paths( + root, + import.leading_colon.is_some(), + vec![], + &import.tree, + &mut all_paths_emitted, + ); + + quote! { + use #input_tokens; + #(#all_paths_emitted;)* + } + .into() +} + +#[proc_macro_attribute] +pub fn entry( + attr: proc_macro::TokenStream, + input: proc_macro::TokenStream, +) -> proc_macro::TokenStream { + let stageleft_crate = proc_macro_crate::crate_name("stageleft") + .expect("stageleft should be present in `Cargo.toml`"); + let root = match stageleft_crate { + proc_macro_crate::FoundCrate::Itself => quote! { stageleft }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let attr_params = + syn::parse_macro_input!(attr with Punctuated::parse_terminated); + + let input = syn::parse_macro_input!(input as syn::ItemFn); + let input_name = &input.sig.ident; + + let input_generics = &input.sig.generics; + + let mut runtime_data_params = Vec::new(); + let mut runtime_data_args = Vec::new(); + + let param_parsing = input.sig.inputs.iter().skip(1).enumerate().flat_map(|(i, input)| { + match input { + syn::FnArg::Receiver(_) => panic!("Flow functions cannot take self"), + syn::FnArg::Typed(pat_type) => { + let runtime_tpe = match pat_type.ty.as_ref() { + Type::Path(path) => { + if path.path.segments.len() == 1 && path.path.segments[0].ident == "RuntimeData" { + match &path.path.segments[0].arguments { + PathArguments::AngleBracketed(AngleBracketedGenericArguments { + args, + .. + }) => Some(args[0].clone()), + _ => None, + } + } else { + None + } + } + _ => None, + }; + + let pat = pat_type.pat.clone(); + let ty = pat_type.ty.clone(); + + if let Some(runtime_tpe) = runtime_tpe { + let mut visitor = FreeVariableVisitor::default(); + + visitor.current_scope.insert_type(syn::Ident::new("RuntimeData", Span::call_site())); + + visitor.visit_generics(input_generics); + visitor.visit_generic_argument(&runtime_tpe); + + let mut out = vec![]; + + visitor.free_variables.iter().for_each(|i| { + let mut i_unspanned = i.clone(); + i_unspanned.set_span(Span::call_site()); + out.push(quote! { + if let Some(prelude) = ::#root::runtime_support::FreeVariable::to_tokens(&#i_unspanned).0 { + runtime_data_prelude.push(prelude); + } + }); + }); + + runtime_data_params.push(quote! { + #pat: #runtime_tpe + }); + runtime_data_args.push(quote! { + ##pat + }); + + out.push(quote_spanned! {input.span()=> + let #pat: &#root::internal::syn::Expr = &input_parsed[#i]; + }); + + out + } else { + vec![quote_spanned! {input.span()=> + let #pat: #ty = #root::runtime_support::ParseFromLiteral::parse_from_literal(&input_parsed[#i]); + }] + } + } + } + }); + + let params_to_pass = input.sig.inputs.iter().skip(1).map(|input| match input { + syn::FnArg::Receiver(_) => panic!("Flow functions cannot take self"), + syn::FnArg::Typed(pat_type) => { + let is_runtime = match pat_type.ty.as_ref() { + Type::Path(path) => { + path.path.segments.len() == 1 && path.path.segments[0].ident == "RuntimeData" + } + _ => false, + }; + + if is_runtime { + let pat_ident = match pat_type.pat.as_ref() { + syn::Pat::Ident(pat_ident) => pat_ident, + _ => panic!("RuntimeData must be an identifier"), + }; + let pat_str = pat_ident.ident.to_string(); + quote!(#root::RuntimeData::new(#pat_str)) + } else { + let pat = pat_type.pat.clone(); + quote!(#pat) + } + } + }); + + let expected_arg_count = input.sig.inputs.len() - 1; + + let pound = Punct::new('#', Spacing::Alone); + let passed_generics = if attr_params.is_empty() { + quote!() + } else { + quote!(::<#attr_params>) + }; + + // the return type is always of form `impl Quoted`, this grabs `T` and any free variable imports for it + let (return_type_free, return_type_inner) = match &input.sig.output { + syn::ReturnType::Type(_, ty) => match ty.as_ref() { + Type::ImplTrait(impl_trait) => match impl_trait.bounds.first().unwrap() { + syn::TypeParamBound::Trait(quoted_path) => { + match "ed_path.path.segments[0].arguments { + syn::PathArguments::AngleBracketed(args) => { + match args.args.first().unwrap() { + syn::GenericArgument::Type(ty) => { + let mut visitor = FreeVariableVisitor::default(); + + visitor.visit_generics(input_generics); + visitor.visit_type(ty); + + let mut out = vec![]; + + visitor.free_variables.iter().for_each(|i| { + let mut i_unspanned = i.clone(); + i_unspanned.set_span(Span::call_site()); + out.push(quote! { + if let Some(prelude) = ::#root::runtime_support::FreeVariable::to_tokens(&#i_unspanned).0 { + runtime_data_prelude.push(prelude); + } + }); + }); + + (out, ty.clone()) + } + _ => panic!(), + } + } + _ => panic!(), + } + } + _ => panic!(), + }, + _ => panic!(), + }, + _ => panic!(), + }; + + let orig_visibility = input.vis.clone(); + + let input_contents = input + .block + .to_token_stream() + .to_string() + .chars() + .filter(|c| !c.is_whitespace()) + .collect::(); + let input_hash = "macro_".to_string() + &sha256::digest(&input_contents); + let input_hash_ident = syn::Ident::new(&input_hash, Span::call_site()); + + proc_macro::TokenStream::from(quote_spanned! {input.span()=> + #[cfg(not(feature = "final"))] + #orig_visibility fn #input_name(input: #root::internal::TokenStream) -> #root::internal::TokenStream { + #[allow(unused)] + let input_parsed = #root::internal::syn::parse::Parser::parse( + #root::internal::syn::punctuated::Punctuated::<#root::internal::syn::Expr, #root::internal::syn::Token![,]>::parse_terminated, + input.into() + ).unwrap(); + + if input_parsed.len() != #expected_arg_count { + panic!("Expected {} arguments, got {}", #expected_arg_count, input_parsed.len()); + } + + #[allow(unused_mut)] + let mut runtime_data_prelude = ::std::vec::Vec::<#root::internal::TokenStream>::new(); + + #(#param_parsing)* + #(#return_type_free)* + + #input + let output_core = { + let graph = #root::QuotedContext::create(); + #root::Quoted::splice(#input_name #passed_generics(&graph, #(#params_to_pass),*)) + }; + + let final_crate = #root::runtime_support::CURRENT_FINAL_CRATE.with(|f| *f.borrow()).unwrap(); + let final_crate_path: #root::internal::syn::Path = #root::internal::syn::parse_str(final_crate).unwrap(); + + let module_path: #root::internal::syn::Path = #root::internal::syn::parse_str(module_path!()).unwrap(); + let mut module_path = module_path.segments.iter().skip(1).cloned().collect::>(); + module_path.insert(0, #root::internal::syn::PathSegment { + ident: #root::internal::syn::Ident::new("__flow", #root::internal::proc_macro2::Span::call_site()), + arguments: #root::internal::syn::PathArguments::None, + }); + let module_path = #root::internal::syn::Path { + leading_colon: None, + segments: #root::internal::syn::punctuated::Punctuated::from_iter(module_path.into_iter()), + }; + + ::#root::internal::quote!({ + use #pound final_crate_path :: #pound module_path :: *; + #pound (#pound runtime_data_prelude)* + fn create_flow #input_generics( + #(#runtime_data_params),* + ) -> #return_type_inner { + #pound output_core + } + create_flow(#(#runtime_data_args),*) + }) + } + + #[cfg(feature = "final")] + pub use crate::__macro::#input_hash_ident as #input_name; + }) +} diff --git a/stageleft_tool/Cargo.toml b/stageleft_tool/Cargo.toml new file mode 100644 index 000000000000..cf59f117effd --- /dev/null +++ b/stageleft_tool/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "stageleft_tool" +publish = true +version = "0.5.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/stageleft_macro/" +description = "Helper macros for the stageleft crate" + +[lib] +path = "src/lib.rs" + +[dependencies] +syn-inline-mod = "0.6.0" +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits", "visit" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +lazy_static = "1.4.0" +sha256 = "1.4.0" diff --git a/stageleft_tool/src/lib.rs b/stageleft_tool/src/lib.rs new file mode 100644 index 000000000000..10d783463d7f --- /dev/null +++ b/stageleft_tool/src/lib.rs @@ -0,0 +1,202 @@ +use std::path::Path; +use std::{env, fs}; + +use proc_macro2::Span; +use quote::ToTokens; +use syn::parse_quote; +use syn::visit_mut::VisitMut; + +struct GenMacroVistor { + exported_macros: Vec<(String, syn::Path)>, + current_mod: syn::Path, +} + +// marks everything as pub(crate) because proc-macros cannot actually export anything +impl VisitMut for GenMacroVistor { + fn visit_item_enum_mut(&mut self, i: &mut syn::ItemEnum) { + if matches!(i.vis, syn::Visibility::Public(_)) { + i.vis = parse_quote!(pub(crate)); + } + } + + fn visit_item_mod_mut(&mut self, i: &mut syn::ItemMod) { + let old_mod = self.current_mod.clone(); + let i_ident = &i.ident; + self.current_mod = parse_quote!(#old_mod::#i_ident); + + syn::visit_mut::visit_item_mod_mut(self, i); + + self.current_mod = old_mod; + } + + fn visit_item_fn_mut(&mut self, i: &mut syn::ItemFn) { + let is_entry = i + .attrs + .iter() + .any(|a| a.path().to_token_stream().to_string() == "stageleft :: entry"); + + if is_entry { + let cur_path = &self.current_mod; + let i_name = &i.sig.ident; + let contents = i + .block + .to_token_stream() + .to_string() + .chars() + .filter(|c| !c.is_whitespace()) + .collect::(); + let contents_hash = sha256::digest(contents); + self.exported_macros + .push((contents_hash, parse_quote!(#cur_path::#i_name))); + } + + if matches!(i.vis, syn::Visibility::Public(_)) { + i.vis = parse_quote!(pub(crate)); + } + } + + fn visit_item_mut(&mut self, i: &mut syn::Item) { + syn::visit_mut::visit_item_mut(self, i); + } +} + +pub fn gen_macro(flow_path: &Path, final_crate: &str) { + let out_dir = env::var_os("OUT_DIR").unwrap(); + let dest_path = Path::new(&out_dir).join("lib.rs"); + + let mut flow_lib = + syn_inline_mod::parse_and_inline_modules(&flow_path.join("src").join("lib.rs")); + let mut visitor = GenMacroVistor { + exported_macros: vec![], + current_mod: parse_quote!(crate::__flow), + }; + visitor.visit_file_mut(&mut flow_lib); + + let mut out_file: syn::File = parse_quote!(); + + for (hash, exported) in visitor.exported_macros { + let underscored_path = syn::Ident::new(&("macro_".to_string() + &hash), Span::call_site()); + + let proc_macro_wrapper: syn::ItemFn = parse_quote!( + #[proc_macro] + #[allow(non_snake_case)] + pub fn #underscored_path(input: ::proc_macro::TokenStream) -> ::proc_macro::TokenStream { + let input = ::stageleft::internal::TokenStream::from(input); + let out = ::stageleft::runtime_support::CURRENT_FINAL_CRATE.with(|f| { + let mut f = f.borrow_mut(); + *f = Some(#final_crate); + drop(f); + #exported(input) + }); + ::proc_macro::TokenStream::from(out) + } + ); + + out_file.items.push(syn::Item::Fn(proc_macro_wrapper)); + } + + fs::write(dest_path, out_file.to_token_stream().to_string()).unwrap(); + println!("cargo:rerun-if-changed=build.rs"); + + let flow_path_absolute = fs::canonicalize(flow_path).unwrap(); + println!( + "cargo:rerun-if-changed={}", + flow_path_absolute.to_string_lossy() + ); +} + +struct GenFinalPubVistor { + current_mod: syn::Path, +} + +impl VisitMut for GenFinalPubVistor { + fn visit_item_enum_mut(&mut self, i: &mut syn::ItemEnum) { + i.vis = parse_quote!(pub); + } + + fn visit_item_use_mut(&mut self, i: &mut syn::ItemUse) { + i.vis = parse_quote!(pub); + } + + fn visit_item_mod_mut(&mut self, i: &mut syn::ItemMod) { + let old_mod = self.current_mod.clone(); + let i_ident = &i.ident; + self.current_mod = parse_quote!(#old_mod::#i_ident); + + i.vis = parse_quote!(pub); + + syn::visit_mut::visit_item_mod_mut(self, i); + + self.current_mod = old_mod; + } + + fn visit_item_fn_mut(&mut self, i: &mut syn::ItemFn) { + let is_entry = i + .attrs + .iter() + .any(|a| a.path().to_token_stream().to_string() == "stageleft :: entry"); + + if is_entry { + *i = parse_quote! { + #[cfg(not(feature = "final"))] + #i + } + } + + syn::visit_mut::visit_item_fn_mut(self, i); + } + + fn visit_item_mut(&mut self, i: &mut syn::Item) { + if let syn::Item::Enum(e) = i { + if matches!(e.vis, syn::Visibility::Public(_)) { + let cur_path = &self.current_mod; + let e_name = &e.ident; + *i = parse_quote!(pub use #cur_path::#e_name;); + } + } + + syn::visit_mut::visit_item_mut(self, i); + } + + fn visit_file_mut(&mut self, i: &mut syn::File) { + i.items.retain(|i| match i { + syn::Item::Mod(m) => { + let final_attr: syn::Attribute = parse_quote!(#[cfg(feature = "final")]); + m.attrs.first().map(|d| d != &final_attr).unwrap_or(true) + } + _ => true, + }); + + syn::visit_mut::visit_file_mut(self, i); + } +} + +pub fn gen_final(flow_path: &Path) { + let out_dir = env::var_os("OUT_DIR").unwrap(); + + let mut flow_lib_pub = + syn_inline_mod::parse_and_inline_modules(&flow_path.join("src").join("lib.rs")); + + let mut final_pub_visitor = GenFinalPubVistor { + current_mod: parse_quote!(crate), + }; + final_pub_visitor.visit_file_mut(&mut flow_lib_pub); + + flow_lib_pub + .attrs + .retain(|i| i != &parse_quote!(#![cfg_attr(feature = "final", allow(unused))])); + + fs::write( + Path::new(&out_dir).join("lib_pub.rs"), + flow_lib_pub.to_token_stream().to_string(), + ) + .unwrap(); + + println!("cargo:rerun-if-changed=build.rs"); + + let flow_path_absolute = fs::canonicalize(flow_path).unwrap(); + println!( + "cargo:rerun-if-changed={}", + flow_path_absolute.to_string_lossy() + ); +} diff --git a/topolotree/.gitignore b/topolotree/.gitignore new file mode 100644 index 000000000000..29a7166e8eed --- /dev/null +++ b/topolotree/.gitignore @@ -0,0 +1,2 @@ +*.csv + diff --git a/website_playground/src/lib.rs b/website_playground/src/lib.rs index ad1d8b65fda4..761f6cd7dff2 100644 --- a/website_playground/src/lib.rs +++ b/website_playground/src/lib.rs @@ -197,13 +197,13 @@ pub fn compile_datalog(program: String) -> JsValue { serde_wasm_bindgen::to_value(&out).unwrap() } -struct HydroflowInstance { - hydroflow: Hydroflow, +struct HydroflowInstance<'a, In, Out> { + hydroflow: Hydroflow<'a>, input: tokio::sync::mpsc::UnboundedSender, output: tokio::sync::mpsc::UnboundedReceiver, } -type DatalogBooleanDemoInstance = HydroflowInstance<(i32,), (i32,)>; +type DatalogBooleanDemoInstance = HydroflowInstance<'static, (i32,), (i32,)>; thread_local! { static DATALOG_BOOLEAN_DEMO_INSTANCES: RefCell> =