Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
zzlk committed Sep 15, 2023
1 parent 0b43856 commit 1351753
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 91 deletions.
113 changes: 24 additions & 89 deletions topolotree/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::{Sink, SinkExt, Stream};
use hydroflow::bytes::{Bytes, BytesMut};
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;

Check failure on line 13 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `hydroflow::tokio_stream::wrappers::UnboundedReceiverStream`

Check warning on line 13 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused import: `hydroflow::tokio_stream::wrappers::UnboundedReceiverStream`

Check warning on line 13 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (pinned-nightly)

unused import: `hydroflow::tokio_stream::wrappers::UnboundedReceiverStream`
use hydroflow::util::cli::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged,
};
Expand All @@ -19,7 +20,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::error::SendError;

Check failure on line 20 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `tokio::sync::mpsc::error::SendError`

Check warning on line 20 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused import: `tokio::sync::mpsc::error::SendError`

Check warning on line 20 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (pinned-nightly)

unused import: `tokio::sync::mpsc::error::SendError`
use tokio::sync::mpsc::UnboundedSender;

Check failure on line 21 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `tokio::sync::mpsc::UnboundedSender`

Check warning on line 21 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused import: `tokio::sync::mpsc::UnboundedSender`

Check warning on line 21 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (pinned-nightly)

unused import: `tokio::sync::mpsc::UnboundedSender`

use crate::util::simulate_input;
use crate::util::{read_all, simulate_input, simulate_operation};

Check warning on line 23 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused imports: `read_all`, `simulate_input`, `simulate_operation`

#[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash)]

Check failure on line 25 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

you are deriving `Hash` but have implemented `PartialEq` explicitly
pub struct Payload<T: Debug> {
Expand All @@ -39,7 +40,7 @@ impl<T: PartialEq + Debug> PartialEq for Payload<T> {
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
struct OperationPayload {
pub struct OperationPayload {
change: i64,
}

Expand Down Expand Up @@ -275,23 +276,12 @@ async fn simple_payload_test() {
#[rustfmt::skip]
simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();

let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let receive_all_output = || async move {
let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
collected
.iter()
.map(|(id, bytes)| {
(
*id,
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<HashMultiSet<_>>()
};
let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 2 }),
(3, Payload { timestamp: 1, data: 2 }),
]));
Expand All @@ -311,23 +301,12 @@ async fn idempotence_test() {
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let receive_all_output = || async move {
let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
collected
.iter()
.map(|(id, bytes)| {
(
*id,
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<HashMultiSet<_>>()
};
let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 2 }),
(3, Payload { timestamp: 1, data: 2 }),
]));
Expand All @@ -347,23 +326,12 @@ async fn backwards_in_time_test() {
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let receive_all_output = || async move {
let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
collected
.iter()
.map(|(id, bytes)| {
(
*id,
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<HashMultiSet<_>>()
};
let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 7 }),
(3, Payload { timestamp: 1, data: 7 }),
]));
Expand All @@ -383,23 +351,12 @@ async fn multiple_input_sources_test() {
simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let receive_all_output = || async move {
let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
collected
.iter()
.map(|(id, bytes)| {
(
*id,
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<HashMultiSet<_>>()
};
let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(1, Payload { timestamp: 2, data: 2 }),
(2, Payload { timestamp: 2, data: 7 }),
(3, Payload { timestamp: 2, data: 9 }),
Expand All @@ -411,46 +368,24 @@ async fn multiple_input_sources_test() {
async fn simple_operation_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];

let (operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();
{
simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();
simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap();
simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap();

operations_tx
.send(Ok(BytesMut::from(
serde_json::to_string(&OperationPayload { change: 5 })
.unwrap()
.as_str(),
)))
.unwrap();

operations_tx
.send(Ok(BytesMut::from(
serde_json::to_string(&OperationPayload { change: 7 })
.unwrap()
.as_str(),
)))
.unwrap();

let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let receive_all_output = || async move {
let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
collected
.iter()
.map(|(id, bytes)| {
(
*id,
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<HashMultiSet<_>>()
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 3, data: 14 }),
(3, Payload { timestamp: 3, data: 14 }),
]));
Expand Down
31 changes: 29 additions & 2 deletions topolotree/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::fmt::Debug;

use hydroflow::bytes::BytesMut;
use hydroflow::bytes::{Bytes, BytesMut};
use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow::util::collect_ready_async;
use hydroflow::util::multiset::HashMultiSet;
use serde::Serialize;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;

use crate::Payload;
use crate::{OperationPayload, Payload};

pub fn simulate_input<T: Debug + Serialize>(

Check warning on line 13 in topolotree/src/util.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

function `simulate_input` is never used
input_send: &mut UnboundedSender<Result<(u32, BytesMut), std::io::Error>>,
Expand All @@ -16,3 +19,27 @@ pub fn simulate_input<T: Debug + Serialize>(
BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()),
)))
}

pub fn simulate_operation(
input_send: &mut UnboundedSender<Result<BytesMut, std::io::Error>>,
payload: OperationPayload,
) -> Result<(), SendError<Result<BytesMut, std::io::Error>>> {
input_send.send(Ok(BytesMut::from(
serde_json::to_string(&payload).unwrap().as_str(),
)))
}

pub async fn read_all(
mut output_recv: &mut UnboundedReceiverStream<(u32, Bytes)>,
) -> HashMultiSet<(u32, Payload<i64>)> {
let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
collected
.iter()
.map(|(id, bytes)| {
(
*id,
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<HashMultiSet<_>>()
}

0 comments on commit 1351753

Please sign in to comment.