diff --git a/lib/workload/stateless/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateless/filemanager/database/migrations/0002_add_s3_object_table.sql index 8b33bc8bd..5dc4474d4 100644 --- a/lib/workload/stateless/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateless/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -24,6 +24,9 @@ create table s3_object ( bucket text not null, -- The key of the object. key text not null, + -- The version id of the object. A 'null' string is used to indicate no version id. This matches logic in AWS which + -- also returns 'null' strings. See https://docs.aws.amazon.com/AmazonS3/latest/userguide/versioning-workflows.html + version_id text not null default 'null', -- When this object was created. A null value here means that a deleted event has occurred before a created event. created_date timestamptz default null, -- When this object was deleted, a null value means that the object has not yet been deleted. @@ -41,8 +44,6 @@ create table s3_object ( e_tag text default null, -- The S3 storage class of the object. storage_class storage_class default null, - -- The version id of the object, if present. - version_id text default null, -- A sequencer value for when the object was created. Used to synchronise out of order and duplicate events. created_sequencer text default null, -- A sequencer value for when the object was deleted. Used to synchronise out of order and duplicate events. @@ -53,6 +54,6 @@ create table s3_object ( number_duplicate_events integer not null default 0, -- The sequencers should be unique with the bucket, key, and its version, otherwise this is a duplicate event. - constraint created_sequencer_unique unique nulls not distinct (bucket, key, version_id, created_sequencer), - constraint deleted_sequencer_unique unique nulls not distinct (bucket, key, version_id, deleted_sequencer) + constraint created_sequencer_unique unique (bucket, key, version_id, created_sequencer), + constraint deleted_sequencer_unique unique (bucket, key, version_id, deleted_sequencer) ); diff --git a/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql b/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql index 9e52b3569..b613afa63 100644 --- a/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql +++ b/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql @@ -51,7 +51,7 @@ current_objects as ( join input on input.bucket = s3_object.bucket and input.key = s3_object.key and - input.version_id is not distinct from s3_object.version_id + input.version_id = s3_object.version_id -- Lock this pre-emptively for the update. for update ), @@ -70,12 +70,17 @@ objects_to_update as ( -- If a sequencer already exists this event should be reprocessed because this -- sequencer could belong to another object. current_objects.created_sequencer < current_objects.input_created_sequencer - ) + ) and -- And there should not be any objects with a created sequencer that is the same as the input created -- sequencer because this is a duplicate event that would cause a constraint error in the update. - and current_objects.input_created_sequencer not in ( + current_objects.input_created_sequencer not in ( select created_sequencer from current_objects where created_sequencer is not null ) + -- Only one event entry should be updated, and that entry must be the one with the + -- deleted sequencer that is minimum, i.e. closest to the created sequencer which + -- is going to be inserted. + order by current_objects.deleted_sequencer asc + limit 1 ), -- Finally, update the required objects. update as ( @@ -107,7 +112,7 @@ select last_modified_date, e_tag, storage_class as "storage_class?: StorageClass", - version_id, + version_id as "version_id!", created_sequencer as sequencer, number_reordered, number_duplicate_events, diff --git a/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql b/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql index 6c36cad39..5926f6b94 100644 --- a/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql +++ b/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql @@ -36,7 +36,7 @@ current_objects as ( join input on input.bucket = s3_object.bucket and input.key = s3_object.key and - input.version_id is not distinct from s3_object.version_id + input.version_id = s3_object.version_id -- Lock this pre-emptively for the update. for update ), @@ -55,12 +55,17 @@ objects_to_update as ( -- If a sequencer already exists this event should be reprocessed because this -- sequencer would belong to another object. current_objects.deleted_sequencer > current_objects.input_deleted_sequencer - ) + ) and -- And there should not be any objects with a deleted sequencer that is the same as the input deleted -- sequencer because this is a duplicate event that would cause a constraint error in the update. - and current_objects.input_deleted_sequencer not in ( + current_objects.input_deleted_sequencer not in ( select deleted_sequencer from current_objects where deleted_sequencer is not null ) + -- Only one event entry should be updated, and that entry must be the one with the + -- created sequencer that is maximum, i.e. closest to the deleted sequencer which + -- is going to be inserted. + order by current_objects.created_sequencer desc + limit 1 ), -- Finally, update the required objects. update as ( @@ -82,7 +87,7 @@ select last_modified_date, e_tag, storage_class as "storage_class?: StorageClass", - version_id, + version_id as "version_id!", deleted_sequencer as sequencer, number_reordered, number_duplicate_events, diff --git a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs index 431267ac6..06ccf71d4 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs @@ -127,7 +127,7 @@ impl Ingester { &object_created.last_modified_dates as &[Option>], &object_created.e_tags as &[Option], &object_created.storage_classes as &[Option], - &object_created.version_ids as &[Option], + &object_created.version_ids as &[String], &object_created.sequencers as &[Option] ) .fetch_all(&mut *tx) @@ -149,7 +149,7 @@ impl Ingester { &object_created.last_modified_dates as &[Option>], &object_created.e_tags as &[Option], &object_created.storage_classes as &[Option], - &object_created.version_ids as &[Option], + &object_created.version_ids, &object_created.sequencers as &[Option] ) .fetch_all(&mut *tx) @@ -179,7 +179,7 @@ impl Ingester { &object_deleted.buckets, &object_deleted.keys, &object_deleted.event_times as &[Option>], - &object_deleted.version_ids as &[Option], + &object_deleted.version_ids, &object_deleted.sequencers as &[Option], ) .fetch_all(&mut *tx) @@ -201,7 +201,7 @@ impl Ingester { &object_deleted.last_modified_dates as &[Option>], &object_deleted.e_tags as &[Option], &object_deleted.storage_classes as &[Option], - &object_deleted.version_ids as &[Option], + &object_deleted.version_ids, &object_deleted.sequencers as &[Option], // Fill this with 1 reorder, because if we get here then this must be a reordered event. &vec![1; object_deleted.s3_object_ids.len()] @@ -245,12 +245,11 @@ impl Ingest for Ingester { #[cfg(test)] pub(crate) mod tests { - use std::ops::Add; - use chrono::{DateTime, Utc}; use itertools::Itertools; use sqlx::postgres::PgRow; - use sqlx::{PgPool, Row}; + use sqlx::{Executor, PgPool, Row}; + use std::ops::Add; use tokio::time::Instant; use crate::database::aws::ingester::Ingester; @@ -259,7 +258,8 @@ pub(crate) mod tests { use crate::events::aws::message::EventType::{Created, Deleted}; use crate::events::aws::tests::{ expected_events_simple, expected_flat_events_simple, EXPECTED_E_TAG, - EXPECTED_SEQUENCER_CREATED_ONE, EXPECTED_VERSION_ID, + EXPECTED_SEQUENCER_CREATED_ONE, EXPECTED_SEQUENCER_CREATED_ZERO, + EXPECTED_SEQUENCER_DELETED_ONE, EXPECTED_SEQUENCER_DELETED_TWO, EXPECTED_VERSION_ID, }; use crate::events::aws::{Events, FlatS3EventMessage, FlatS3EventMessages, StorageClass}; use crate::events::EventSourceType; @@ -290,11 +290,7 @@ pub(crate) mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); - assert_deleted_with( - &s3_object_results[0], - Some(0), - Some(EXPECTED_VERSION_ID.to_string()), - ); + assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } #[sqlx::test(migrator = "MIGRATOR")] @@ -308,11 +304,7 @@ pub(crate) mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); - assert_deleted_with( - &s3_object_results[0], - Some(0), - Some(EXPECTED_VERSION_ID.to_string()), - ); + assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } #[sqlx::test(migrator = "MIGRATOR")] @@ -335,11 +327,7 @@ pub(crate) mod tests { 2, s3_object_results[0].get::("number_duplicate_events") ); - assert_deleted_with( - &s3_object_results[0], - Some(0), - Some(EXPECTED_VERSION_ID.to_string()), - ); + assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } #[sqlx::test(migrator = "MIGRATOR")] @@ -373,11 +361,7 @@ pub(crate) mod tests { 2, s3_object_results[0].get::("number_duplicate_events") ); - assert_deleted_with( - &s3_object_results[0], - Some(0), - Some(EXPECTED_VERSION_ID.to_string()), - ); + assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } #[sqlx::test(migrator = "MIGRATOR")] @@ -403,11 +387,7 @@ pub(crate) mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); assert_eq!(2, s3_object_results[0].get::("number_reordered")); - assert_deleted_with( - &s3_object_results[0], - Some(0), - Some(EXPECTED_VERSION_ID.to_string()), - ); + assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } #[sqlx::test(migrator = "MIGRATOR")] @@ -443,15 +423,23 @@ pub(crate) mod tests { s3_object_results[1].get::("number_duplicate_events") ); assert_eq!(1, s3_object_results[1].get::("number_reordered")); - assert_deleted_with( - &s3_object_results[1], + assert_with( + &s3_object_results[0], Some(0), - Some(EXPECTED_VERSION_ID.to_string()), + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), + None, + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + None, ); - assert_created_with( - &s3_object_results[0], - Some(EXPECTED_VERSION_ID.to_string()), - EXPECTED_SEQUENCER_CREATED_ONE, + assert_with( + &s3_object_results[1], + Some(0), + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string().add("7")), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + Some(Default::default()), ); } @@ -479,7 +467,10 @@ pub(crate) mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); assert_eq!(0, s3_object_results[0].get::("number_reordered")); - assert_deleted_with(&s3_object_results[0], Some(0), None); + assert_ingest_events( + &s3_object_results[0], + &FlatS3EventMessage::default_version_id(), + ); } #[sqlx::test(migrator = "MIGRATOR")] @@ -502,7 +493,10 @@ pub(crate) mod tests { 2, s3_object_results[0].get::("number_duplicate_events") ); - assert_deleted_with(&s3_object_results[0], Some(0), None); + assert_ingest_events( + &s3_object_results[0], + &FlatS3EventMessage::default_version_id(), + ); } #[sqlx::test(migrator = "MIGRATOR")] @@ -536,7 +530,10 @@ pub(crate) mod tests { 2, s3_object_results[0].get::("number_duplicate_events") ); - assert_deleted_with(&s3_object_results[0], Some(0), None); + assert_ingest_events( + &s3_object_results[0], + &FlatS3EventMessage::default_version_id(), + ); } #[sqlx::test(migrator = "MIGRATOR")] @@ -563,7 +560,15 @@ pub(crate) mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); assert_eq!(2, s3_object_results[0].get::("number_reordered")); - assert_deleted_with(&s3_object_results[0], Some(0), None); + assert_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + FlatS3EventMessage::default_version_id(), + Some(Default::default()), + Some(Default::default()), + ); } #[sqlx::test(migrator = "MIGRATOR")] @@ -576,7 +581,7 @@ pub(crate) mod tests { let event = expected_flat_events_simple().sort_and_dedup().into_inner(); let mut event = event[0].clone(); - event.version_id = Some("version_id".to_string()); + event.version_id = "version_id".to_string(); let mut events = vec![event]; events.extend(expected_flat_events_simple().sort_and_dedup().into_inner()); @@ -599,173 +604,839 @@ pub(crate) mod tests { s3_object_results[1].get::("number_duplicate_events") ); assert_eq!(0, s3_object_results[1].get::("number_reordered")); - assert_deleted_with( + assert_with( &s3_object_results[1], Some(0), - Some(EXPECTED_VERSION_ID.to_string()), + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + Some(Default::default()), ); - assert_created_with( + assert_with( &s3_object_results[0], - Some("version_id".to_string()), - EXPECTED_SEQUENCER_CREATED_ONE, + Some(0), + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), + None, + "version_id".to_string(), + Some(Default::default()), + None, ); } - #[ignore] #[sqlx::test(migrator = "MIGRATOR")] - async fn ingest_permutations(pool: PgPool) { - // This primarily tests out of order and duplicate event ingestion, however it could also function - // as a performance test. + async fn ingest_object_missing_deleted(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = test_events(); + // Missing deleted event. + events_one.object_deleted = Default::default(); + + // New created event with a higher sequencer. + let mut events_two = replace_sequencers( + test_events(), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + ); + events_two.object_deleted = Default::default(); + + ingester.ingest_events(events_one).await.unwrap(); + ingester.ingest_events(events_two).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_created(&s3_object_results[0]); + assert_with( + &s3_object_results[1], + Some(0), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + None, + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + None, + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_missing_deleted_reorder(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = test_events(); + // Missing deleted event. + events_one.object_deleted = Default::default(); + + // New created event with a higher sequencer. + let mut events_two = replace_sequencers( + test_events(), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + ); + events_two.object_deleted = Default::default(); + + // Reorder + ingester.ingest_events(events_two).await.unwrap(); + ingester.ingest_events(events_one).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_created(&s3_object_results[1]); + assert_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + None, + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + None, + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_missing_created(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = test_events(); + // Missing created event. + events_one.object_created = Default::default(); + + // New deleted event with a higher sequencer. + let mut events_two = replace_sequencers( + test_events(), + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + ); + events_two.object_created = Default::default(); + + ingester.ingest_events(events_one).await.unwrap(); + ingester.ingest_events(events_two).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[0], + None, + None, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + EXPECTED_VERSION_ID.to_string(), + None, + Some(Default::default()), + ); + assert_with( + &s3_object_results[1], + None, + None, + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + EXPECTED_VERSION_ID.to_string(), + None, + Some(Default::default()), + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_missing_created_reorder(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = test_events(); + // Missing created event. + events_one.object_created = Default::default(); + + // New deleted event with a higher sequencer. + let mut events_two = replace_sequencers( + test_events(), + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + ); + events_two.object_created = Default::default(); + + // Reorder + ingester.ingest_events(events_two).await.unwrap(); + ingester.ingest_events(events_one).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[1], + None, + None, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + EXPECTED_VERSION_ID.to_string(), + None, + Some(Default::default()), + ); + assert_with( + &s3_object_results[0], + None, + None, + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + EXPECTED_VERSION_ID.to_string(), + None, + Some(Default::default()), + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_missing_deleted_without_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = remove_version_ids(test_events()); + // Missing deleted event. + events_one.object_deleted = Default::default(); + + // New created event with a higher sequencer. + let mut events_two = replace_sequencers( + remove_version_ids(test_events()), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + ); + events_two.object_deleted = Default::default(); + + ingester.ingest_events(events_one).await.unwrap(); + ingester.ingest_events(events_two).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[1], + Some(0), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + None, + FlatS3EventMessage::default_version_id(), + Some(Default::default()), + None, + ); + assert_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), + None, + FlatS3EventMessage::default_version_id(), + Some(Default::default()), + None, + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_missing_deleted_reorder_without_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = remove_version_ids(test_events()); + // Missing deleted event. + events_one.object_deleted = Default::default(); + + // New created event with a higher sequencer. + let mut events_two = replace_sequencers( + remove_version_ids(test_events()), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + ); + events_two.object_deleted = Default::default(); + + // Reorder + ingester.ingest_events(events_two).await.unwrap(); + ingester.ingest_events(events_one).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[1], + Some(0), + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), + None, + FlatS3EventMessage::default_version_id(), + Some(Default::default()), + None, + ); + assert_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + None, + FlatS3EventMessage::default_version_id(), + Some(Default::default()), + None, + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_missing_created_without_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = remove_version_ids(test_events()); + // Missing created event. + events_one.object_created = Default::default(); + + // New deleted event with a higher sequencer. + let mut events_two = replace_sequencers( + remove_version_ids(test_events()), + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + ); + events_two.object_created = Default::default(); + + ingester.ingest_events(events_one).await.unwrap(); + ingester.ingest_events(events_two).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[0], + None, + None, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + FlatS3EventMessage::default_version_id(), + None, + Some(Default::default()), + ); + assert_with( + &s3_object_results[1], + None, + None, + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + FlatS3EventMessage::default_version_id(), + None, + Some(Default::default()), + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_missing_created_reorder_without_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = remove_version_ids(test_events()); + // Missing created event. + events_one.object_created = Default::default(); + + // New deleted event with a higher sequencer. + let mut events_two = replace_sequencers( + remove_version_ids(test_events()), + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + ); + events_two.object_created = Default::default(); + + // Reorder + ingester.ingest_events(events_two).await.unwrap(); + ingester.ingest_events(events_one).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[1], + None, + None, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + FlatS3EventMessage::default_version_id(), + None, + Some(Default::default()), + ); + assert_with( + &s3_object_results[0], + None, + None, + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + FlatS3EventMessage::default_version_id(), + None, + Some(Default::default()), + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_no_sequencer_created(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = replace_sequencers(test_events(), None); + events_one.object_deleted = Default::default(); + + let mut events_two = replace_sequencers(test_events(), None); + events_two.object_deleted = Default::default(); + + ingester.ingest_events(events_one).await.unwrap(); + ingester.ingest_events(events_two).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[0], + Some(0), + None, + None, + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + None, + ); + assert_with( + &s3_object_results[1], + Some(0), + None, + None, + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + None, + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_no_sequencer_deleted(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = replace_sequencers(test_events(), None); + events_one.object_created = Default::default(); + + let mut events_two = replace_sequencers(test_events(), None); + events_two.object_created = Default::default(); + + ingester.ingest_events(events_one).await.unwrap(); + ingester.ingest_events(events_two).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[0], + None, + None, + None, + EXPECTED_VERSION_ID.to_string(), + None, + Some(Default::default()), + ); + assert_with( + &s3_object_results[1], + None, + None, + None, + EXPECTED_VERSION_ID.to_string(), + None, + Some(Default::default()), + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_no_sequencer(pool: PgPool) { + let ingester = test_ingester(pool); + + let events = replace_sequencers(test_events(), None); + ingester.ingest_events(events).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[0], + Some(0), + None, + None, + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + None, + ); + assert_with( + &s3_object_results[1], + None, + None, + None, + EXPECTED_VERSION_ID.to_string(), + None, + Some(Default::default()), + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_multiple_matching_rows_created(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = replace_sequencers( + test_events(), + Some(EXPECTED_SEQUENCER_CREATED_ZERO.to_string()), + ); + // Missing deleted event. + events_one.object_deleted = Default::default(); + + // New created event with a higher sequencer. + let mut events_two = test_events(); + events_two.object_deleted = Default::default(); + + // Next event matches both the above when checking sequencer condition. + let mut events_three = replace_sequencers( + test_events(), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + ); + events_three.object_created = Default::default(); + + ingester.ingest_events(events_one).await.unwrap(); + ingester.ingest_events(events_two).await.unwrap(); + ingester.ingest_events(events_three).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_SEQUENCER_CREATED_ZERO.to_string()), + None, + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + None, + ); + assert_ingest_events(&s3_object_results[1], EXPECTED_VERSION_ID); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_object_multiple_matching_rows_deleted(pool: PgPool) { + let ingester = test_ingester(pool); + + let mut events_one = replace_sequencers( + test_events(), + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + ); + // Missing created event. + events_one.object_created = Default::default(); + + // New deleted event with a higher sequencer. + let mut events_two = replace_sequencers( + test_events(), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + ); + events_two.object_created = Default::default(); + + // Next event matches both the above when checking sequencer condition. + let mut events_three = test_events(); + events_three.object_deleted = Default::default(); + + ingester.ingest_events(events_one).await.unwrap(); + ingester.ingest_events(events_two).await.unwrap(); + ingester.ingest_events(events_three).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_with( + &s3_object_results[0], + None, + None, + Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()), + EXPECTED_VERSION_ID.to_string(), + None, + Some(Default::default()), + ); + assert_ingest_events(&s3_object_results[1], EXPECTED_VERSION_ID); + } + + pub(crate) fn assert_ingest_events(result: &PgRow, version_id: &str) { + assert_with( + result, + Some(0), + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + version_id.to_string(), + Some(Default::default()), + Some(Default::default()), + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_permutations_small_without_version_id(pool: PgPool) { let event_permutations = vec![ FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) - .with_version_id(Some("version_id".to_string())) + .with_default_version_id() .with_event_type(Created) .with_sequencer(Some("1".to_string())), FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) - .with_version_id(Some("version_id".to_string())) + .with_default_version_id() .with_event_type(Deleted) .with_sequencer(Some("2".to_string())), + // Missing created event. FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) - .with_version_id(Some("version_id".to_string())) - .with_event_type(Created) + .with_default_version_id() + .with_event_type(Deleted) .with_sequencer(Some("3".to_string())), - // Duplicate FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) - .with_version_id(Some("version_id".to_string())) + .with_default_version_id() .with_event_type(Created) - .with_sequencer(Some("3".to_string())), + .with_sequencer(Some("4".to_string())), + // Missing deleted event. FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) - .with_version_id(Some("version_id".to_string())) - .with_event_type(Deleted) - .with_sequencer(Some("4".to_string())), + .with_default_version_id() + .with_event_type(Created) + .with_sequencer(Some("5".to_string())), + // Different key + FlatS3EventMessage::new_with_generated_id() + .with_bucket("bucket".to_string()) + .with_key("key1".to_string()) + .with_default_version_id() + .with_event_type(Created) + .with_sequencer(Some("1".to_string())), + ]; + + // 720 permutations + run_permutation_test(&pool, event_permutations, 5, |s3_object_results| { + find_object_with( + &s3_object_results, + "key", + "bucket", + &FlatS3EventMessage::default_version_id(), + Some("1"), + Some("2"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + &FlatS3EventMessage::default_version_id(), + None, + Some("3"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + &FlatS3EventMessage::default_version_id(), + Some("4"), + None, + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + &FlatS3EventMessage::default_version_id(), + Some("5"), + None, + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key1", + "bucket", + &FlatS3EventMessage::default_version_id(), + Some("1"), + None, + ) + .unwrap(); + }) + .await; + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_permutations_small(pool: PgPool) { + let event_permutations = example_event_permutations(); + + // 720 permutations + run_permutation_test(&pool, event_permutations, 3, |s3_object_results| { + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("1"), + Some("2"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("3"), + Some("4"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id1", + None, + Some("1"), + ) + .unwrap(); + }) + .await; + } + + #[ignore] + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_permutations(pool: PgPool) { + // This primarily tests out of order and duplicate event ingestion, however it could also function + // as a performance test. + let mut event_permutations = example_event_permutations(); + event_permutations.extend(vec![ FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) - .with_version_id(Some("version_id".to_string())) + .with_version_id("version_id".to_string()) .with_event_type(Created) .with_sequencer(Some("5".to_string())), FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) - .with_version_id(Some("version_id".to_string())) + .with_version_id("version_id".to_string()) .with_event_type(Deleted) .with_sequencer(Some("6".to_string())), + ]); + + // 40320 permutations + run_permutation_test(&pool, event_permutations, 4, |s3_object_results| { + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("1"), + Some("2"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("3"), + Some("4"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("5"), + Some("6"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id1", + None, + Some("1"), + ) + .unwrap(); + }) + .await; + } + + fn example_event_permutations() -> Vec { + vec![ + FlatS3EventMessage::new_with_generated_id() + .with_bucket("bucket".to_string()) + .with_key("key".to_string()) + .with_version_id("version_id".to_string()) + .with_event_type(Created) + .with_sequencer(Some("1".to_string())), + FlatS3EventMessage::new_with_generated_id() + .with_bucket("bucket".to_string()) + .with_key("key".to_string()) + .with_version_id("version_id".to_string()) + .with_event_type(Deleted) + .with_sequencer(Some("2".to_string())), + FlatS3EventMessage::new_with_generated_id() + .with_bucket("bucket".to_string()) + .with_key("key".to_string()) + .with_version_id("version_id".to_string()) + .with_event_type(Created) + .with_sequencer(Some("3".to_string())), + // Duplicate + FlatS3EventMessage::new_with_generated_id() + .with_bucket("bucket".to_string()) + .with_key("key".to_string()) + .with_version_id("version_id".to_string()) + .with_event_type(Created) + .with_sequencer(Some("3".to_string())), + FlatS3EventMessage::new_with_generated_id() + .with_bucket("bucket".to_string()) + .with_key("key".to_string()) + .with_version_id("version_id".to_string()) + .with_event_type(Deleted) + .with_sequencer(Some("4".to_string())), // Different version id FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) - .with_version_id(Some("version_id1".to_string())) + .with_version_id("version_id1".to_string()) .with_event_type(Deleted) .with_sequencer(Some("1".to_string())), - ]; + ] + } + fn find_object_with<'a>( + results: &'a [PgRow], + key: &str, + bucket: &str, + version_id: &str, + created_sequencer: Option<&str>, + deleted_sequencer: Option<&str>, + ) -> Option<&'a PgRow> { + results.iter().find(|object| { + object.get::("key") == key + && object.get::("bucket") == bucket + && object.get::("version_id") == version_id + && object.get::, _>("created_sequencer") == created_sequencer + && object.get::, _>("deleted_sequencer") == deleted_sequencer + }) + } + + async fn run_permutation_test( + pool: &PgPool, + permutations: Vec, + expected_rows: usize, + row_asserts: F, + ) where + F: Fn(Vec), + { let now = Instant::now(); - let length = event_permutations.len(); - // 40320 permutations - for events in event_permutations.into_iter().permutations(length) { + let length = permutations.len(); + for events in permutations.into_iter().permutations(length) { let ingester = test_ingester(pool.clone()); - ingester - .ingest(EventSourceType::S3( - // Okay to dedup here as the Lambda function would be doing this anyway. - FlatS3EventMessages(events[0..2].to_vec()).dedup().into(), - )) - .await - .unwrap(); - ingester - .ingest(EventSourceType::S3( - FlatS3EventMessages(vec![events[2].clone()]).into(), - )) - .await - .unwrap(); - ingester - .ingest(EventSourceType::S3( - FlatS3EventMessages(vec![events[3].clone()]).into(), - )) - .await - .unwrap(); - ingester - .ingest(EventSourceType::S3( - FlatS3EventMessages(vec![events[4].clone()]).into(), - )) - .await - .unwrap(); - ingester - .ingest(EventSourceType::S3( - FlatS3EventMessages(events[5..].to_vec()).dedup().into(), - )) - .await - .unwrap(); + for chunk in events.chunks(1) { + ingester + .ingest(EventSourceType::S3( + // Okay to dedup here as the Lambda function would be doing this anyway. + FlatS3EventMessages(chunk.to_vec()).dedup().into(), + )) + .await + .unwrap(); + } let (object_results, s3_object_results) = fetch_results(&ingester).await; - assert_eq!(object_results.len(), 4); - assert_eq!(s3_object_results.len(), 4); + assert_eq!(object_results.len(), expected_rows); + assert_eq!(s3_object_results.len(), expected_rows); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::, _>("version_id") - == Some("version_id".to_string()) - && object.get::, _>("created_sequencer") - == Some("1".to_string()) - && object.get::, _>("deleted_sequencer") - == Some("2".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::, _>("version_id") - == Some("version_id".to_string()) - && object.get::, _>("created_sequencer") - == Some("3".to_string()) - && object.get::, _>("deleted_sequencer") - == Some("4".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::, _>("version_id") - == Some("version_id".to_string()) - && object.get::, _>("created_sequencer") - == Some("5".to_string()) - && object.get::, _>("deleted_sequencer") - == Some("6".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::, _>("version_id") - == Some("version_id1".to_string()) - && object - .get::, _>("created_sequencer") - .is_none() - && object.get::, _>("deleted_sequencer") - == Some("1".to_string()) - }) - .unwrap(); + row_asserts(s3_object_results); + + // Clean up for next permutation. + pool.execute("truncate s3_object, object").await.unwrap(); } println!( @@ -779,12 +1450,27 @@ pub(crate) mod tests { .object_deleted .version_ids .iter_mut() - .for_each(|version_id| _ = version_id.take()); + .for_each(|version_id| *version_id = FlatS3EventMessage::default_version_id()); events .object_created .version_ids .iter_mut() - .for_each(|version_id| _ = version_id.take()); + .for_each(|version_id| *version_id = FlatS3EventMessage::default_version_id()); + + events + } + + fn replace_sequencers(mut events: Events, sequencer: Option) -> Events { + events + .object_deleted + .sequencers + .iter_mut() + .for_each(|replace_sequencer| *replace_sequencer = sequencer.clone()); + events + .object_created + .sequencers + .iter_mut() + .for_each(|replace_sequencer| *replace_sequencer = sequencer.clone()); events } @@ -802,65 +1488,51 @@ pub(crate) mod tests { ) } - pub(crate) fn assert_created_with( - s3_object_results: &PgRow, - expected_version_id: Option, - expected_sequencer: &str, - ) { - assert_eq!("bucket", s3_object_results.get::("bucket")); - assert_eq!("key", s3_object_results.get::("key")); - assert_eq!(0, s3_object_results.get::("size")); - assert_eq!(EXPECTED_E_TAG, s3_object_results.get::("e_tag")); - assert_eq!( - expected_version_id, - s3_object_results.get::, _>("version_id") - ); - assert_eq!( - expected_sequencer, - s3_object_results.get::("created_sequencer") - ); - assert_eq!( - DateTime::::default(), - s3_object_results.get::, _>("created_date") - ); - assert_eq!( - DateTime::::default(), - s3_object_results.get::, _>("last_modified_date") - ); - } - pub(crate) fn assert_created(s3_object_results: &PgRow) { - assert_created_with( + assert_with( s3_object_results, - Some(EXPECTED_VERSION_ID.to_string()), - EXPECTED_SEQUENCER_CREATED_ONE, + Some(0), + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), + None, + EXPECTED_VERSION_ID.to_string(), + Some(Default::default()), + None, ) } - pub(crate) fn assert_deleted_with( + pub(crate) fn assert_with( s3_object_results: &PgRow, size: Option, - version_id: Option, + created_sequencer: Option, + deleted_sequencer: Option, + version_id: String, + created_date: Option>, + deleted_date: Option>, ) { assert_eq!("bucket", s3_object_results.get::("bucket")); assert_eq!("key", s3_object_results.get::("key")); + assert_eq!(version_id, s3_object_results.get::("version_id")); assert_eq!( - version_id, - s3_object_results.get::, _>("version_id") + created_sequencer, + s3_object_results.get::, _>("created_sequencer") + ); + assert_eq!( + deleted_sequencer, + s3_object_results.get::, _>("deleted_sequencer") ); assert_eq!(size, s3_object_results.get::, _>("size")); assert_eq!(EXPECTED_E_TAG, s3_object_results.get::("e_tag")); assert_eq!( DateTime::::default(), - s3_object_results.get::, _>("created_date") + s3_object_results.get::, _>("last_modified_date") ); assert_eq!( - DateTime::::default(), - s3_object_results.get::, _>("last_modified_date") + created_date, + s3_object_results.get::>, _>("created_date") ); assert_eq!( - DateTime::::default(), - s3_object_results.get::, _>("deleted_date") + deleted_date, + s3_object_results.get::>, _>("deleted_date") ); } diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/aws/message.rs b/lib/workload/stateless/filemanager/filemanager/src/events/aws/message.rs index ef521e5b7..5ec177fe7 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/events/aws/message.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/events/aws/message.rs @@ -128,7 +128,7 @@ impl From for FlatS3EventMessages { size, e_tag: etag, sequencer, - version_id, + version_id: version_id.unwrap_or_else(FlatS3EventMessage::default_version_id), // Head fields are fetched later. storage_class: None, last_modified_date: None, @@ -179,7 +179,7 @@ mod tests { &Deleted, Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, - Some(EXPECTED_VERSION_ID.to_string()), + EXPECTED_VERSION_ID.to_string(), ); } @@ -195,7 +195,7 @@ mod tests { &Deleted, Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, - Some(EXPECTED_VERSION_ID.to_string()), + EXPECTED_VERSION_ID.to_string(), ); } } diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs index 67c531b75..8a38e7d22 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs @@ -65,7 +65,7 @@ pub struct TransposedS3EventMessages { pub event_times: Vec>>, pub buckets: Vec, pub keys: Vec, - pub version_ids: Vec>, + pub version_ids: Vec, pub sizes: Vec>, pub e_tags: Vec>, pub sequencers: Vec>, @@ -383,7 +383,7 @@ pub struct FlatS3EventMessage { pub sequencer: Option, pub bucket: String, pub key: String, - pub version_id: Option, + pub version_id: String, pub size: Option, pub e_tag: Option, pub storage_class: Option, @@ -453,11 +453,17 @@ impl FlatS3EventMessage { } /// Set the version id. - pub fn with_version_id(mut self, version_id: Option) -> Self { + pub fn with_version_id(mut self, version_id: String) -> Self { self.version_id = version_id; self } + /// Set the version id. + pub fn with_default_version_id(mut self) -> Self { + self.version_id = Self::default_version_id(); + self + } + /// Set the size. pub fn with_size(mut self, size: Option) -> Self { self.size = size; @@ -493,6 +499,10 @@ impl FlatS3EventMessage { self.event_type = event_type; self } + + pub fn default_version_id() -> String { + "null".to_string() + } } impl From> for FlatS3EventMessages { @@ -533,7 +543,7 @@ pub(crate) mod tests { &EventType::Deleted, Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, - Some(EXPECTED_VERSION_ID.to_string()), + EXPECTED_VERSION_ID.to_string(), ); let second = result.next().unwrap(); @@ -542,7 +552,7 @@ pub(crate) mod tests { &EventType::Created, Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), Some(0), - Some(EXPECTED_VERSION_ID.to_string()), + EXPECTED_VERSION_ID.to_string(), ); let third = result.next().unwrap(); @@ -551,7 +561,7 @@ pub(crate) mod tests { &EventType::Created, Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), Some(0), - Some(EXPECTED_VERSION_ID.to_string()), + EXPECTED_VERSION_ID.to_string(), ); } @@ -566,7 +576,7 @@ pub(crate) mod tests { &EventType::Created, Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), Some(0), - Some(EXPECTED_VERSION_ID.to_string()), + EXPECTED_VERSION_ID.to_string(), ); let second = result.next().unwrap(); @@ -575,7 +585,7 @@ pub(crate) mod tests { &EventType::Deleted, Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, - Some(EXPECTED_VERSION_ID.to_string()), + EXPECTED_VERSION_ID.to_string(), ); } @@ -590,7 +600,7 @@ pub(crate) mod tests { .first() .unwrap() .clone() - .with_version_id(Some("version_id".to_string())), + .with_version_id("version_id".to_string()), ); let result = FlatS3EventMessages(result).sort_and_dedup(); @@ -602,7 +612,7 @@ pub(crate) mod tests { &EventType::Created, Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), Some(0), - Some(EXPECTED_VERSION_ID.to_string()), + EXPECTED_VERSION_ID.to_string(), ); let second = result.next().unwrap(); @@ -611,7 +621,7 @@ pub(crate) mod tests { &EventType::Deleted, Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, - Some(EXPECTED_VERSION_ID.to_string()), + EXPECTED_VERSION_ID.to_string(), ); let third = result.next().unwrap(); @@ -620,7 +630,7 @@ pub(crate) mod tests { &EventType::Deleted, Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, - Some("version_id".to_string()), + "version_id".to_string(), ); } @@ -629,7 +639,7 @@ pub(crate) mod tests { event_type: &EventType, sequencer: Option, size: Option, - version_id: Option, + version_id: String, ) { assert_eq!(event.event_time, Some(DateTime::::default())); assert_eq!(&event.event_type, event_type); @@ -656,7 +666,7 @@ pub(crate) mod tests { assert_eq!(events.sizes[position], size); assert_eq!( events.version_ids[position], - Some(EXPECTED_VERSION_ID.to_string()) + EXPECTED_VERSION_ID.to_string() ); assert_eq!(events.e_tags[position], Some(EXPECTED_E_TAG.to_string())); assert_eq!(events.sequencers[position], Some(sequencer.to_string())); diff --git a/lib/workload/stateless/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateless/filemanager/filemanager/src/handlers/aws.rs index 8e9d3e3a7..4e927f43d 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/handlers/aws.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/handlers/aws.rs @@ -92,7 +92,7 @@ mod tests { use aws_lambda_events::sqs::SqsMessage; use sqlx::PgPool; - use crate::database::aws::ingester::tests::{assert_deleted_with, fetch_results}; + use crate::database::aws::ingester::tests::{assert_ingest_events, fetch_results}; use crate::database::aws::migration::tests::MIGRATOR; use crate::events::aws::collecter::tests::{ expected_head_object, set_s3_client_expectations, set_sqs_client_expectations, @@ -118,11 +118,7 @@ mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); - assert_deleted_with( - &s3_object_results[0], - Some(0), - Some(EXPECTED_VERSION_ID.to_string()), - ); + assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } #[sqlx::test(migrator = "MIGRATOR")] @@ -146,10 +142,6 @@ mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); - assert_deleted_with( - &s3_object_results[0], - Some(0), - Some(EXPECTED_VERSION_ID.to_string()), - ); + assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } }