diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 8a30abbab406..fd62a4f4a76c 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -10,6 +10,7 @@ use futures::{Sink, SinkExt, Stream}; use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::hydroflow_syntax; use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow::util::cli::{ ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, }; @@ -19,7 +20,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::UnboundedSender; -use crate::util::simulate_input; +use crate::util::{read_all, simulate_input, simulate_operation}; #[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash)] pub struct Payload { @@ -39,7 +40,7 @@ impl PartialEq for Payload { } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -struct OperationPayload { +pub struct OperationPayload { change: i64, } @@ -275,23 +276,12 @@ async fn simple_payload_test() { #[rustfmt::skip] simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() - }; + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (2, Payload { timestamp: 1, data: 2 }), (3, Payload { timestamp: 1, data: 2 }), ])); @@ -311,23 +301,12 @@ async fn idempotence_test() { simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); }; - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() - }; + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (2, Payload { timestamp: 1, data: 2 }), (3, Payload { timestamp: 1, data: 2 }), ])); @@ -347,23 +326,12 @@ async fn backwards_in_time_test() { simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); }; - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() - }; + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (2, Payload { timestamp: 1, data: 7 }), (3, Payload { timestamp: 1, data: 7 }), ])); @@ -383,23 +351,12 @@ async fn multiple_input_sources_test() { simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap(); }; - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() - }; + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (1, Payload { timestamp: 2, data: 2 }), (2, Payload { timestamp: 2, data: 7 }), (3, Payload { timestamp: 2, data: 9 }), @@ -411,46 +368,24 @@ async fn multiple_input_sources_test() { async fn simple_operation_test() { let neighbors: Vec = vec![1, 2, 3]; - let (operations_tx, operations_rx) = unbounded_channel::>(); + let (mut operations_tx, operations_rx) = unbounded_channel::>(); let (mut input_send, input_recv) = unbounded_channel::>(); let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); #[rustfmt::skip] - simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); + { + simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap(); - operations_tx - .send(Ok(BytesMut::from( - serde_json::to_string(&OperationPayload { change: 5 }) - .unwrap() - .as_str(), - ))) - .unwrap(); - - operations_tx - .send(Ok(BytesMut::from( - serde_json::to_string(&OperationPayload { change: 7 }) - .unwrap() - .as_str(), - ))) - .unwrap(); - - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() }; + + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (2, Payload { timestamp: 3, data: 14 }), (3, Payload { timestamp: 3, data: 14 }), ])); diff --git a/topolotree/src/util.rs b/topolotree/src/util.rs index 4eb53f48d5c9..a15cac270188 100644 --- a/topolotree/src/util.rs +++ b/topolotree/src/util.rs @@ -1,11 +1,14 @@ use std::fmt::Debug; -use hydroflow::bytes::BytesMut; +use hydroflow::bytes::{Bytes, BytesMut}; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; +use hydroflow::util::collect_ready_async; +use hydroflow::util::multiset::HashMultiSet; use serde::Serialize; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::UnboundedSender; -use crate::Payload; +use crate::{OperationPayload, Payload}; pub fn simulate_input( input_send: &mut UnboundedSender>, @@ -16,3 +19,27 @@ pub fn simulate_input( BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), ))) } + +pub fn simulate_operation( + input_send: &mut UnboundedSender>, + payload: OperationPayload, +) -> Result<(), SendError>> { + input_send.send(Ok(BytesMut::from( + serde_json::to_string(&payload).unwrap().as_str(), + ))) +} + +pub async fn read_all( + mut output_recv: &mut UnboundedReceiverStream<(u32, Bytes)>, +) -> HashMultiSet<(u32, Payload)> { + let collected = collect_ready_async::, _>(&mut output_recv).await; + collected + .iter() + .map(|(id, bytes)| { + ( + *id, + serde_json::from_slice::>(&bytes[..]).unwrap(), + ) + }) + .collect::>() +}