Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
zzlk committed Sep 15, 2023
1 parent cf8769d commit 0b43856
Showing 1 changed file with 32 additions and 76 deletions.
108 changes: 32 additions & 76 deletions topolotree/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,24 +286,15 @@ async fn simple_payload_test() {
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<Vec<_>>()
.collect::<HashMultiSet<_>>()
};
flow.run_tick();
let output1: (u32, Payload<i64>) = (
2,
Payload {
timestamp: 1,
data: 2,
},
);
let output2: (u32, Payload<i64>) = (
3,
Payload {
timestamp: 1,
data: 2,
},
);
assert_eq!(receive_all_output().await, &[output1, output2]);

#[rustfmt::skip]
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 2 }),
(3, Payload { timestamp: 1, data: 2 }),
]));
}

#[hydroflow::test]
Expand Down Expand Up @@ -331,24 +322,15 @@ async fn idempotence_test() {
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<Vec<_>>()
.collect::<HashMultiSet<_>>()
};
flow.run_tick();
let output1: (u32, Payload<i64>) = (
2,
Payload {
timestamp: 1,
data: 2,
},
);
let output2: (u32, Payload<i64>) = (
3,
Payload {
timestamp: 1,
data: 2,
},
);
assert_eq!(receive_all_output().await, &[output1, output2]);

#[rustfmt::skip]
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 2 }),
(3, Payload { timestamp: 1, data: 2 }),
]));
}

#[hydroflow::test]
Expand Down Expand Up @@ -376,24 +358,15 @@ async fn backwards_in_time_test() {
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<Vec<_>>()
.collect::<HashMultiSet<_>>()
};
flow.run_tick();
let output1: (u32, Payload<i64>) = (
2,
Payload {
timestamp: 1,
data: 7,
},
);
let output2: (u32, Payload<i64>) = (
3,
Payload {
timestamp: 1,
data: 7,
},
);
assert_eq!(receive_all_output().await, &[output1, output2]);

#[rustfmt::skip]
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
(2, Payload { timestamp: 1, data: 7 }),
(3, Payload { timestamp: 1, data: 7 }),
]));
}

#[hydroflow::test]
Expand Down Expand Up @@ -424,31 +397,14 @@ async fn multiple_input_sources_test() {
.collect::<HashMultiSet<_>>()
};
flow.run_tick();
let output1 = (
1,
Payload {
timestamp: 2,
data: 2,
},
);
let output2 = (
2,
Payload {
timestamp: 2,
data: 7,
},
);
let output3 = (
3,
Payload {
timestamp: 2,
data: 9,
},
);
assert_eq!(
receive_all_output().await,
HashMultiSet::from_iter([output1, output2, output3.clone(), output3])
);

#[rustfmt::skip]
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
(1, Payload { timestamp: 2, data: 2 }),
(2, Payload { timestamp: 2, data: 7 }),
(3, Payload { timestamp: 2, data: 9 }),
(3, Payload { timestamp: 2, data: 9 }),
]));
}

#[hydroflow::test]
Expand Down Expand Up @@ -489,15 +445,15 @@ async fn simple_operation_test() {
serde_json::from_slice::<Payload<i64>>(&bytes[..]).unwrap(),
)
})
.collect::<Vec<_>>()
.collect::<HashMultiSet<_>>()
};
flow.run_tick();

#[rustfmt::skip]
assert_eq!(receive_all_output().await, &[
assert_eq!(receive_all_output().await, HashMultiSet::from_iter([
(2, Payload { timestamp: 3, data: 14 }),
(3, Payload { timestamp: 3, data: 14 }),
]);
]));
}

// idempotence test (issue two requests with the same timestamp and see that they don't change anything.)
Expand Down

0 comments on commit 0b43856

Please sign in to comment.