diff --git a/Cargo.lock b/Cargo.lock
index d25a2e6033b7..f0eec3176133 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3152,6 +3152,22 @@ dependencies = [
  "winnow",
 ]
 
+[[package]]
+name = "topolotree"
+version = "0.0.0"
+dependencies = [
+ "dashmap",
+ "futures",
+ "hydroflow",
+ "hydroflow_datalog",
+ "procinfo",
+ "rand 0.8.5",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tokio-tungstenite",
+]
+
 [[package]]
 name = "tracing"
 version = "0.1.37"
diff --git a/Cargo.toml b/Cargo.toml
index 7bc13064ba02..a5b428dd3281 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ members = [
     "multiplatform_test",
     "pusherator",
     "relalg",
+    "topolotree",
     "variadics",
     "website_playground",
 ]
diff --git a/topolotree/Cargo.toml b/topolotree/Cargo.toml
new file mode 100644
index 000000000000..d2df597d8778
--- /dev/null
+++ b/topolotree/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "topolotree"
+publish = false
+version = "0.0.0"
+edition = "2021"
+
+[dependencies]
+hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] }
+hydroflow_datalog = { path = "../hydroflow_datalog" }
+
+tokio = { version = "1.16", features = [ "full" ] }
+serde = { version = "1", features = ["rc"] }
+serde_json = "1"
+rand = "0.8.5"
+dashmap = "5.4.0"
+
+futures = "0.3.28"
+
+tokio-tungstenite = "0.19.0"
+
+[target.'cfg(target_os = "linux")'.dependencies]
+procinfo = "0.4.2"
diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs
new file mode 100644
index 000000000000..173492098689
--- /dev/null
+++ b/topolotree/src/main.rs
@@ -0,0 +1,290 @@
+//! Topolotree node binary.
+//!
+//! For each message received from a neighbor, every *other* neighbor is sent the
+//! sum of the latest data from all neighbors except itself, plus the locally
+//! accumulated value. Messages are JSON-encoded [`Payload`]s.
+
+#[cfg(test)]
+mod tests;
+
+use std::cell::RefCell;
+use std::fmt::Debug;
+use std::io;
+use std::rc::Rc;
+
+use futures::{SinkExt, Stream};
+use hydroflow::bytes::{Bytes, BytesMut};
+use hydroflow::hydroflow_syntax;
+use hydroflow::scheduled::graph::Hydroflow;
+use hydroflow::util::cli::{
+    ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged,
+};
+use serde::{Deserialize, Serialize};
+
+/// A timestamped value exchanged with neighbors.
+///
+/// `timestamp` orders updates from one sender: a stored payload is only replaced
+/// by one with a strictly larger timestamp (see `run_topolotree`).
+#[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash)]
+pub struct Payload<T: Debug> {
+    timestamp: isize,
+    data: T,
+}
+
+// NOTE(review): `Hash` is derived from both fields while `eq` looks at the
+// timestamp only; the two agree solely because of the assertion in `eq`.
+/// Equality is decided by `timestamp` alone.
+///
+/// # Panics
+///
+/// Panics if both payloads share a timestamp but hold different `data`.
+impl<T: Debug + PartialEq> PartialEq for Payload<T> {
+    fn eq(&self, other: &Self) -> bool {
+        if self.timestamp == other.timestamp {
+            assert_eq!(self.data, other.data);
+            true
+        } else {
+            false
+        }
+    }
+}
+
+/// A local operation: `change` is added to this node's own running total.
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
+pub struct OperationPayload {
+    change: i64,
+}
+
+/// Builds the Hydroflow graph for one topolotree node.
+///
+/// * `neighbors` - ids of this node's neighbors.
+/// * `input_recv` - messages from neighbors as `(sender id, bytes)`, where the
+///   bytes are a JSON-encoded `Payload<i64>`.
+/// * `local_update_recv` - JSON-encoded `OperationPayload`s for the local value.
+/// * `output_send` - receives `(target id, bytes)` pairs holding JSON payloads.
+///
+/// # Panics
+///
+/// Running the returned graph panics if a stream yields an `Err`, a message
+/// fails to deserialize, or `output_send` has been closed.
+fn run_topolotree(
+    neighbors: Vec<u32>,
+    input_recv: impl Stream<Item = Result<(u32, BytesMut), io::Error>> + Unpin + 'static,
+    local_update_recv: impl Stream<Item = Result<BytesMut, io::Error>> + Unpin + 'static,
+    output_send: tokio::sync::mpsc::UnboundedSender<(u32, Bytes)>,
+) -> Hydroflow {
+    fn merge(x: &mut i64, y: i64) {
+        *x += y;
+    }
+
+    // Timestamp stuff is a bit complicated, there is a proper data-flowy way to do it
+    // but it would require at least one more join and one more cross join just specifically for the local timestamps
+    // Until we need it to be proper then we can take a shortcut and use rc refcell
+    let self_timestamp = Rc::new(RefCell::new(0));
+
+    let self_timestamp1 = Rc::clone(&self_timestamp);
+    let self_timestamp2 = Rc::clone(&self_timestamp);
+    let self_timestamp3 = Rc::clone(&self_timestamp);
+
+    hydroflow_syntax! {
+        from_neighbors = source_stream(input_recv)
+            -> map(Result::unwrap)
+            -> map(|(src, payload)| (src, serde_json::from_slice(&payload[..]).unwrap()))
+            -> inspect(|(src, payload): &(u32, Payload<i64>)| println!("received from: {src}: payload: {payload:?}"))
+            -> tee();
+
+        from_neighbors
+            -> persist()
+            -> fold_keyed(|| Payload { timestamp: -1, data: Default::default() }, |acc: &mut Payload<i64>, val: Payload<i64>| {
+                if val.timestamp > acc.timestamp {
+                    *acc = val;
+                    *self_timestamp1.borrow_mut() += 1;
+                }
+            })
+            -> inspect(|(src, data)| println!("data from stream: {src}: data: {data:?}"))
+            -> [0]all_neighbor_data;
+
+        local_value = source_stream(local_update_recv)
+            -> map(Result::unwrap)
+            -> map(|change_payload: BytesMut| (serde_json::from_slice(&change_payload[..]).unwrap()))
+            -> inspect(|change_payload: &OperationPayload| println!("change: {change_payload:?}"))
+            -> inspect(|_| {
+                *self_timestamp2.borrow_mut() += 1;
+            })
+            -> persist()
+            -> fold(0, |agg: &mut i64, op: OperationPayload| *agg += op.change);
+
+        neighbors = source_iter(neighbors)
+            -> persist()
+            -> tee();
+
+        // [1, 2, 3] + SelfState
+        // message comes in from 2
+        // (2+3+SelfState) -> 1, (1+2+SelfState) -> 3
+
+        from_neighbors // 2 comes out here
+            -> map(|(src, _payload)| src)
+            -> [0]all_other_neighbors_except_for_who_it_came_from; // 2 goes into this crossjoin
+
+        neighbors
+            -> [1]all_other_neighbors_except_for_who_it_came_from;
+
+        // (2, 1), (2, 2), (2, 3)
+        all_other_neighbors_except_for_who_it_came_from = cross_join_multiset()
+            -> filter(|(src, neighbor)| {
+                src != neighbor
+            })
+            -> [0]who_to_aggregate_from_by_target; // (2, 1), (2, 3)
+
+        neighbors
+            -> [1]who_to_aggregate_from_by_target;
+
+        // ((2, 1), 1)), ((2, 1), 2)), ((2, 1), 3)),
+        // ((2, 3), 1)), ((2, 3), 2)), ((2, 3), 3)),
+        who_to_aggregate_from_by_target = cross_join_multiset()
+            -> filter(|((_original_src, target_neighbor), aggregate_from_this_guy)| {
+                target_neighbor != aggregate_from_this_guy
+            })
+            // ((2, 1), 2)), ((2, 1), 3)),
+            // ((2, 3), 1)), ((2, 3), 2)),
+            -> map(|((original_src, target_neighbor), aggregate_from_this_guy)| {
+                (aggregate_from_this_guy, (original_src, target_neighbor))
+            })
+            // (2, (2, 1))), (3, (2, 1))),
+            // (1, (2, 3))), (2, (2, 3))),
+            -> [1]all_neighbor_data;
+
+        all_neighbor_data = join()
+            -> map(|(aggregate_from_this_guy, (payload, (original_src, target_neighbor)))| {
+                ((target_neighbor, original_src), (aggregate_from_this_guy, payload))
+            })
+            -> fold_keyed(|| 0, |acc: &mut i64, (_aggregate_from_this_guy, payload): (u32, Payload<i64>)| {
+                merge(acc, payload.data);
+            })
+            -> [0]add_local_value;
+
+        local_value
+            -> [1]add_local_value;
+
+        add_local_value = cross_join_multiset()
+            -> map(|(((target_neighbor, _original_src), data), local_value)| {
+                (target_neighbor, Payload {
+                    timestamp: *self_timestamp3.borrow(),
+                    data: data + local_value
+                })
+            })
+            -> for_each(|(target_neighbor, output)| {
+                let serialized = BytesMut::from(serde_json::to_string(&output).unwrap().as_str()).freeze();
+                output_send.send((target_neighbor, serialized)).unwrap();
+            });
+
+        // src
+        //     -> map(|(from, _data)| from)
+        //     -> enumerate()
+        //     -> [0]cj1;
+
+        // source_iter(NEIGHBORS)
+        //     -> persist()
+        //     -> [1]cj1;
+
+        // cj1 = cross_join::()
+        //     -> filter(|((_req_id, from), to)| to != from)
+        //     -> map(|((req_id, _from), to)| (to, req_id))
+        //     -> [0]cj2;
+
+        // source_iter(NEIGHBORS)
+        //     -> persist()
+        //     -> [1]cj2;
+
+        // cj2 = cross_join::()
+        //     -> filter(|((to, _req_id), node_id)| node_id != to)
+        //     -> map(|((to, req_id), node_id)| (node_id, (req_id, to)))
+        //     -> [0]j;
+
+
+
+        // all_neighbor_data -> neighbors_and_myself;
+        // operations_input -> fold::<'static>(0, |agg: &mut i64, op: i64| *agg += op) -> map(|total| (my_id, total)) -> neighbors_and_myself;
+        // neighbors_and_myself = union();
+
+        // // Cross Join
+        // neighbors = source_iter(neighbors) -> persist();
+        // neighbors_and_myself -> [0]aggregated_data;
+        // neighbors -> [1]aggregated_data;
+
+        // // (dest, Payload) where Payload has timestamp and accumulated data as specified by merge function
+        // aggregated_data = cross_join_multiset()
+        //      -> filter(|((src, (payload, tick)), dst)| src != dst)
+        //      -> map(|((src, (payload, tick)), dst)| (dst, payload));
+
+        // aggregated_data
+        //      -> map(|(dst, payload)| (dst, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()).freeze()))
+        //      -> for_each(|x| {
+        //           output_send.send(x).unwrap();
+        //      });
+
+    }
+}
+
+/// Parses neighbor ids from the command-line arguments, connects the ports
+/// returned by `hydroflow::util::cli::init` and launches the dataflow.
+#[hydroflow::main]
+async fn main() {
+    let args: Vec<String> = std::env::args().skip(1).collect();
+    let neighbors: Vec<u32> = args.into_iter().map(|x| x.parse().unwrap()).collect();
+    // let current_id = neighbors[0];
+
+    let mut ports = hydroflow::util::cli::init().await;
+
+    let input_recv = ports
+        .port("input")
+        // connect to the port with a single recipient
+        .connect::<ConnectedTagged<ConnectedDirect>>()
+        .await
+        .into_source();
+
+    let mut output_send = ports
+        .port("output")
+        .connect::<ConnectedDemux<ConnectedDirect>>()
+        .await
+        .into_sink();
+
+    // NOTE(review): this requests the "input" port a second time, while the
+    // "increment_requests" source below is never read — confirm which port is
+    // meant to carry local operations.
+    let operations_send = ports
+        .port("input")
+        // connect to the port with a single recipient
+        .connect::<ConnectedDirect>()
+        .await
+        .into_source();
+
+    let _increment_requests = ports
+        .port("increment_requests")
+        .connect::<ConnectedDirect>()
+        .await
+        .into_source();
+
+    let _query_responses = ports
+        .port("query_responses")
+        .connect::<ConnectedDirect>()
+        .await
+        .into_sink();
+
+    let (chan_tx, mut chan_rx) = tokio::sync::mpsc::unbounded_channel();
+
+    // Forward everything the graph emits on the channel into the "output" sink.
+    tokio::task::spawn_local(async move {
+        while let Some(msg) = chan_rx.recv().await {
+            output_send.send(msg).await.unwrap();
+        }
+    });
+
+    hydroflow::util::cli::launch_flow(run_topolotree(
+        neighbors,
+        input_recv,
+        operations_send,
+        chan_tx,
+    ))
+    .await;
+}
diff --git a/topolotree/src/tests.rs b/topolotree/src/tests.rs
new file mode 100644
index 000000000000..568c10b6ce71
--- /dev/null
+++ b/topolotree/src/tests.rs
@@ -0,0 +1,199 @@
+use std::fmt::Debug;
+use std::io;
+
+use hydroflow::bytes::{Bytes, BytesMut};
+use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
+use hydroflow::util::multiset::HashMultiSet;
+use hydroflow::util::{collect_ready_async, unbounded_channel};
+use serde::Serialize;
+use tokio::sync::mpsc::error::SendError;
+use tokio::sync::mpsc::UnboundedSender;
+
+use crate::{run_topolotree, OperationPayload, Payload};
+
+/// Queues a JSON-encoded `payload` as if it had arrived from neighbor `id`.
+pub fn simulate_input<T: Debug + Serialize>(
+    input_send: &mut UnboundedSender<Result<(u32, BytesMut), io::Error>>,
+    (id, payload): (u32, Payload<T>),
+) -> Result<(), SendError<Result<(u32, BytesMut), io::Error>>> {
+    input_send.send(Ok((
+        id,
+        BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()),
+    )))
+}
+
+/// Queues a JSON-encoded local operation.
+pub fn simulate_operation(
+    input_send: &mut UnboundedSender<Result<BytesMut, io::Error>>,
+    payload: OperationPayload,
+) -> Result<(), SendError<Result<BytesMut, io::Error>>> {
+    input_send.send(Ok(BytesMut::from(
+        serde_json::to_string(&payload).unwrap().as_str(),
+    )))
+}
+
+/// Decodes the outputs returned by `collect_ready_async` into a multiset.
+pub async fn read_all(
+    mut output_recv: &mut UnboundedReceiverStream<(u32, Bytes)>,
+) -> HashMultiSet<(u32, Payload<i64>)> {
+    let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
+    collected
+        .iter()
+        .map(|(id, bytes)| {
+            (
+                *id,
+                serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
+            )
+        })
+        .collect::<HashMultiSet<_>>()
+}
+
+/// One update from neighbor 1 is forwarded to neighbors 2 and 3.
+#[hydroflow::test]
+async fn simple_payload_test() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+
+    let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+
+    #[rustfmt::skip]
+    simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();
+
+    let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (2, Payload { timestamp: 1, data: 2 }),
+        (3, Payload { timestamp: 1, data: 2 }),
+    ]));
+}
+
+/// A duplicate payload (same sender, same timestamp) produces no extra output.
+#[hydroflow::test]
+async fn idempotence_test() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+    let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
+        simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
+    };
+
+    let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (2, Payload { timestamp: 1, data: 2 }),
+        (3, Payload { timestamp: 1, data: 2 }),
+    ]));
+}
+
+/// A payload with an older timestamp does not overwrite a newer one.
+#[hydroflow::test]
+async fn backwards_in_time_test() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+
+    let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap();
+        simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
+    };
+
+    let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (2, Payload { timestamp: 1, data: 7 }),
+        (3, Payload { timestamp: 1, data: 7 }),
+    ]));
+}
+
+/// Updates from two neighbors are exchanged between them and summed for the third.
+#[hydroflow::test]
+async fn multiple_input_sources_test() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+    let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap();
+        simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap();
+    };
+
+    let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, Payload { timestamp: 2, data: 2 }),
+        (2, Payload { timestamp: 2, data: 7 }),
+        (3, Payload { timestamp: 2, data: 9 }),
+        (3, Payload { timestamp: 2, data: 9 }),
+    ]));
+}
+
+/// Local operations are added on top of the data received from neighbors.
+#[hydroflow::test]
+async fn simple_operation_test() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+
+    let (mut operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();
+        simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap();
+        simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap();
+
+    };
+
+    let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (2, Payload { timestamp: 3, data: 14 }),
+        (3, Payload { timestamp: 3, data: 14 }),
+    ]));
+}
+
+// idempotence test (issue two requests with the same timestamp and see that they don't change anything.)
+// let input1 = (1, Payload {timestamp:4, data:2});
+// let input2 = (1, Payload {timestamp:4, data:2});
+// let output1: (u32, Payload<i64>) = (2, Payload {timestamp:1, data:2});
+// let output2: (u32, Payload<i64>) = (3, Payload {timestamp:1, data:2});
+//
+// backward in time test (issue two requests, the second one with an earlier timestamp than the first.)
+// let input1 = (1, Payload {timestamp:5, data:7});
+// let input2 = (1, Payload {timestamp:4, data:2});
+// let output1: (u32, Payload<i64>) = (2, Payload {timestamp:1, data:7});
+// let output2: (u32, Payload<i64>) = (3, Payload {timestamp:1, data:7});
+//
+// updates from multiple sources test
+// let input1 = (1, Payload {timestamp:5, data:7});
+// let input2 = (2, Payload {timestamp:4, data:2});
+// let output1: (u32, Payload<i64>) = (1, Payload {timestamp:2, data:2});
+// let output2: (u32, Payload<i64>) = (2, Payload {timestamp:2, data:7});
+// let output3: (u32, Payload<i64>) = (3, Payload {timestamp:2, data:9});
+