Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
zzlk committed Sep 15, 2023
1 parent 1351753 commit 6468c84
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 205 deletions.
163 changes: 3 additions & 160 deletions topolotree/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
mod util;
#[cfg(test)]
mod tests;

use std::cell::RefCell;
use std::collections::HashSet;
use std::fmt::Debug;
use std::io;
use std::rc::Rc;

use futures::{Sink, SinkExt, Stream};
use futures::{SinkExt, Stream};
use hydroflow::bytes::{Bytes, BytesMut};
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow::util::cli::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged,
};
use hydroflow::util::multiset::HashMultiSet;
use hydroflow::util::{collect_ready_async, unbounded_channel};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;

use crate::util::{read_all, simulate_input, simulate_operation};

#[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash)]

Check failure on line 18 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

you are deriving `Hash` but have implemented `PartialEq` explicitly
pub struct Payload<T: Debug> {
Expand Down Expand Up @@ -99,7 +92,6 @@ fn run_topolotree(
// message comes in from 2
// (2+3+SelfState) -> 1, (1+2+SelfState) -> 3


from_neighbors // 2 comes out here
-> map(|(src, _payload)| src)
-> [0]all_other_neighbors_except_for_who_it_came_from; // 2 goes into this crossjoin
Expand Down Expand Up @@ -132,8 +124,6 @@ fn run_topolotree(
// (1, (2, 3))), (2, (2, 3))),
-> [1]all_neighbor_data;



all_neighbor_data = join()
-> map(|(aggregate_from_this_guy, (payload, (original_src, target_neighbor)))| {
((target_neighbor, original_src), (aggregate_from_this_guy, payload))
Expand All @@ -158,8 +148,6 @@ fn run_topolotree(
output_send.send((target_neighbor, serialized)).unwrap();
});



// src
// -> map(|(from, _data)| from)
// -> enumerate()
Expand Down Expand Up @@ -264,148 +252,3 @@ async fn main() {
))
.await;
}

#[hydroflow::test]
async fn simple_payload_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];

let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 2 }),
(3, Payload { timestamp: 1, data: 2 }),
]));
}

#[hydroflow::test]
async fn idempotence_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];
let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();

let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 2 }),
(3, Payload { timestamp: 1, data: 2 }),
]));
}

#[hydroflow::test]
async fn backwards_in_time_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];

let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap();
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 7 }),
(3, Payload { timestamp: 1, data: 7 }),
]));
}

#[hydroflow::test]
async fn multiple_input_sources_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];
let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();

let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap();
simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(1, Payload { timestamp: 2, data: 2 }),
(2, Payload { timestamp: 2, data: 7 }),
(3, Payload { timestamp: 2, data: 9 }),
(3, Payload { timestamp: 2, data: 9 }),
]));
}

#[hydroflow::test]
async fn simple_operation_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];

let (mut operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();
simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap();
simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap();

};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 3, data: 14 }),
(3, Payload { timestamp: 3, data: 14 }),
]));
}

// idempotence test (issue two requests with the same timestamp and see that they don't change anything.)
// let input1 = (1, Payload {timestamp:4, data:2});
// let input2 = (1, Payload {timestamp:4, data:2});
// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2});
// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2});
//
// backward in time test (issue two requests, the second one with an earlier timestamp than the first. )
// let input1 = (1, Payload {timestamp:5, data:7});
// let input2 = (1, Payload {timestamp:4, data:2});
// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:7});
// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:7});
//
// updates from multiple sources test
// let input1 = (1, Payload {timestamp:5, data:7});
// let input2 = (2, Payload {timestamp:4, data:2});
// let output1: (u32, Payload) = (1, Payload {timestamp:2, data:2});
// let output2: (u32, Payload) = (2, Payload {timestamp:2, data:7});
// let output3: (u32, Payload) = (3, Payload {timestamp:2, data:9});
191 changes: 191 additions & 0 deletions topolotree/src/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use std::fmt::Debug;
use std::io;

use hydroflow::bytes::{Bytes, BytesMut};
use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow::util::multiset::HashMultiSet;
use hydroflow::util::{collect_ready_async, unbounded_channel};
use serde::Serialize;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;

use crate::{run_topolotree, OperationPayload, Payload};

pub fn simulate_input<T: Debug + Serialize>(
input_send: &mut UnboundedSender<Result<(u32, BytesMut), std::io::Error>>,
(id, payload): (u32, Payload<T>),
) -> Result<(), SendError<Result<(u32, BytesMut), std::io::Error>>> {
input_send.send(Ok((
id,
BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()),
)))
}

pub fn simulate_operation(
input_send: &mut UnboundedSender<Result<BytesMut, std::io::Error>>,
payload: OperationPayload,
) -> Result<(), SendError<Result<BytesMut, std::io::Error>>> {
input_send.send(Ok(BytesMut::from(
serde_json::to_string(&payload).unwrap().as_str(),
)))
}

pub async fn read_all(
mut output_recv: &mut UnboundedReceiverStream<(u32, Bytes)>,
) -> HashMultiSet<(u32, Payload<i64>)> {
let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
collected
.iter()
.map(|(id, bytes)| {
(
*id,
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<HashMultiSet<_>>()
}

#[hydroflow::test]
async fn simple_payload_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];

let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 2 }),
(3, Payload { timestamp: 1, data: 2 }),
]));
}

#[hydroflow::test]
async fn idempotence_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];
let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();

let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 2 }),
(3, Payload { timestamp: 1, data: 2 }),
]));
}

#[hydroflow::test]
async fn backwards_in_time_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];

let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap();
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 7 }),
(3, Payload { timestamp: 1, data: 7 }),
]));
}

#[hydroflow::test]
async fn multiple_input_sources_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];
let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();

let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap();
simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(1, Payload { timestamp: 2, data: 2 }),
(2, Payload { timestamp: 2, data: 7 }),
(3, Payload { timestamp: 2, data: 9 }),
(3, Payload { timestamp: 2, data: 9 }),
]));
}

#[hydroflow::test]
async fn simple_operation_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];

let (mut operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();
simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap();
simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap();

};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 3, data: 14 }),
(3, Payload { timestamp: 3, data: 14 }),
]));
}

// idempotence test (issue two requests with the same timestamp and see that they don't change anything.)
// let input1 = (1, Payload {timestamp:4, data:2});
// let input2 = (1, Payload {timestamp:4, data:2});
// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2});
// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2});
//
// backward in time test (issue two requests, the second one with an earlier timestamp than the first. )
// let input1 = (1, Payload {timestamp:5, data:7});
// let input2 = (1, Payload {timestamp:4, data:2});
// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:7});
// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:7});
//
// updates from multiple sources test
// let input1 = (1, Payload {timestamp:5, data:7});
// let input2 = (2, Payload {timestamp:4, data:2});
// let output1: (u32, Payload) = (1, Payload {timestamp:2, data:2});
// let output2: (u32, Payload) = (2, Payload {timestamp:2, data:7});
// let output3: (u32, Payload) = (3, Payload {timestamp:2, data:9});
Loading

0 comments on commit 6468c84

Please sign in to comment.