diff --git a/lib/workload/stateful/filemanager/Cargo.lock b/lib/workload/stateful/filemanager/Cargo.lock index e7cf0b3e5..674ca63e6 100644 --- a/lib/workload/stateful/filemanager/Cargo.lock +++ b/lib/workload/stateful/filemanager/Cargo.lock @@ -1076,6 +1076,7 @@ dependencies = [ "filemanager", "futures", "hyper 1.0.1", + "itertools 0.12.0", "lambda_runtime", "lazy_static", "mockall", @@ -1638,6 +1639,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -2123,7 +2133,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dfc28575c2e3f19cb3c73b93af36460ae898d426eba6fc15b9bd2a5220758a0" dependencies = [ "anstyle", - "itertools", + "itertools 0.11.0", "predicates-core", ] @@ -2804,7 +2814,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85" dependencies = [ - "itertools", + "itertools 0.11.0", "nom", "unicode_categories", ] diff --git a/lib/workload/stateful/filemanager/database/Dockerfile b/lib/workload/stateful/filemanager/database/Dockerfile index 66923807e..be54f22f6 100644 --- a/lib/workload/stateful/filemanager/database/Dockerfile +++ b/lib/workload/stateful/filemanager/database/Dockerfile @@ -1,3 +1,3 @@ -FROM postgres:16 +FROM postgres:15 COPY migrations/ /docker-entrypoint-initdb.d/ diff --git a/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql index 3d9245e0e..170fe29ee 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql @@ -1,22 +1,9 @@ -- A general object table common across all storage types. create table object ( -- The unique id for this object. - object_id uuid not null default gen_random_uuid() primary key, - -- The bucket location. - bucket varchar(255) not null, - -- The name of the object. - key varchar(1024) not null, + object_id uuid not null primary key default gen_random_uuid(), -- The size of the object. - size int default null, + size integer default null, -- A unique identifier for the object, if it is present. - hash varchar(255) default null, - -- When this object was created. - created_date timestamptz not null default now(), - -- When this object was last modified. - last_modified_date timestamptz not null default now(), - -- When this object was deleted, a null value means that the object has not yet been deleted. - deleted_date timestamptz default null, - -- The date of the object and its id combined. - portal_run_id varchar(255) not null - -- provenance - history of all objects and how they move?
+ checksum text default null ); diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql index ba2fc4cce..fe1ca6176 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -2,8 +2,41 @@ create type storage_class as enum ('DeepArchive', 'Glacier', 'GlacierIr', 'Intel -- An object contained in AWS S3, maps as a one-to-one relationship with the object table. create table s3_object( - -- The object id. - object_id uuid references object (object_id) primary key, + -- The s3 object id. + s3_object_id uuid not null primary key default gen_random_uuid(), + -- This is initially deferred because we want to create an s3_object before an object to check for duplicates/order. + object_id uuid references object (object_id) deferrable initially deferred, + + -- General fields + -- The bucket of the object. + bucket text not null, + -- The key of the object. + key text not null, + -- When this object was created. + created_date timestamptz not null default now(), + -- When this object was deleted, a null value means that the object has not yet been deleted. + deleted_date timestamptz default null, + -- provenance - history of all objects and how they move? + + -- AWS-specific fields + -- The AWS last modified value. + last_modified_date timestamptz default null, + -- An S3-specific e_tag, if it is present. + e_tag text default null, -- The S3 storage class of the object. - storage_class storage_class not null -); \ No newline at end of file + storage_class storage_class not null, + -- The version id of the object, if present. + version_id text default null, + -- A sequencer value for when the object was created. Used to synchronise out of order and duplicate events. + created_sequencer text default null, + -- A sequencer value for when the object was deleted. Used to synchronise out of order and duplicate events. + deleted_sequencer text default null, + -- Record whether the event that generated this object was ever out of order, useful for debugging. + event_out_of_order boolean not null default false, + -- Record the number of duplicate events received for this object, useful for debugging. + number_duplicate_events integer not null default 0, + + -- The sequencers should be unique together with the bucket, key, and version id; otherwise this is a duplicate event. + constraint created_sequencer_unique unique (bucket, key, version_id, created_sequencer), + constraint deleted_sequencer_unique unique (bucket, key, version_id, deleted_sequencer) ); diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql new file mode 100644 index 000000000..3e00bf96c --- /dev/null +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql @@ -0,0 +1,27 @@ +-- Bulk insert of s3 objects.
+insert into s3_object ( + s3_object_id, + object_id, + bucket, + key, + created_date, + last_modified_date, + e_tag, + storage_class, + version_id, + created_sequencer +) +values ( + unnest($1::uuid[]), + unnest($2::uuid[]), + unnest($3::text[]), + unnest($4::text[]), + unnest($5::timestamptz[]), + unnest($6::timestamptz[]), + unnest($7::text[]), + unnest($8::storage_class[]), + unnest($9::text[]), + unnest($10::text[]) +) on conflict on constraint created_sequencer_unique do update + set number_duplicate_events = s3_object.number_duplicate_events + 1 + returning object_id, number_duplicate_events; diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql new file mode 100644 index 000000000..f0bf29524 --- /dev/null +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql @@ -0,0 +1,31 @@ +-- Bulk insert of s3 objects. +insert into s3_object ( + s3_object_id, + object_id, + bucket, + key, + -- We default the created date to a value even if this is a deleted event, + -- as we are expecting this to get updated. + created_date, + deleted_date, + last_modified_date, + e_tag, + storage_class, + version_id, + deleted_sequencer +) +values ( + unnest($1::uuid[]), + unnest($2::uuid[]), + unnest($3::text[]), + unnest($4::text[]), + unnest($5::timestamptz[]), + unnest($6::timestamptz[]), + unnest($7::timestamptz[]), + unnest($8::text[]), + unnest($9::storage_class[]), + unnest($10::text[]), + unnest($11::text[]) +) on conflict on constraint deleted_sequencer_unique do update + set number_duplicate_events = s3_object.number_duplicate_events + 1 + returning object_id, number_duplicate_events; diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_objects.sql deleted file mode 100644 index f61e3cc75..000000000 --- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_objects.sql +++ /dev/null @@ -1,6 +0,0 @@ --- Bulk insert of s3 objects. -insert into s3_object (object_id, storage_class) -values ( - unnest($1::uuid[]), - unnest($2::storage_class[]) -); \ No newline at end of file diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/update_deleted.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql similarity index 58% rename from lib/workload/stateful/filemanager/database/queries/ingester/update_deleted.sql rename to lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql index 30f7df943..b6adabf1b 100644 --- a/lib/workload/stateful/filemanager/database/queries/ingester/update_deleted.sql +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql @@ -1,9 +1,9 @@ --- Update the deleted time of s3 objects. -update object +-- Update the deleted time of objects.
+update s3_object set deleted_date = data.deleted_time from (select unnest($1::varchar[]) as key, unnest($2::varchar[]) as bucket, unnest($3::timestamptz[]) as deleted_time ) as data -where object.key = data.key and object.bucket = data.bucket; \ No newline at end of file + where s3_object.key = data.key and s3_object.bucket = data.bucket; diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql index 111140153..7f1bf0fc9 100644 --- a/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql +++ b/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql @@ -1,12 +1,7 @@ -- Bulk insert of objects -insert into object (object_id, bucket, key, size, hash, created_date, last_modified_date, portal_run_id) +insert into object (object_id, size, checksum) values ( unnest($1::uuid[]), - unnest($2::varchar[]), - unnest($3::varchar[]), - unnest($4::int[]), - unnest($5::varchar[]), - unnest($6::timestamptz[]), - unnest($7::timestamptz[]), - unnest($8::varchar[]) -); \ No newline at end of file + unnest($2::int[]), + unnest($3::text[]) +); diff --git a/lib/workload/stateful/filemanager/filemanager/Cargo.toml b/lib/workload/stateful/filemanager/filemanager/Cargo.toml index 5262ca838..639b5ed13 100644 --- a/lib/workload/stateful/filemanager/filemanager/Cargo.toml +++ b/lib/workload/stateful/filemanager/filemanager/Cargo.toml @@ -32,6 +32,7 @@ mockall = "0.12" mockall_double = "0.3" lambda_runtime = "0.8" aws_lambda_events = "0.12" +itertools = "0.12" # AWS aws-sdk-sqs = "1" diff --git a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs index c9b9e727c..db8bfd63b 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs @@ -1,13 +1,14 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use sqlx::query_file; -use tracing::trace; +use tracing::{debug, trace}; use crate::database::{Client, Ingest}; use crate::error::Result; use crate::events::aws::StorageClass; use crate::events::aws::{Events, TransposedS3EventMessages}; use crate::events::EventSourceType; +use uuid::Uuid; /// An ingester for S3 events. #[derive(Debug)] @@ -38,39 +39,74 @@ impl Ingester { trace!(object_created = ?object_created, "ingesting object created events"); let TransposedS3EventMessages { + sequencers, object_ids, event_times, buckets, keys, + version_ids, sizes, e_tags, - portal_run_ids, storage_classes, last_modified_dates, .. 
} = object_created; - query_file!( - "../database/queries/ingester/insert_objects.sql", + let mut tx = self.client().pool().begin().await?; + + let mut inserted = query_file!( + "../database/queries/ingester/aws/insert_s3_created_objects.sql", + &vec![Uuid::new_v4(); sizes.len()] as &[Uuid], &object_ids, &buckets, &keys, - &sizes as &[Option<i32>], - &e_tags as &[Option<String>], &event_times, &last_modified_dates as &[Option<DateTime<Utc>>], - &portal_run_ids + &e_tags as &[Option<String>], + &storage_classes as &[Option<StorageClass>], + &version_ids as &[Option<String>], + &sequencers as &[Option<String>] ) - .execute(&self.client.pool) + .fetch_all(&mut *tx) .await?; - query_file!( - "../database/queries/ingester/aws/insert_s3_objects.sql", - &object_ids, - &storage_classes as &[Option<StorageClass>] - ) - .execute(&self.client.pool) - .await?; + let (object_ids, sizes): (Vec<_>, Vec<_>) = object_ids + .into_iter() + .rev() + .zip(sizes.into_iter().rev()) + .flat_map(|(object_id, size)| { + // If we cannot find the object in our new ids, this object already exists. + let pos = inserted.iter().rposition(|record| { + // This will never be `None`, maybe this is an sqlx bug? + record.object_id == Some(object_id) + })?; + + // We can remove this to avoid searching over it again. + let record = inserted.remove(pos); + debug!( + object_id = ?record.object_id, + number_duplicate_events = record.number_duplicate_events, + "duplicate event found" + ); + + // This is a new event so the corresponding object should be inserted. + Some((object_id, size)) + }) + .unzip(); + + // Insert only the non-duplicate events. + if !object_ids.is_empty() { + debug!(count = object_ids.len(), "inserting objects"); + + query_file!( + "../database/queries/ingester/insert_objects.sql", + &object_ids, + &sizes as &[Option<i32>], + &vec![None; sizes.len()] as &[Option<String>], + ) + .execute(&mut *tx) + .await?; + } trace!(object_removed = ?object_removed, "ingesting object removed events"); let TransposedS3EventMessages { @@ -81,14 +117,16 @@ impl Ingester { } = object_removed; query_file!( - "../database/queries/ingester/update_deleted.sql", + "../database/queries/ingester/aws/update_deleted.sql", &keys, &buckets, &event_times ) - .execute(&self.client.pool) + .execute(&mut *tx) .await?; + tx.commit().await?; + Ok(()) } @@ -112,12 +150,16 @@ pub(crate) mod tests { use crate::database::aws::ingester::Ingester; use crate::database::aws::migration::tests::MIGRATOR; use crate::database::{Client, Ingest}; - use crate::events::aws::tests::{expected_events, EXPECTED_E_TAG}; - use crate::events::aws::{Events, StorageClass}; + use crate::events::aws::tests::{ + expected_events, expected_flat_events, EXPECTED_E_TAG, EXPECTED_SEQUENCER_CREATED, + EXPECTED_VERSION_ID, + }; + use crate::events::aws::{Events, FlatS3EventMessages, StorageClass}; use crate::events::EventSourceType; use chrono::{DateTime, Utc}; use sqlx::postgres::PgRow; use sqlx::{PgPool, Row}; + use std::ops::Add; #[sqlx::test(migrator = "MIGRATOR")] async fn ingest_object_created(pool: PgPool) { @@ -127,12 +169,11 @@ pub(crate) mod tests { let ingester = test_ingester(pool); ingester.ingest_events(events).await.unwrap(); - let result = sqlx::query("select * from object") - .fetch_one(ingester.client.pool()) - .await - .unwrap(); + let (object_results, s3_object_results) = fetch_results(&ingester).await; - assert_created(result); + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_created(&object_results[0], &s3_object_results[0]); } #[sqlx::test(migrator = "MIGRATOR")] @@ -142,12 +183,11 @@
let ingester = test_ingester(pool); ingester.ingest_events(events).await.unwrap(); - let result = sqlx::query("select * from object") - .fetch_one(ingester.client.pool()) - .await - .unwrap(); + let (object_results, s3_object_results) = fetch_results(&ingester).await; - assert_deleted(result); + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_deleted(&object_results[0], &s3_object_results[0]); } #[sqlx::test(migrator = "MIGRATOR")] @@ -157,65 +197,206 @@ let ingester = test_ingester(pool); ingester.ingest(EventSourceType::S3(events)).await.unwrap(); - let result = sqlx::query("select * from object") - .fetch_one(ingester.client.pool) + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_deleted(&object_results[0], &s3_object_results[0]); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_duplicates(pool: PgPool) { + let ingester = test_ingester(pool); + ingester + .ingest(EventSourceType::S3(test_events())) .await .unwrap(); + ingester + .ingest(EventSourceType::S3(test_events())) + .await + .unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; - assert_deleted(result); + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_eq!( + 1, + s3_object_results[0].get::<i32, _>("number_duplicate_events") + ); + assert_deleted(&object_results[0], &s3_object_results[0]); } - pub(crate) fn assert_created(result: PgRow) { - assert_eq!("bucket", result.get::<String, _>("bucket")); - assert_eq!("key", result.get::<String, _>("key")); - assert_eq!(0, result.get::<i32, _>("size")); - assert_eq!(EXPECTED_E_TAG, result.get::<String, _>("hash")); + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_duplicates_complex(pool: PgPool) { + let ingester = test_ingester(pool); + ingester + .ingest(EventSourceType::S3(test_events())) + .await + .unwrap(); + + let event = expected_flat_events().sort_and_dedup().into_inner(); + let mut event = event[0].clone(); + event.sequencer = Some(event.sequencer.unwrap().add("7")); + + let mut events = vec![event]; + events.extend(expected_flat_events().sort_and_dedup().into_inner()); + + let events = update_test_events(FlatS3EventMessages(events).into()); + + ingester.ingest(EventSourceType::S3(events)).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_eq!( + 1, + s3_object_results[0].get::<i32, _>("number_duplicate_events") + ); + assert_deleted(&object_results[0], &s3_object_results[0]); + assert_created_with( + &object_results[1], + &s3_object_results[1], + EXPECTED_VERSION_ID, + &EXPECTED_SEQUENCER_CREATED.to_string().add("7"), + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_duplicates_with_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + ingester + .ingest(EventSourceType::S3(test_events())) + .await + .unwrap(); + + let event = expected_flat_events().sort_and_dedup().into_inner(); + let mut event = event[0].clone(); + event.version_id = Some("version_id".to_string()); + + let mut events = vec![event]; + events.extend(expected_flat_events().sort_and_dedup().into_inner()); + + let events = update_test_events(FlatS3EventMessages(events).into()); + + ingester.ingest(EventSourceType::S3(events)).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + +
assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_eq!( + 1, + s3_object_results[0].get::<i32, _>("number_duplicate_events") + ); + assert_deleted(&object_results[0], &s3_object_results[0]); + assert_created_with( + &object_results[1], + &s3_object_results[1], + "version_id", + EXPECTED_SEQUENCER_CREATED, + ); + } + + pub(crate) async fn fetch_results(ingester: &Ingester) -> (Vec<PgRow>, Vec<PgRow>) { + ( + sqlx::query("select * from object") + .fetch_all(ingester.client.pool()) + .await + .unwrap(), + sqlx::query("select * from s3_object") + .fetch_all(ingester.client.pool()) + .await + .unwrap(), + ) + } + + pub(crate) fn assert_created_with( + object_results: &PgRow, + s3_object_results: &PgRow, + expected_version_id: &str, + expected_sequencer: &str, + ) { + assert_eq!("bucket", s3_object_results.get::<String, _>("bucket")); + assert_eq!("key", s3_object_results.get::<String, _>("key")); + assert_eq!(0, object_results.get::<i32, _>("size")); + assert_eq!(EXPECTED_E_TAG, s3_object_results.get::<String, _>("e_tag")); + assert_eq!( + expected_version_id, + s3_object_results.get::<String, _>("version_id") + ); + assert_eq!( + expected_sequencer, + s3_object_results.get::<String, _>("created_sequencer") + ); assert_eq!( DateTime::<Utc>::default(), - result.get::<DateTime<Utc>, _>("created_date") + s3_object_results.get::<DateTime<Utc>, _>("created_date") ); assert_eq!( DateTime::<Utc>::default(), - result.get::<DateTime<Utc>, _>("last_modified_date") + s3_object_results.get::<DateTime<Utc>, _>("last_modified_date") ); - assert!(result - .get::<String, _>("portal_run_id") - .starts_with("19700101")); } - pub(crate) fn assert_deleted(result: PgRow) { - assert_eq!("bucket", result.get::<String, _>("bucket")); - assert_eq!("key", result.get::<String, _>("key")); - assert_eq!(0, result.get::<i32, _>("size")); - assert_eq!(EXPECTED_E_TAG, result.get::<String, _>("hash")); + pub(crate) fn assert_created(object_results: &PgRow, s3_object_results: &PgRow) { + assert_created_with( + object_results, + s3_object_results, + EXPECTED_VERSION_ID, + EXPECTED_SEQUENCER_CREATED, + ) + } + + pub(crate) fn assert_deleted(object_results: &PgRow, s3_object_results: &PgRow) { + assert_eq!("bucket", s3_object_results.get::<String, _>("bucket")); + assert_eq!("key", s3_object_results.get::<String, _>("key")); + assert_eq!( + EXPECTED_VERSION_ID, + s3_object_results.get::<String, _>("version_id") + ); + assert_eq!(0, object_results.get::<i32, _>("size")); + assert_eq!(EXPECTED_E_TAG, s3_object_results.get::<String, _>("e_tag")); assert_eq!( DateTime::<Utc>::default(), - result.get::<DateTime<Utc>, _>("created_date") + s3_object_results.get::<DateTime<Utc>, _>("created_date") ); assert_eq!( DateTime::<Utc>::default(), - result.get::<DateTime<Utc>, _>("last_modified_date") + s3_object_results.get::<DateTime<Utc>, _>("last_modified_date") ); assert_eq!( DateTime::<Utc>::default(), - result.get::<DateTime<Utc>, _>("deleted_date") + s3_object_results.get::<DateTime<Utc>, _>("deleted_date") ); - assert!(result - .get::<String, _>("portal_run_id") - .starts_with("19700101")); } - fn test_events() -> Events { - let mut events = expected_events(); + fn update_test_events(mut events: Events) -> Events { + let update_last_modified = |dates: &mut Vec<Option<DateTime<Utc>>>| { + dates.iter_mut().for_each(|last_modified| { + *last_modified = Some(DateTime::default()); + }); + }; + let update_storage_class = |classes: &mut Vec<Option<StorageClass>>| { + classes.iter_mut().for_each(|storage_class| { + *storage_class = Some(StorageClass::Standard); + }); + }; - events.object_created.last_modified_dates[0] = Some(DateTime::default()); - events.object_created.storage_classes[0] = Some(StorageClass::Standard); + update_last_modified(&mut events.object_created.last_modified_dates); + update_storage_class(&mut events.object_created.storage_classes); - events.object_removed.last_modified_dates[0] =
Some(DateTime::default()); - events.object_removed.storage_classes[0] = None; + update_last_modified(&mut events.object_removed.last_modified_dates); + update_storage_class(&mut events.object_removed.storage_classes); events } + + fn test_events() -> Events { + update_test_events(expected_events()) + } + fn test_ingester(pool: PgPool) -> Ingester { Ingester::new(Client::new(pool)) } diff --git a/lib/workload/stateful/filemanager/filemanager/src/error.rs b/lib/workload/stateful/filemanager/filemanager/src/error.rs index 2ad1b2e13..427f0d88a 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/error.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/error.rs @@ -39,6 +39,8 @@ pub enum Error { S3Error(String), #[error("Config error: `{0}`")] ConfigError(String), + #[error("Ingester error: `{0}`")] + IngesterError(String), } impl From for Error { diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs index 02c3d09ff..9d2caa2b6 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs @@ -121,7 +121,18 @@ pub(crate) mod tests { let events = CollecterBuilder::receive(&sqs_client, "url").await.unwrap(); - assert_eq!(events, expected_flat_events()); + let mut expected = expected_flat_events(); + expected + .0 + .iter_mut() + .zip(&events.0) + .for_each(|(expected_event, event)| { + // The object id will be different for each event. + expected_event.object_id = event.object_id; + expected_event.portal_run_id = event.portal_run_id.to_string(); + }); + + assert_eq!(events, expected); } #[tokio::test] diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs index 854aa6bed..0ae4870b4 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs @@ -1,10 +1,9 @@ //! Convert S3 events for the database. //! -use std::cmp::Ordering; - use aws_sdk_s3::types::StorageClass as AwsStorageClass; use chrono::{DateTime, ParseError, Utc}; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; use uuid::Uuid; @@ -68,6 +67,7 @@ pub struct TransposedS3EventMessages { pub event_names: Vec<String>, pub buckets: Vec<String>, pub keys: Vec<String>, + pub version_ids: Vec<Option<String>>, pub sizes: Vec<Option<i32>>, pub e_tags: Vec<Option<String>>, pub sequencers: Vec<Option<String>>, @@ -86,6 +86,7 @@ impl TransposedS3EventMessages { event_names: Vec::with_capacity(capacity), buckets: Vec::with_capacity(capacity), keys: Vec::with_capacity(capacity), + version_ids: Vec::with_capacity(capacity), sizes: Vec::with_capacity(capacity), e_tags: Vec::with_capacity(capacity), sequencers: Vec::with_capacity(capacity), @@ -104,6 +105,7 @@ impl TransposedS3EventMessages { bucket, key, size, + version_id, e_tag, sequencer, portal_run_id, @@ -117,6 +119,7 @@ impl TransposedS3EventMessages { self.event_names.push(event_name); self.buckets.push(bucket); self.keys.push(key); + self.version_ids.push(version_id); self.sizes.push(size); self.e_tags.push(e_tag); self.sequencers.push(sequencer); @@ -142,7 +145,7 @@ impl From for TransposedS3EventMessages { } /// Group by event types.
-#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Events { pub object_created: TransposedS3EventMessages, pub object_removed: TransposedS3EventMessages, @@ -194,101 +197,117 @@ impl FlatS3EventMessages { self.0 } - /// Rearrange these messages so that duplicates are removed events are in the correct - /// order. + /// Rearrange messages so that duplicates are removed and events are in the correct + /// order. Note that the standard `PartialEq`, `Eq`, `PartialOrd` and `Ord` are not + /// directly used because the `PartialOrd` is not consistent with `PartialEq`. Namely, + /// when ordering events, the event time is taken into account, however it is not taken + /// into account for event equality. pub fn sort_and_dedup(self) -> Self { - let mut messages = self.into_inner(); - - messages.sort(); - messages.dedup(); - - Self(messages) + self.dedup().sort() } -} -impl Ord for FlatS3EventMessage { - /// Ordering is implemented so that the sequencer values are considered when the bucket and the - /// key are the same. - fn cmp(&self, other: &Self) -> Ordering { - // If the sequencer values are present and the bucket and key are the same. - if let (Some(self_sequencer), Some(other_sequencer)) = - (self.sequencer.as_ref(), other.sequencer.as_ref()) - { - if self.bucket == other.bucket && self.key == other.key { - return ( - self_sequencer, - &self.event_time, - &self.event_name, - &self.bucket, - &self.key, - &self.size, - &self.e_tag, - &self.storage_class, - &self.last_modified_date, - ) - .cmp(&( - other_sequencer, - &other.event_time, - &other.event_name, - &other.bucket, - &other.key, - &other.size, - &other.e_tag, - &other.storage_class, - &other.last_modified_date, - )); - } - } + /// Equality is implemented so that for the same bucket and key, the event is considered the same if the + /// sequencer, event name, and version match. Crucially, this means that events with different event times + /// may be considered the same. Events may arrive at different times, but represent the same event. This matches + /// the logic in this example: + /// https://github.com/aws-samples/amazon-s3-endedupe/blob/bd906412c2b4ca26eee6312e3ac99120790b9de9/endedupe/app.py#L79-L83 + pub fn dedup(self) -> Self { + let messages = self.into_inner(); - ( - &self.event_time, - &self.event_name, - &self.bucket, - &self.key, - &self.size, - &self.e_tag, - &self.sequencer, - &self.storage_class, - &self.last_modified_date, + Self( + messages + .into_iter() + .unique_by(|value| { + ( + value.sequencer.clone(), + value.event_name.clone(), + value.bucket.clone(), + value.key.clone(), + value.version_id.clone(), + // Note, `last_modified` and `storage_class` are always `None` at this point anyway so don't need + // to be considered. `size` and `e_tag` should be the same but are unimportant in deduplication. + ) + }) + .collect(), ) - .cmp(&( - &other.event_time, - &other.event_name, - &other.bucket, - &other.key, - &other.size, - &other.e_tag, - &other.sequencer, - &other.storage_class, - &other.last_modified_date, - )) } -} -impl PartialOrd for FlatS3EventMessage { - fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - // Total ordering. - Some(self.cmp(other)) - } -} + /// Ordering is implemented so that the sequencer values are considered when the bucket, the + /// key and the version id are the same. + /// + /// Unlike the `dedup` function, this implementation does consider the event time. This means that events + /// will be ingested in event time order if the sequencer condition is not met.
+ pub fn sort(self) -> Self { + let mut messages = self.into_inner(); + + messages.sort(); + messages.sort_by(|a, b| { + if let (Some(a_sequencer), Some(b_sequencer)) = + (a.sequencer.as_ref(), b.sequencer.as_ref()) + { + if a.bucket == b.bucket + && a.key == b.key + && a.version_id == b.version_id + && a.event_name == b.event_name + { + return ( + a_sequencer, + &a.event_time, + &a.event_name, + &a.bucket, + &a.key, + &a.version_id, + &a.size, + &a.e_tag, + &a.storage_class, + &a.last_modified_date, + ) + .cmp(&( + b_sequencer, + &b.event_time, + &b.event_name, + &b.bucket, + &b.key, + &b.version_id, + &b.size, + &b.e_tag, + &b.storage_class, + &b.last_modified_date, + )); + } + } + + ( + &a.event_time, + &a.sequencer, + &a.event_name, + &a.bucket, + &a.key, + &a.version_id, + &a.size, + &a.e_tag, + &a.storage_class, + &a.last_modified_date, + ) + .cmp(&( + &b.event_time, + &b.sequencer, + &b.event_name, + &b.bucket, + &b.key, + &b.version_id, + &b.size, + &b.e_tag, + &b.storage_class, + &b.last_modified_date, + )) + }); -impl PartialEq for FlatS3EventMessage { - /// Equality is implemented normally except the object_id and portal_run_id are ignored, - /// as these are newly derived for each event. - fn eq(&self, other: &Self) -> bool { - // Must be consistent with PartialOrd - self.event_time == other.event_time - && self.event_name == other.event_name - && self.bucket == other.bucket - && self.key == other.key - && self.size == other.size - && self.e_tag == other.e_tag - && self.storage_class == other.storage_class - && self.last_modified_date == other.last_modified_date + Self(messages) } } -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone)] pub enum EventType { Created, Removed, } /// A flattened AWS S3 record -#[derive(Debug, Eq)] +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone)] pub struct FlatS3EventMessage { - pub object_id: Uuid, - pub event_time: DateTime<Utc>, + pub sequencer: Option<String>, pub event_name: String, pub bucket: String, pub key: String, + pub version_id: Option<String>, pub size: Option<i32>, pub e_tag: Option<String>, - pub sequencer: Option<String>, - pub portal_run_id: String, pub storage_class: Option<StorageClass>, pub last_modified_date: Option<DateTime<Utc>>, + pub object_id: Uuid, + pub event_time: DateTime<Utc>, + pub portal_run_id: String, pub event_type: EventType, } @@ -340,6 +360,84 @@ impl FlatS3EventMessage { e_tag.into_iter().for_each(|e_tag| self.e_tag = Some(e_tag)); self } + + /// Set the sequencer value. + pub fn with_sequencer(mut self, sequencer: Option<String>) -> Self { + self.sequencer = sequencer; + self + } + + /// Set the event name. + pub fn with_event_name(mut self, event_name: String) -> Self { + self.event_name = event_name; + self + } + + /// Set the bucket. + pub fn with_bucket(mut self, bucket: String) -> Self { + self.bucket = bucket; + self + } + + /// Set the key. + pub fn with_key(mut self, key: String) -> Self { + self.key = key; + self + } + + /// Set the version id. + pub fn with_version_id(mut self, version_id: Option<String>) -> Self { + self.version_id = version_id; + self + } + + /// Set the size. + pub fn with_size(mut self, size: Option<i32>) -> Self { + self.size = size; + self + } + + /// Set the e_tag. + pub fn with_e_tag(mut self, e_tag: Option<String>) -> Self { + self.e_tag = e_tag; + self + } + + /// Set the storage class. + pub fn with_storage_class(mut self, storage_class: Option<StorageClass>) -> Self { + self.storage_class = storage_class; + self + } + + /// Set the last modified date.
+ pub fn with_last_modified_date(mut self, last_modified_date: Option<DateTime<Utc>>) -> Self { + self.last_modified_date = last_modified_date; + self + } + + /// Set the object id. + pub fn with_object_id(mut self, object_id: Uuid) -> Self { + self.object_id = object_id; + self + } + + /// Set the event time. + pub fn with_event_time(mut self, event_time: DateTime<Utc>) -> Self { + self.event_time = event_time; + self + } + + /// Set the portal run id. + pub fn with_portal_run_id(mut self, portal_run_id: String) -> Self { + self.portal_run_id = portal_run_id; + self + } + + /// Set the event type. + pub fn with_event_type(mut self, event_type: EventType) -> Self { + self.event_type = event_type; + self + } } /// The basic AWS S3 Event. @@ -377,6 +475,7 @@ pub struct ObjectRecord { pub key: String, pub size: Option<i32>, pub e_tag: Option<String>, + pub version_id: Option<String>, pub sequencer: Option<String>, } @@ -403,6 +502,7 @@ impl TryFrom for FlatS3EventMessages { key, size, e_tag, + version_id, sequencer, } = object; @@ -431,6 +531,7 @@ impl TryFrom for FlatS3EventMessages { size, e_tag, sequencer, + version_id, portal_run_id, // Head fields are fetched later. storage_class: None, @@ -459,6 +560,8 @@ pub(crate) mod tests { pub(crate) const EXPECTED_SEQUENCER_DELETED: &str = "0055AED6DCD90281E6"; // pragma: allowlist secret pub(crate) const EXPECTED_E_TAG: &str = "d41d8cd98f00b204e9800998ecf8427e"; // pragma: allowlist secret + pub(crate) const EXPECTED_VERSION_ID: &str = "096fKKXTRTtl3on89fVO.nfljtsv6qko"; + #[test] fn test_flat_events() { let result = expected_flat_events(); @@ -470,6 +573,7 @@ pub(crate) mod tests { "ObjectRemoved:Delete", EXPECTED_SEQUENCER_DELETED, None, + Some(EXPECTED_VERSION_ID.to_string()), ); let second = result.next().unwrap(); @@ -478,6 +582,7 @@ pub(crate) mod tests { "ObjectCreated:Put", EXPECTED_SEQUENCER_CREATED, Some(0), + Some(EXPECTED_VERSION_ID.to_string()), ); let third = result.next().unwrap(); @@ -486,6 +591,7 @@ pub(crate) mod tests { "ObjectCreated:Put", EXPECTED_SEQUENCER_CREATED, Some(0), + Some(EXPECTED_VERSION_ID.to_string()), ); } @@ -500,6 +606,7 @@ pub(crate) mod tests { "ObjectCreated:Put", EXPECTED_SEQUENCER_CREATED, Some(0), + Some(EXPECTED_VERSION_ID.to_string()), ); let second = result.next().unwrap(); @@ -508,6 +615,52 @@ pub(crate) mod tests { "ObjectRemoved:Delete", EXPECTED_SEQUENCER_DELETED, None, + Some(EXPECTED_VERSION_ID.to_string()), + ); + } + + #[test] + fn test_sort_and_dedup_with_version_id() { + let result = expected_flat_events(); + + let mut result = result.into_inner(); + result.push( + expected_flat_events() + .into_inner() + .first() + .unwrap() + .clone() + .with_version_id(Some("version_id".to_string())), + ); + + let result = FlatS3EventMessages(result).sort_and_dedup(); + let mut result = result.into_inner().into_iter(); + + let first = result.next().unwrap(); + assert_flat_s3_event( + first, + "ObjectCreated:Put", + EXPECTED_SEQUENCER_CREATED, + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); + + let second = result.next().unwrap(); + assert_flat_s3_event( + second, + "ObjectRemoved:Delete", + EXPECTED_SEQUENCER_DELETED, + None, + Some(EXPECTED_VERSION_ID.to_string()), + ); + + let third = result.next().unwrap(); + assert_flat_s3_event( + third, + "ObjectRemoved:Delete", + EXPECTED_SEQUENCER_DELETED, + None, + Some("version_id".to_string()), ); } @@ -516,11 +669,13 @@ pub(crate) mod tests { event_name: &str, sequencer: &str, size: Option<i32>, + version_id: Option<String>, ) { assert_eq!(event.event_time, DateTime::<Utc>::default()); assert_eq!(event.event_name,
event_name); assert_eq!(event.bucket, "bucket"); assert_eq!(event.key, "key"); + assert_eq!(event.version_id, version_id); assert_eq!(event.size, size); assert_eq!(event.e_tag, Some(EXPECTED_E_TAG.to_string())); // pragma: allowlist secret assert_eq!(event.sequencer, Some(sequencer.to_string())); @@ -541,6 +696,10 @@ pub(crate) mod tests { assert_eq!(result.object_created.buckets[0], "bucket"); assert_eq!(result.object_created.keys[0], "key"); assert_eq!(result.object_created.sizes[0], Some(0)); + assert_eq!( + result.object_created.version_ids[0], + Some(EXPECTED_VERSION_ID.to_string()) + ); assert_eq!( result.object_created.e_tags[0], Some(EXPECTED_E_TAG.to_string()) @@ -560,6 +719,10 @@ pub(crate) mod tests { assert_eq!(result.object_removed.event_names[0], "ObjectRemoved:Delete"); assert_eq!(result.object_removed.buckets[0], "bucket"); assert_eq!(result.object_removed.keys[0], "key"); + assert_eq!( + result.object_removed.version_ids[0], + Some(EXPECTED_VERSION_ID.to_string()) + ); assert_eq!(result.object_removed.sizes[0], None); assert_eq!( result.object_removed.e_tags[0], @@ -581,7 +744,7 @@ pub(crate) mod tests { pub(crate) fn expected_events() -> Events { let events = expected_flat_events().sort_and_dedup(); - events.try_into().unwrap() + events.into() } pub(crate) fn expected_event_record() -> String { @@ -613,7 +776,7 @@ pub(crate) mod tests { "key": "key", "size": 0, "eTag": EXPECTED_E_TAG, - "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", + "versionId": EXPECTED_VERSION_ID, "sequencer": EXPECTED_SEQUENCER_CREATED } }, @@ -642,7 +805,7 @@ pub(crate) mod tests { "key": "key", "size": 0, "eTag": EXPECTED_E_TAG, - "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", + "versionId": EXPECTED_VERSION_ID, "sequencer": EXPECTED_SEQUENCER_CREATED } }); @@ -667,7 +830,7 @@ pub(crate) mod tests { // ObjectRemoved::Delete does not have a size, even though this isn't documented // anywhere. 
"eTag": EXPECTED_E_TAG, - "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", + "versionId": EXPECTED_VERSION_ID, "sequencer": EXPECTED_SEQUENCER_DELETED } }); diff --git a/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs index d75e2e2b3..e94edb8f8 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs @@ -89,7 +89,7 @@ pub async fn ingest_event( #[cfg(test)] mod tests { use super::*; - use crate::database::aws::ingester::tests::assert_deleted; + use crate::database::aws::ingester::tests::{assert_deleted, fetch_results}; use crate::database::aws::migration::tests::MIGRATOR; use crate::events::aws::collecter::tests::{expected_head_object, set_s3_client_expectations}; use crate::events::aws::collector_builder::tests::set_sqs_client_expectations; @@ -110,12 +110,11 @@ mod tests { .await .unwrap(); - let result = sqlx::query("select * from object") - .fetch_one(ingester.client().pool()) - .await - .unwrap(); + let (object_results, s3_object_results) = fetch_results(&ingester).await; - assert_deleted(result); + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_deleted(&object_results[0], &s3_object_results[0]); } #[sqlx::test(migrator = "MIGRATOR")] @@ -135,11 +134,10 @@ mod tests { .await .unwrap(); - let result = sqlx::query("select * from object") - .fetch_one(ingester.client().pool()) - .await - .unwrap(); + let (object_results, s3_object_results) = fetch_results(&ingester).await; - assert_deleted(result); + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_deleted(&object_results[0], &s3_object_results[0]); } }