Skip to content

Commit

Permalink
refactor: extract some test code into functions for clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
dav1do committed Jun 26, 2024
1 parent 346b14c commit acd5cc7
Showing 1 changed file with 43 additions and 40 deletions.
83 changes: 43 additions & 40 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,43 @@ mod test {
(stream_1, stream_2, to_insert)
}

/// Asserts the events are deliverable and returns IDs for events in stream_1 as the first value and things in stream_2 as the second
fn split_deliverable_order_by_stream(
stream_1: &[(EventId, Vec<u8>)],
stream_2: &[(EventId, Vec<u8>)],
events: &[(EventInsertable, EventMetadata)],
) -> (Vec<EventId>, Vec<EventId>) {
let mut after_1 = Vec::with_capacity(stream_1.len());
let mut after_2 = Vec::with_capacity(stream_2.len());
for (event, _) in events {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.0 == event.order_key) {
after_1.push(event.order_key.clone());
} else {
after_2.push(event.order_key.clone());
}
}

(after_1, after_2)
}

async fn get_insertable_events(
events: &[(EventId, Vec<u8>)],
) -> Vec<(EventInsertable, EventMetadata)> {
let mut insertable = Vec::with_capacity(events.len());
for event in events {
let new = CeramicEventService::validate_discovered_event(
event.0.to_owned(),
event.1.as_slice(),
)
.await
.unwrap();
insertable.push(new);
}

insertable
}

#[test(tokio::test)]
async fn out_of_order_streams_valid() {
let pool = SqlitePool::connect_in_memory().await.unwrap();
Expand All @@ -177,16 +214,8 @@ mod test {
ordered.missing_history.len(),
ordered.missing_history
);
let mut after_1 = Vec::with_capacity(10);
let mut after_2 = Vec::with_capacity(10);
for (event, _) in ordered.deliverable {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.0 == event.order_key) {
after_1.push(event.order_key.clone());
} else {
after_2.push(event.order_key.clone());
}
}
let (after_1, after_2) =
split_deliverable_order_by_stream(&stream_1, &stream_2, ordered.deliverable());

assert_eq!(
stream_1.into_iter().map(|e| e.0).collect::<Vec<_>>(),
Expand Down Expand Up @@ -214,16 +243,8 @@ mod test {
"Missing history: {:?}",
ordered.missing_history
);
let mut after_1 = Vec::with_capacity(10);
let mut after_2 = Vec::with_capacity(10);
for (event, _) in ordered.deliverable {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.0 == event.order_key) {
after_1.push(event.order_key.clone());
} else {
after_2.push(event.order_key.clone());
}
}
let (after_1, after_2) =
split_deliverable_order_by_stream(&stream_1, &stream_2, ordered.deliverable());

assert_eq!(vec![stream_1[0].0.clone()], after_1);
assert_eq!(
Expand All @@ -240,16 +261,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();

let stream_1 = get_n_events(10).await;
let mut insertable = Vec::with_capacity(10);
for event in stream_1.iter() {
let new = CeramicEventService::validate_discovered_event(
event.0.to_owned(),
event.1.as_slice(),
)
.await
.unwrap();
insertable.push(new);
}
let insertable = get_insertable_events(&stream_1).await;
let to_insert = insertable
.iter()
.take(3)
Expand Down Expand Up @@ -278,16 +290,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();

let stream_1 = get_n_events(10).await;
let mut insertable = Vec::with_capacity(10);
for event in stream_1.iter() {
let new = CeramicEventService::validate_discovered_event(
event.0.to_owned(),
event.1.as_slice(),
)
.await
.unwrap();
insertable.push(new);
}
let mut insertable = get_insertable_events(&stream_1).await;
let to_insert = insertable
.iter_mut()
.take(3)
Expand Down

0 comments on commit acd5cc7

Please sign in to comment.