From c3fde3f8fab37e015ead1c151eef646f3ae2dea2 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Tue, 26 Dec 2023 15:43:41 -0800 Subject: [PATCH] Update dependencies and improve initial example --- Cargo.lock | 177 ++++++++++++++++++------------------ flow/Cargo.toml | 1 + flow/src/bin/my_dataflow.rs | 7 +- flow/src/lib.rs | 123 ++++++++++++++----------- 4 files changed, 162 insertions(+), 146 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5910e5a..dd7c59e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,9 +85,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.75" +version = "1.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +checksum = "c9d19de80eff169429ac1e9f48fffb163916b448a44e8e046186232046d9e1f9" dependencies = [ "backtrace", ] @@ -116,7 +116,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" dependencies = [ "concurrent-queue", - "event-listener 4.0.0", + "event-listener 4.0.1", "event-listener-strategy", "futures-core", "pin-project-lite", @@ -176,7 +176,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7125e42787d53db9dd54261812ef17e937c95a51e4d291373b670342fa44310c" dependencies = [ - "event-listener 4.0.0", + "event-listener 4.0.1", "event-listener-strategy", "pin-project-lite", ] @@ -212,7 +212,7 @@ checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -254,13 +254,13 @@ checksum = "e1d90cd0b264dfdd8eb5bad0a2c217c1f88fa96a8573f40e7b12de23fb468f46" [[package]] name = "async-trait" -version = "0.1.74" +version = "0.1.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -508,7 +508,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -582,9 +582,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.17" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" +checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" dependencies = [ "cfg-if", ] @@ -694,9 +694,9 @@ dependencies = [ [[package]] name = "event-listener" -version = "4.0.0" +version = "4.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "770d968249b5d99410d61f5bf89057f3199a077a04d087092f58e7d10692baae" +checksum = "84f2cdcf274580f2d63697192d744727b3198894b1bf02923643bf59e2c26712" dependencies = [ "concurrent-queue", "parking", @@ -709,7 +709,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" dependencies = [ - "event-listener 4.0.0", + "event-listener 4.0.1", "pin-project-lite", ] @@ -732,6 +732,7 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" name = "flow" version = "0.0.0" dependencies = [ + "async-ssh2-lite", "flow_macro", "hydro_deploy", "hydroflow_plus", @@ -763,9 +764,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -778,9 +779,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -788,15 +789,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -805,9 +806,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-lite" @@ -836,32 +837,32 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -976,7 +977,7 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hydro_deploy" version = "0.5.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "anyhow", "async-channel 1.9.0", @@ -1006,7 +1007,7 @@ dependencies = [ [[package]] name = "hydroflow" version = "0.5.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "bincode", "byteorder", @@ -1039,7 +1040,7 @@ dependencies = [ [[package]] name = "hydroflow_cli_integration" version = "0.3.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "async-recursion", "async-trait", @@ -1055,19 +1056,19 @@ dependencies = [ [[package]] name = "hydroflow_datalog" version = "0.5.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "hydroflow_datalog_core", "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] name = "hydroflow_datalog_core" version = "0.5.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "hydroflow_lang", "proc-macro-crate", @@ -1076,13 +1077,13 @@ dependencies = [ "rust-sitter", "rust-sitter-tool", "slotmap", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] name = "hydroflow_lang" version = "0.5.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "auto_impl", "clap 4.4.11", @@ -1095,27 +1096,27 @@ dependencies = [ "serde", "serde_json", "slotmap", - "syn 2.0.41", + "syn 2.0.43", "webbrowser", ] [[package]] name = "hydroflow_macro" version = "0.5.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "hydroflow_lang", "itertools", "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] name = "hydroflow_plus" version = "0.5.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "bincode", "hydroflow", @@ -1125,22 +1126,21 @@ dependencies = [ "quote", "serde", "stageleft", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] name = "hydroflow_plus_cli_integration" version = "0.5.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "async-channel 1.9.0", "hydro_deploy", "hydroflow_plus", "serde", - "serde_json", "stageleft", "stageleft_tool", - "syn 2.0.41", + "syn 2.0.43", "tokio", ] @@ -1259,7 +1259,7 @@ dependencies = [ [[package]] name = "lattices" version = "0.5.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "cc-traits", "sealed", @@ -1452,9 +1452,9 @@ dependencies = [ [[package]] name = "object" -version = "0.32.1" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" dependencies = [ "memchr", ] @@ -1476,9 +1476,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.97" +version = "0.9.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3eaad34cdd97d81de97964fc7f29e2d104f483840d906ef56daa1912338460b" +checksum = "c1665caf8ab2dc9aef43d1c0023bd904633a6a05cb30b0ad59bec2ae986e57a7" dependencies = [ "cc", "libc", @@ -1564,7 +1564,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -1592,9 +1592,9 @@ dependencies = [ [[package]] name = "pkg-config" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" [[package]] name = "polling" @@ -1645,7 +1645,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -1684,9 +1684,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.70" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" +checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" dependencies = [ "unicode-ident", ] @@ -1694,7 +1694,7 @@ dependencies = [ [[package]] name = "pusherator" version = "0.0.3" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "either", "variadics", @@ -1791,7 +1791,7 @@ checksum = "2566c4bf6845f2c2e83b27043c3f5dfcd5ba8f2937d6c00dc009bfb51a079dc4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -1947,7 +1947,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -1976,7 +1976,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -2096,37 +2096,37 @@ dependencies = [ [[package]] name = "stageleft" version = "0.1.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "proc-macro-crate", "proc-macro2", "quote", "stageleft_macro", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] name = "stageleft_macro" version = "0.1.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "lazy_static", "proc-macro-crate", "proc-macro2", "quote", "sha256", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] name = "stageleft_tool" version = "0.1.0" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" dependencies = [ "proc-macro2", "quote", "sha256", - "syn 2.0.41", + "syn 2.0.43", "syn-inline-mod 0.6.0", ] @@ -2155,9 +2155,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.41" +version = "2.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44c8b28c477cc3bf0e7966561e3460130e1255f7a1cf71931075f1c5e7a7e269" +checksum = "ee659fb5f3d355364e1f3e5bc10fb82068efbf824a1e9d1c9504244a6469ad53" dependencies = [ "proc-macro2", "quote", @@ -2181,7 +2181,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fa6dca1fdb7b2ed46dd534a326725419d4fb10f23d8c85a8b2860e5eb25d0f9" dependencies = [ "proc-macro2", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -2208,22 +2208,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.51" +version = "1.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" +checksum = "83a48fd946b02c0a526b2e9481c8e2a17755e47039164a86c4070446e3a4614d" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.51" +version = "1.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" +checksum = "e7fbe9b594d6568a6a1443250a7e67d80b74e1e96f6d1715e1e21cc1888291d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -2255,9 +2255,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", @@ -2280,7 +2280,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -2355,7 +2355,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", ] [[package]] @@ -2537,7 +2537,10 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "variadics" version = "0.0.2" -source = "git+https://github.com/hydro-project/hydroflow.git#38411ea007d4feb30dd16bdd1505802a111a67d1" +source = "git+https://github.com/hydro-project/hydroflow.git#6af81f734d65d40aa308ea6059618d3fe362a61a" +dependencies = [ + "sealed", +] [[package]] name = "vcpkg" @@ -2600,7 +2603,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", "wasm-bindgen-shared", ] @@ -2622,7 +2625,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.43", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2903,9 +2906,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.28" +version = "0.5.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c830786f7720c2fd27a1a0e27a709dbd3c4d009b56d098fc742d4f4eab91fe2" +checksum = "9b5c3db89721d50d0e2a673f5043fc4722f76dcc352d7b1ab8b8288bed4ed2c5" dependencies = [ "memchr", ] diff --git a/flow/Cargo.toml b/flow/Cargo.toml index 6b7d702..9534b5d 100644 --- a/flow/Cargo.toml +++ b/flow/Cargo.toml @@ -18,3 +18,4 @@ stageleft_tool = { git = "https://github.com/hydro-project/hydroflow.git" } [dev-dependencies] hydro_deploy = { git = "https://github.com/hydro-project/hydroflow.git" } hydroflow_plus_cli_integration = { git = "https://github.com/hydro-project/hydroflow.git", features = [ "deploy" ] } +async-ssh2-lite = { version = "0.4.2", features = [ "vendored-openssl" ] } diff --git a/flow/src/bin/my_dataflow.rs b/flow/src/bin/my_dataflow.rs index 706451a..afc7a49 100644 --- a/flow/src/bin/my_dataflow.rs +++ b/flow/src/bin/my_dataflow.rs @@ -1,10 +1,9 @@ +extern crate alloc; + // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let ports = hydroflow_plus::util::cli::init().await; - let joined = flow::my_dataflow_runtime!(&ports, node_id); - - hydroflow_plus::util::cli::launch_flow(joined).await; + hydroflow_plus::util::cli::launch_flow(flow::partitioned_char_counter_runtime!(&ports)).await; } diff --git a/flow/src/lib.rs b/flow/src/lib.rs index 47a0fcf..53693d7 100644 --- a/flow/src/lib.rs +++ b/flow/src/lib.rs @@ -1,90 +1,103 @@ stageleft::stageleft_crate!(flow_macro); -use hydroflow_plus::bytes::BytesMut; -use hydroflow_plus::node::{HfNetworkedDeploy, HfNode, HfNodeBuilder}; +use hydroflow_plus::node::{Deploy, HfNode, NodeBuilder, ClusterBuilder, HfCluster}; +use hydroflow_plus::GraphBuilder; +use stageleft::{q, Quoted, RuntimeData}; + use hydroflow_plus::scheduled::graph::Hydroflow; use hydroflow_plus::util::cli::HydroCLI; -use hydroflow_plus::HfBuilder; use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; -use stageleft::{q, Quoted, RuntimeData}; -pub fn my_dataflow<'a, D: HfNetworkedDeploy<'a>>( - graph: &'a HfBuilder<'a, D>, - node_builder: &impl HfNodeBuilder<'a, D>, -) -> (D::NodePort, D::Node, D::Node) { - let node_zero = graph.node(node_builder); - let node_one = graph.node(node_builder); - - let (source_zero_port, source_zero) = node_zero.source_external(); - - source_zero - .map(q!(|v| v.unwrap().freeze())) - .send_bytes(&node_one) - .for_each(q!(|v: Result| { - println!( - "node one received: {:?}", - std::str::from_utf8(&v.unwrap()).unwrap() - ); - })); - - (source_zero_port, node_zero, node_one) +pub fn partitioned_char_counter<'a, D: Deploy<'a>>( + graph: &'a GraphBuilder<'a, D>, + node_builder: &impl NodeBuilder<'a, D>, + cluster_builder: &impl ClusterBuilder<'a, D>, +) -> (D::Node, D::Cluster) { + let node = graph.node(node_builder); + let cluster = graph.cluster(cluster_builder); + + let words = node + .source_iter(q!(vec!["abc", "abc", "xyz"])) + .map(q!(|s| s.to_string())); + + let all_ids_vec = cluster.ids(); + let words_partitioned = words.enumerate().map(q!({ + let cluster_size = all_ids_vec.len(); + move |(i, w)| ((i % cluster_size) as u32, w) + })); + + words_partitioned + .demux_bincode(&cluster) + .batched() + .fold(q!(|| 0), q!(|count, string: String| *count += string.len())) + .inspect(q!(|count| println!("partition count: {}", count))) + .send_bincode_tagged(&node) + .persist() + .map(q!(|(_mid, count)| count)) + .fold(q!(|| 0), q!(|total, count| *total += count)) + .for_each(q!(|data| println!("total: {}", data))); + + (node, cluster) } #[stageleft::entry] -pub fn my_dataflow_runtime<'a>( - graph: &'a HfBuilder<'a, CLIRuntime>, +pub fn partitioned_char_counter_runtime<'a>( + graph: &'a GraphBuilder<'a, CLIRuntime>, cli: RuntimeData<&'a HydroCLI>, - node_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { - let _ = my_dataflow(graph, &cli); - graph.build(node_id) + let _ = partitioned_char_counter(graph, &cli, &cli); + graph.build(q!(cli.meta.subgraph_id)) } #[stageleft::runtime] #[cfg(test)] mod tests { - use std::time::Duration; - use std::vec; + use std::cell::RefCell; use hydro_deploy::{Deployment, HydroflowCrate}; - use hydroflow_plus::futures::SinkExt; - use hydroflow_plus::util::cli::ConnectedSink; - use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper}; + use hydroflow_plus::futures::StreamExt; + use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper, CLIDeployClusterBuilder}; #[tokio::test] - async fn networked_basic() { + async fn partitioned_char_counter() { let mut deployment = Deployment::new(); let localhost = deployment.Localhost(); - let builder = hydroflow_plus::HfBuilder::new(); - let (source_zero_port, _, node_one) = super::my_dataflow( + let deployment_cell = RefCell::new(deployment); + let builder = hydroflow_plus::GraphBuilder::new(); + let (leader, _) = super::partitioned_char_counter( &builder, - &mut CLIDeployNodeBuilder::new(|id| { - deployment.add_service(HydroflowCrate::new( - ".", localhost.clone() - ).bin("my_dataflow").profile("dev").args(vec![id.to_string()])) + &CLIDeployNodeBuilder::new(|| { + deployment_cell.borrow_mut().add_service( + HydroflowCrate::new(".", localhost.clone()) + .bin("my_dataflow") + .profile("dev"), + ) + }), + &CLIDeployClusterBuilder::new(|| { + (0..2).map(|_| { + deployment_cell.borrow_mut().add_service( + HydroflowCrate::new(".", localhost.clone()) + .bin("my_dataflow") + .profile("dev"), + ) + }).collect() }), ); - let port_to_zero = source_zero_port - .create_sender(&mut deployment, &localhost) - .await; - + let mut deployment = deployment_cell.into_inner(); deployment.deploy().await.unwrap(); - let mut conn_to_zero = port_to_zero.connect().await.into_sink(); - let node_one_stdout = node_one.stdout().await; + let mut leader_stdout = leader.stdout().await; deployment.start().await.unwrap(); - conn_to_zero.send("hello world!".into()).await.unwrap(); + while let Some(line) = leader_stdout.next().await { + if line == "total: 9" { + return; + } + } - assert_eq!( - tokio::time::timeout(Duration::from_secs(1), node_one_stdout.recv()) - .await - .unwrap() - .unwrap(), - "node one received: \"hello world!\"" - ); + panic!("did not find total: 9"); } }