From 8443bfadee1b3ca3e44edc56b6ead626f7086c1f Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Tue, 16 Jan 2024 19:37:58 +1100 Subject: [PATCH 01/17] feat(filemanager): add basic proc macro outline --- lib/workload/stateful/filemanager/Cargo.lock | 11 +++++ lib/workload/stateful/filemanager/Cargo.toml | 1 + .../filemanager/filemanager-macros/Cargo.toml | 15 +++++++ .../filemanager/filemanager-macros/README.md | 3 ++ .../filemanager/filemanager-macros/src/lib.rs | 40 +++++++++++++++++++ .../filemanager/filemanager/Cargo.toml | 1 + .../filemanager/filemanager/src/lib.rs | 4 ++ 7 files changed, 75 insertions(+) create mode 100644 lib/workload/stateful/filemanager/filemanager-macros/Cargo.toml create mode 100644 lib/workload/stateful/filemanager/filemanager-macros/README.md create mode 100644 lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs diff --git a/lib/workload/stateful/filemanager/Cargo.lock b/lib/workload/stateful/filemanager/Cargo.lock index 69f596575..7bd065d2d 100644 --- a/lib/workload/stateful/filemanager/Cargo.lock +++ b/lib/workload/stateful/filemanager/Cargo.lock @@ -1073,6 +1073,7 @@ dependencies = [ "axum", "chrono", "dotenvy", + "filemanager-macros", "futures", "hyper 1.0.1", "lambda_runtime", @@ -1122,6 +1123,16 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "filemanager-macros" +version = "0.1.0" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "finl_unicode" version = "1.2.0" diff --git a/lib/workload/stateful/filemanager/Cargo.toml b/lib/workload/stateful/filemanager/Cargo.toml index e498db2bf..26a774c2a 100644 --- a/lib/workload/stateful/filemanager/Cargo.toml +++ b/lib/workload/stateful/filemanager/Cargo.toml @@ -3,6 +3,7 @@ resolver = "2" members = [ "filemanager", + "filemanager-macros", "filemanager-http-lambda", "filemanager-ingest-lambda", ] diff --git a/lib/workload/stateful/filemanager/filemanager-macros/Cargo.toml b/lib/workload/stateful/filemanager/filemanager-macros/Cargo.toml new file mode 100644 index 000000000..25d5c6b00 --- /dev/null +++ b/lib/workload/stateful/filemanager/filemanager-macros/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "filemanager-macros" +version = "0.1.0" +license.workspace = true +edition.workspace = true +authors.workspace = true + +[lib] +proc-macro = true + +[dependencies] +syn = { version = "2.0", features = ["full"] } +quote = "1.0" +proc-macro2 = "1.0" +darling = "0.20" \ No newline at end of file diff --git a/lib/workload/stateful/filemanager/filemanager-macros/README.md b/lib/workload/stateful/filemanager/filemanager-macros/README.md new file mode 100644 index 000000000..2cdf84aae --- /dev/null +++ b/lib/workload/stateful/filemanager/filemanager-macros/README.md @@ -0,0 +1,3 @@ +# filemanager-macros + +This crate contains macro implementations for filemanager. 
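Before the crate's `lib.rs` below, a quick standalone look at the token-building pattern it relies on. This sketch is not code from the patch: it uses `quote` as an ordinary dependency in a plain binary, and the function name `ingest` is an invented example.

```rust
// Build the identifier and body of a generated companion function, the same
// way the attribute macro below derives `<name>_migrate` from its input.
use quote::{format_ident, quote};

fn main() {
    let name = format_ident!("{}_migrate", "ingest");
    let generated = quote! {
        pub fn #name() -> sqlx::migrate::Migrator {
            todo!();
        }
    };

    // The token stream renders back to ordinary Rust source.
    assert!(generated.to_string().contains("ingest_migrate"));
    println!("{generated}");
}
```

Inside the macro proper, the same tokens are appended after the original function rather than printed.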
\ No newline at end of file diff --git a/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs b/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs new file mode 100644 index 000000000..a7d41ea9a --- /dev/null +++ b/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs @@ -0,0 +1,40 @@ +use darling::{Error, FromMeta}; +use darling::ast::NestedMeta; +use syn::ItemFn; +use proc_macro::TokenStream; +use quote::{format_ident, quote}; + +#[derive(Debug, FromMeta)] +struct MacroArgs { +} + +#[proc_macro_attribute] +pub fn import_plrust_function(args: TokenStream, input: TokenStream) -> TokenStream { + let attr_args = match NestedMeta::parse_meta_list(args.into()) { + Ok(v) => v, + Err(e) => { return TokenStream::from(Error::from(e).write_errors()); } + }; + let args = match MacroArgs::from_list(&attr_args) { + Ok(v) => v, + Err(e) => { return TokenStream::from(e.write_errors()); } + }; + + let input = syn::parse_macro_input!(input as ItemFn); + + let input_clone = input.clone(); + let vis = input_clone.vis; + let name = format_ident!("{}_migrate", input_clone.sig.ident); + + let method = quote! { + #vis fn #name() -> sqlx::migrate::Migrator { + todo!(); + } + }; + + let tokens = quote! { + #input + #method + }; + + tokens.into() +} \ No newline at end of file diff --git a/lib/workload/stateful/filemanager/filemanager/Cargo.toml b/lib/workload/stateful/filemanager/filemanager/Cargo.toml index 161f5c492..9eb6b68b5 100644 --- a/lib/workload/stateful/filemanager/filemanager/Cargo.toml +++ b/lib/workload/stateful/filemanager/filemanager/Cargo.toml @@ -28,6 +28,7 @@ mockall = "0.12" mockall_double = "0.3" lambda_runtime = "0.8" aws_lambda_events = "0.12" +filemanager-macros = { path = "../filemanager-macros" } # AWS aws-sdk-sqs = "1" diff --git a/lib/workload/stateful/filemanager/filemanager/src/lib.rs b/lib/workload/stateful/filemanager/filemanager/src/lib.rs index ef8b452c3..07049b024 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/lib.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/lib.rs @@ -8,3 +8,7 @@ pub mod env; pub mod error; pub mod events; pub mod handlers; + +// Re-export all macros. 
+pub use filemanager_macros::*; + From e172dd7e92ed177b4f406b2452c6cd77a73b2c3c Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Wed, 17 Jan 2024 09:39:58 +1100 Subject: [PATCH 02/17] refactor(filemanager): improve equality checking --- lib/workload/stateful/filemanager/Cargo.lock | 17 +- .../filemanager/filemanager/Cargo.toml | 10 +- .../filemanager/src/events/aws/mod.rs | 319 +++++++++++------- 3 files changed, 222 insertions(+), 124 deletions(-) diff --git a/lib/workload/stateful/filemanager/Cargo.lock b/lib/workload/stateful/filemanager/Cargo.lock index 7bd065d2d..35e6bb464 100644 --- a/lib/workload/stateful/filemanager/Cargo.lock +++ b/lib/workload/stateful/filemanager/Cargo.lock @@ -1073,10 +1073,12 @@ dependencies = [ "axum", "chrono", "dotenvy", - "filemanager-macros", + "filemanager", "futures", "hyper 1.0.1", + "itertools 0.12.0", "lambda_runtime", + "lazy_static", "mockall", "mockall_double", "serde", @@ -1631,6 +1633,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -2116,7 +2127,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dfc28575c2e3f19cb3c73b93af36460ae898d426eba6fc15b9bd2a5220758a0" dependencies = [ "anstyle", - "itertools", + "itertools 0.11.0", "predicates-core", ] @@ -2797,7 +2808,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85" dependencies = [ - "itertools", + "itertools 0.11.0", "nom", "unicode_categories", ] diff --git a/lib/workload/stateful/filemanager/filemanager/Cargo.toml b/lib/workload/stateful/filemanager/filemanager/Cargo.toml index 9eb6b68b5..639b5ed13 100644 --- a/lib/workload/stateful/filemanager/filemanager/Cargo.toml +++ b/lib/workload/stateful/filemanager/filemanager/Cargo.toml @@ -6,6 +6,10 @@ authors.workspace = true license.workspace = true edition.workspace = true +[features] + +migrate = ["sqlx/migrate"] + [dependencies] axum = "0.6" hyper = { version = "1", features = ["full"] } @@ -28,7 +32,7 @@ mockall = "0.12" mockall_double = "0.3" lambda_runtime = "0.8" aws_lambda_events = "0.12" -filemanager-macros = { path = "../filemanager-macros" } +itertools = "0.12" # AWS aws-sdk-sqs = "1" @@ -40,3 +44,7 @@ futures = "0.3" [dev-dependencies] aws-smithy-runtime-api = "1" + +# The migrate feature is required to run sqlx tests +filemanager = { path = ".", features = ["migrate"] } +lazy_static = "1.4" diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs index 2e85e0be1..58da582fb 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs @@ -5,6 +5,7 @@ use std::cmp::Ordering; use aws_sdk_s3::types::StorageClass as AwsStorageClass; use chrono::{DateTime, ParseError, Utc}; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; use uuid::Uuid; @@ -67,8 +68,8 @@ pub struct TransposedS3EventMessages { pub event_names: Vec, pub buckets: Vec, pub keys: Vec, - pub sizes: Vec, - pub e_tags: Vec, + pub sizes: Vec>, + pub e_tags: Vec>, pub sequencers: Vec>, pub portal_run_ids: Vec, pub storage_classes: 
Vec>, @@ -108,6 +109,7 @@ impl TransposedS3EventMessages { portal_run_id, storage_class, last_modified_date, + .. } = message; self.object_ids.push(object_id); @@ -153,15 +155,20 @@ impl From for Events { let mut object_removed = FlatS3EventMessages::default(); let mut other = FlatS3EventMessages::default(); - messages.into_inner().into_iter().for_each(|message| { - if message.event_name.contains("ObjectCreated") { - object_created.0.push(message); - } else if message.event_name.contains("ObjectRemoved") { - object_removed.0.push(message); - } else { - other.0.push(message); - } - }); + messages + .into_inner() + .into_iter() + .for_each(|message| match message.event_type { + Created => { + object_created.0.push(message); + } + Removed => { + object_removed.0.push(message); + } + Other => { + other.0.push(message); + } + }); Self { object_created: TransposedS3EventMessages::from(object_created), @@ -187,126 +194,152 @@ impl FlatS3EventMessages { self.0 } - /// Rearrange these messages so that duplicates are removed events are in the correct - /// order. + /// Rearrange messages so that duplicates are removed events are in the correct + /// order. Note that the standard `PartialEq`, `Eq`, `PartialOrd` and `Ord` are not + /// directly used because the `PartialOrd` is not consistent with `PartialEq`. Namely, + /// when ordering events, the event time is taken into account, however it is not taken + /// into account for event equality. pub fn sort_and_dedup(self) -> Self { - let mut messages = self.into_inner(); + self.dedup().sort() + } - messages.sort(); - messages.dedup(); + /// Equality is implemented so that for the same bucket and key, the event is considered the same if the + /// sequencer, event name, and version matches. Crucially, this means that events with different event times + /// may be considered the same. Events may arrive at different times, but represent the same event. This matches + /// the logic in this example: + /// https://github.com/aws-samples/amazon-s3-endedupe/blob/bd906412c2b4ca26eee6312e3ac99120790b9de9/endedupe/app.py#L79-L83 + pub fn dedup(self) -> Self { + let mut messages = self.into_inner(); - Self(messages) + Self(messages.into_iter().unique_by(|value| ( + &value.sequencer, + &value.event_name, + &value.bucket, + &value.key, + &value.size, + &value.e_tag, + // Note, `last_modified` and `storage_class` are always `None` at this point anyway so don't need + // to be considered. + )).collect()) } -} -impl Ord for FlatS3EventMessage { /// Ordering is implemented so that the sequencer values are considered when the bucket and the /// key are the same. - fn cmp(&self, other: &Self) -> Ordering { - // If the sequencer values are present and the bucket and key are the same. - if let (Some(self_sequencer), Some(other_sequencer)) = - (self.sequencer.as_ref(), other.sequencer.as_ref()) - { - if self.bucket == other.bucket && self.key == other.key { - return ( - self_sequencer, - &self.event_time, - &self.event_name, - &self.bucket, - &self.key, - &self.size, - &self.e_tag, - &self.storage_class, - &self.last_modified_date, - ) - .cmp(&( - other_sequencer, - &other.event_time, - &other.event_name, - &other.bucket, - &other.key, - &other.size, - &other.e_tag, - &other.storage_class, - &other.last_modified_date, - )); + /// + /// Unlike the `dedup` function, this implementation does consider the event time. This means that events + /// will be ingested in event time order if the sequencer condition is not met. 
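A standalone sketch of the `unique_by` deduplication described above, with simplified tuples standing in for `FlatS3EventMessage` and invented sequencer values; the event time is deliberately left out of the uniqueness key, so a redelivered event survives only once. The `sort` implementation follows below.

```rust
use itertools::Itertools;

fn main() {
    // (sequencer, bucket, key, event_time)
    let events = vec![
        ("0055AED6DCD90281E5", "bucket", "key", "10:00:00"),
        // The same event redelivered later with a different event time.
        ("0055AED6DCD90281E5", "bucket", "key", "10:00:05"),
        ("0055AED6DCD90281E6", "bucket", "key", "10:00:01"),
    ];

    // `unique_by` keeps the first occurrence for each key tuple.
    let deduped: Vec<_> = events
        .into_iter()
        .unique_by(|(sequencer, bucket, key, _event_time)| (*sequencer, *bucket, *key))
        .collect();

    assert_eq!(deduped.len(), 2);
}
```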
+ pub fn sort(self) -> Self { + let mut messages = self.into_inner(); + + messages.sort(); + messages.sort_by(|a, b| { + if let (Some(a_sequencer), Some(b_sequencer)) = + (a.sequencer.as_ref(), b.sequencer.as_ref()) + { + if a.bucket == b.bucket && a.key == b.key { + return ( + a_sequencer, + &a.event_time, + &a.event_name, + &a.bucket, + &a.key, + &a.size, + &a.e_tag, + &a.storage_class, + &a.last_modified_date, + ) + .cmp(&( + b_sequencer, + &b.event_time, + &b.event_name, + &b.bucket, + &b.key, + &b.size, + &b.e_tag, + &b.storage_class, + &b.last_modified_date, + )); + } } - } - ( - &self.event_time, - &self.event_name, - &self.bucket, - &self.key, - &self.size, - &self.e_tag, - &self.sequencer, - &self.storage_class, - &self.last_modified_date, - ) - .cmp(&( - &other.event_time, - &other.event_name, - &other.bucket, - &other.key, - &other.size, - &other.e_tag, - &other.sequencer, - &other.storage_class, - &other.last_modified_date, - )) - } -} + ( + &a.event_time, + &a.sequencer, + &a.event_name, + &a.bucket, + &a.key, + &a.size, + &a.e_tag, + &a.storage_class, + &a.last_modified_date, + ) + .cmp(&( + &b.event_time, + &b.sequencer, + &b.event_name, + &b.bucket, + &b.key, + &b.size, + &b.e_tag, + &b.storage_class, + &b.last_modified_date, + )) + }); -impl PartialOrd for FlatS3EventMessage { - fn partial_cmp(&self, other: &Self) -> Option { - // Total ordering. - Some(self.cmp(other)) + Self(messages) } } -impl PartialEq for FlatS3EventMessage { - /// Equality is implemented normally except the object_id and portal_run_id are ignored, - /// as these are newly derived for each event. - fn eq(&self, other: &Self) -> bool { - // Must be consistent with PartialOrd - self.event_time == other.event_time - && self.event_name == other.event_name - && self.bucket == other.bucket - && self.key == other.key - && self.size == other.size - && self.e_tag == other.e_tag - && self.storage_class == other.storage_class - && self.last_modified_date == other.last_modified_date - } +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] +pub enum EventType { + Created, + Removed, + Other, } /// A flattened AWS S3 record -#[derive(Debug, Eq)] +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct FlatS3EventMessage { - pub object_id: Uuid, - pub event_time: DateTime, + pub sequencer: Option, pub event_name: String, pub bucket: String, pub key: String, - pub size: i32, - pub e_tag: String, - pub sequencer: Option, - pub portal_run_id: String, + pub size: Option, + pub e_tag: Option, pub storage_class: Option, pub last_modified_date: Option>, + pub object_id: Uuid, + pub event_time: DateTime, + pub portal_run_id: String, + pub event_type: EventType, } impl FlatS3EventMessage { - /// Update the storage class. - pub fn with_storage_class(mut self, storage_class: Option) -> Self { - self.storage_class = storage_class; + /// Update the storage class if not None.` + pub fn update_storage_class(mut self, storage_class: Option) -> Self { + storage_class + .into_iter() + .for_each(|storage_class| self.storage_class = Some(storage_class)); + self + } + + /// Update the last modified date if not None. + pub fn update_last_modified_date(mut self, last_modified_date: Option>) -> Self { + last_modified_date + .into_iter() + .for_each(|last_modified_date| self.last_modified_date = Some(last_modified_date)); + self + } + + /// Update the size if not None. + pub fn update_size(mut self, size: Option) -> Self { + size.into_iter().for_each(|size| self.size = Some(size)); self } - /// Update the last modified date. 
- pub fn with_last_modified_date(mut self, last_modified_date: Option>) -> Self { - self.last_modified_date = last_modified_date; + /// Update the e_tag if not None. + pub fn update_e_tag(mut self, e_tag: Option) -> Self { + e_tag.into_iter().for_each(|e_tag| self.e_tag = Some(e_tag)); self } } @@ -315,7 +348,7 @@ impl FlatS3EventMessage { #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] #[serde(rename_all = "camelCase")] pub struct S3EventMessage { - #[serde(alias = "Records")] + #[serde(rename = "Records")] pub records: Vec, } @@ -344,8 +377,8 @@ pub struct BucketRecord { #[serde(rename_all = "camelCase")] pub struct ObjectRecord { pub key: String, - pub size: i32, - pub e_tag: String, + pub size: Option, + pub e_tag: Option, pub sequencer: Option, } @@ -383,6 +416,14 @@ impl TryFrom for FlatS3EventMessages { let portal_run_id = event_time.format("%Y%m%d").to_string() + &object_id.to_string()[..8]; + let event_type = if event_name.contains("ObjectCreated") { + Created + } else if event_name.contains("ObjectRemoved") { + Removed + } else { + Other + }; + Ok(FlatS3EventMessage { object_id, event_time, @@ -393,9 +434,10 @@ impl TryFrom for FlatS3EventMessages { e_tag, sequencer, portal_run_id, - // Head field are optionally fetched later. + // Head field are fetched later. storage_class: None, last_modified_date: None, + event_type, }) }) .collect::>>()?, @@ -425,13 +467,28 @@ pub(crate) mod tests { let mut result = result.into_inner().into_iter(); let first = result.next().unwrap(); - assert_flat_s3_event(first, "ObjectRemoved:Delete", EXPECTED_SEQUENCER_DELETED); + assert_flat_s3_event( + first, + "ObjectRemoved:Delete", + EXPECTED_SEQUENCER_DELETED, + None, + ); let second = result.next().unwrap(); - assert_flat_s3_event(second, "ObjectCreated:Put", EXPECTED_SEQUENCER_CREATED); + assert_flat_s3_event( + second, + "ObjectCreated:Put", + EXPECTED_SEQUENCER_CREATED, + Some(0), + ); let third = result.next().unwrap(); - assert_flat_s3_event(third, "ObjectCreated:Put", EXPECTED_SEQUENCER_CREATED); + assert_flat_s3_event( + third, + "ObjectCreated:Put", + EXPECTED_SEQUENCER_CREATED, + Some(0), + ); } #[test] @@ -440,19 +497,34 @@ pub(crate) mod tests { let mut result = result.into_inner().into_iter(); let first = result.next().unwrap(); - assert_flat_s3_event(first, "ObjectCreated:Put", EXPECTED_SEQUENCER_CREATED); + assert_flat_s3_event( + first, + "ObjectCreated:Put", + EXPECTED_SEQUENCER_CREATED, + Some(0), + ); let second = result.next().unwrap(); - assert_flat_s3_event(second, "ObjectRemoved:Delete", EXPECTED_SEQUENCER_DELETED); + assert_flat_s3_event( + second, + "ObjectRemoved:Delete", + EXPECTED_SEQUENCER_DELETED, + None, + ); } - fn assert_flat_s3_event(event: FlatS3EventMessage, event_name: &str, sequencer: &str) { + fn assert_flat_s3_event( + event: FlatS3EventMessage, + event_name: &str, + sequencer: &str, + size: Option, + ) { assert_eq!(event.event_time, DateTime::::default()); assert_eq!(event.event_name, event_name); assert_eq!(event.bucket, "bucket"); assert_eq!(event.key, "key"); - assert_eq!(event.size, 0); - assert_eq!(event.e_tag, EXPECTED_E_TAG); // pragma: allowlist secret + assert_eq!(event.size, size); + assert_eq!(event.e_tag, Some(EXPECTED_E_TAG.to_string())); // pragma: allowlist secret assert_eq!(event.sequencer, Some(sequencer.to_string())); assert!(event.portal_run_id.starts_with("19700101")); assert_eq!(event.storage_class, None); @@ -470,8 +542,11 @@ pub(crate) mod tests { assert_eq!(result.object_created.event_names[0], "ObjectCreated:Put"); 
assert_eq!(result.object_created.buckets[0], "bucket"); assert_eq!(result.object_created.keys[0], "key"); - assert_eq!(result.object_created.sizes[0], 0); - assert_eq!(result.object_created.e_tags[0], EXPECTED_E_TAG); + assert_eq!(result.object_created.sizes[0], Some(0)); + assert_eq!( + result.object_created.e_tags[0], + Some(EXPECTED_E_TAG.to_string()) + ); assert_eq!( result.object_created.sequencers[0], Some(EXPECTED_SEQUENCER_CREATED.to_string()) @@ -487,8 +562,11 @@ pub(crate) mod tests { assert_eq!(result.object_removed.event_names[0], "ObjectRemoved:Delete"); assert_eq!(result.object_removed.buckets[0], "bucket"); assert_eq!(result.object_removed.keys[0], "key"); - assert_eq!(result.object_removed.sizes[0], 0); - assert_eq!(result.object_removed.e_tags[0], EXPECTED_E_TAG); + assert_eq!(result.object_removed.sizes[0], None); + assert_eq!( + result.object_removed.e_tags[0], + Some(EXPECTED_E_TAG.to_string()) + ); assert_eq!( result.object_removed.sequencers[0], Some(EXPECTED_SEQUENCER_DELETED.to_string()) @@ -588,7 +666,8 @@ pub(crate) mod tests { }, "object": { "key": "key", - "size": 0, + // ObjectRemoved::Delete does not have a size, even though this isn't documented + // anywhere. "eTag": EXPECTED_E_TAG, "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "sequencer": EXPECTED_SEQUENCER_DELETED From 4f8e9d1da5704cc69b918b552ca54754130967da Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Wed, 17 Jan 2024 10:20:05 +1100 Subject: [PATCH 03/17] refactor(filemanager): fix test, merge with main --- lib/workload/stateful/filemanager/Cargo.lock | 24 +++++- .../filemanager/filemanager-macros/src/lib.rs | 17 ++-- .../src/events/aws/collector_builder.rs | 14 +++- .../filemanager/src/events/aws/mod.rs | 80 +++++++++---------- .../filemanager/filemanager/src/lib.rs | 4 - 5 files changed, 85 insertions(+), 54 deletions(-) diff --git a/lib/workload/stateful/filemanager/Cargo.lock b/lib/workload/stateful/filemanager/Cargo.lock index e7cf0b3e5..99a34cd56 100644 --- a/lib/workload/stateful/filemanager/Cargo.lock +++ b/lib/workload/stateful/filemanager/Cargo.lock @@ -1076,6 +1076,7 @@ dependencies = [ "filemanager", "futures", "hyper 1.0.1", + "itertools 0.12.0", "lambda_runtime", "lazy_static", "mockall", @@ -1124,6 +1125,16 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "filemanager-macros" +version = "0.1.0" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "filemanager-migrate-lambda" version = "0.1.0" @@ -1638,6 +1649,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -2123,7 +2143,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dfc28575c2e3f19cb3c73b93af36460ae898d426eba6fc15b9bd2a5220758a0" dependencies = [ "anstyle", - "itertools", + "itertools 0.11.0", "predicates-core", ] @@ -2804,7 +2824,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85" dependencies = [ - "itertools", + "itertools 0.11.0", "nom", "unicode_categories", ] diff --git a/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs b/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs index a7d41ea9a..506f43c96 100644 --- 
a/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs +++ b/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs @@ -1,22 +1,25 @@ -use darling::{Error, FromMeta}; use darling::ast::NestedMeta; -use syn::ItemFn; +use darling::{Error, FromMeta}; use proc_macro::TokenStream; use quote::{format_ident, quote}; +use syn::ItemFn; #[derive(Debug, FromMeta)] -struct MacroArgs { -} +struct MacroArgs {} #[proc_macro_attribute] pub fn import_plrust_function(args: TokenStream, input: TokenStream) -> TokenStream { let attr_args = match NestedMeta::parse_meta_list(args.into()) { Ok(v) => v, - Err(e) => { return TokenStream::from(Error::from(e).write_errors()); } + Err(e) => { + return TokenStream::from(Error::from(e).write_errors()); + } }; let args = match MacroArgs::from_list(&attr_args) { Ok(v) => v, - Err(e) => { return TokenStream::from(e.write_errors()); } + Err(e) => { + return TokenStream::from(e.write_errors()); + } }; let input = syn::parse_macro_input!(input as ItemFn); @@ -37,4 +40,4 @@ pub fn import_plrust_function(args: TokenStream, input: TokenStream) -> TokenStr }; tokens.into() -} \ No newline at end of file +} diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs index 02c3d09ff..1d60d3754 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs @@ -110,6 +110,7 @@ pub(crate) mod tests { use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput; use aws_sdk_sqs::types::builders::MessageBuilder; use mockall::predicate::eq; + use uuid::Uuid; use super::*; @@ -121,7 +122,18 @@ pub(crate) mod tests { let events = CollecterBuilder::receive(&sqs_client, "url").await.unwrap(); - assert_eq!(events, expected_flat_events()); + let mut expected = expected_flat_events(); + expected + .0 + .iter_mut() + .zip(&events.0) + .for_each(|(expected_event, event)| { + // The object id will be different for each event. + expected_event.object_id = event.object_id; + expected_event.portal_run_id = event.portal_run_id.to_string(); + }); + + assert_eq!(events, expected); } #[tokio::test] diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs index 286018d9c..c0e1c861c 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs @@ -212,16 +212,23 @@ impl FlatS3EventMessages { pub fn dedup(self) -> Self { let mut messages = self.into_inner(); - Self(messages.into_iter().unique_by(|value| ( - &value.sequencer, - &value.event_name, - &value.bucket, - &value.key, - &value.size, - &value.e_tag, - // Note, `last_modified` and `storage_class` are always `None` at this point anyway so don't need - // to be considered. - )).collect()) + Self( + messages + .into_iter() + .unique_by(|value| { + ( + value.sequencer.clone(), + value.event_name.clone(), + value.bucket.clone(), + value.key.clone(), + value.size, + value.e_tag.clone(), + // Note, `last_modified` and `storage_class` are always `None` at this point anyway so don't need + // to be considered. 
+ ) + }) + .collect(), + ) } /// Ordering is implemented so that the sequencer values are considered when the bucket and the @@ -235,7 +242,7 @@ impl FlatS3EventMessages { messages.sort(); messages.sort_by(|a, b| { if let (Some(a_sequencer), Some(b_sequencer)) = - (a.sequencer.as_ref(), b.sequencer.as_ref()) + (a.sequencer.as_ref(), b.sequencer.as_ref()) { if a.bucket == b.bucket && a.key == b.key { return ( @@ -249,17 +256,17 @@ impl FlatS3EventMessages { &a.storage_class, &a.last_modified_date, ) - .cmp(&( - b_sequencer, - &b.event_time, - &b.event_name, - &b.bucket, - &b.key, - &b.size, - &b.e_tag, - &b.storage_class, - &b.last_modified_date, - )); + .cmp(&( + b_sequencer, + &b.event_time, + &b.event_name, + &b.bucket, + &b.key, + &b.size, + &b.e_tag, + &b.storage_class, + &b.last_modified_date, + )); } } @@ -274,17 +281,17 @@ impl FlatS3EventMessages { &a.storage_class, &a.last_modified_date, ) - .cmp(&( - &b.event_time, - &b.sequencer, - &b.event_name, - &b.bucket, - &b.key, - &b.size, - &b.e_tag, - &b.storage_class, - &b.last_modified_date, - )) + .cmp(&( + &b.event_time, + &b.sequencer, + &b.event_name, + &b.bucket, + &b.key, + &b.size, + &b.e_tag, + &b.storage_class, + &b.last_modified_date, + )) }); Self(messages) @@ -298,13 +305,6 @@ pub enum EventType { Other, } -#[derive(Debug, Eq, PartialEq)] -pub enum EventType { - Created, - Removed, - Other, -} - /// A flattened AWS S3 record #[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct FlatS3EventMessage { diff --git a/lib/workload/stateful/filemanager/filemanager/src/lib.rs b/lib/workload/stateful/filemanager/filemanager/src/lib.rs index 07049b024..ef8b452c3 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/lib.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/lib.rs @@ -8,7 +8,3 @@ pub mod env; pub mod error; pub mod events; pub mod handlers; - -// Re-export all macros. -pub use filemanager_macros::*; - From 1eed111254a5dfb5d6d120708fd3cb1166fb3ee7 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Thu, 18 Jan 2024 16:27:31 +1100 Subject: [PATCH 04/17] feat(filemanager): add additional sequencer for s3_object --- .../database/migrations/0001_add_object_table.sql | 10 +++++----- .../database/migrations/0002_add_s3_object_table.sql | 10 +++++++++- .../stateful/filemanager/filemanager-macros/src/lib.rs | 2 +- .../filemanager/src/events/aws/collector_builder.rs | 1 - .../filemanager/filemanager/src/events/aws/mod.rs | 6 ++---- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql index 3d9245e0e..c61a8f116 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql @@ -3,13 +3,13 @@ create table object ( -- The unique id for this object. object_id uuid not null default gen_random_uuid() primary key, -- The bucket location. - bucket varchar(255) not null, + bucket text not null, -- The name of the object. - key varchar(1024) not null, + key text not null, -- The size of the object. - size int default null, + size integer default null, -- A unique identifier for the object, if it is present. - hash varchar(255) default null, + checksum text default null, -- When this object was created. created_date timestamptz not null default now(), -- When this object was last modified. 
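These hunks retype the `varchar` columns as `text` and rename `hash` to `checksum` by rewriting the initial migration, which is workable while nothing is deployed against it. For comparison, a hypothetical follow-up migration against an already-provisioned database would express the same change like this:

```sql
-- Hypothetical standalone migration; the patch instead edits
-- 0001_add_object_table.sql in place.
alter table object alter column bucket type text;
alter table object alter column key type text;
alter table object rename column hash to checksum;
```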
@@ -17,6 +17,6 @@ create table object ( -- When this object was deleted, a null value means that the object has not yet been deleted. deleted_date timestamptz default null, -- The date of the object and its id combined. - portal_run_id varchar(255) not null + portal_run_id text not null -- provenance - history of all objects and how they move? ); diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql index ba2fc4cce..811f16a0e 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -5,5 +5,13 @@ create table s3_object( -- The object id. object_id uuid references object (object_id) primary key, -- The S3 storage class of the object. - storage_class storage_class not null + storage_class storage_class not null, + -- A sequencer value for when the object was created. Used to synchronise out of order and duplicate events. + created_sequencer text default null, + -- A sequencer value for when the object was deleted. Used to synchronise out of order and duplicate events. + deleted_sequencer text default null, + -- Record whether the event that generated this object was ever out of order, useful for debugging. + event_out_of_order boolean not null default false, + -- Record the number of duplicate events received for this object, useful for debugging. + number_duplicate_events integer not null default 0 ); \ No newline at end of file diff --git a/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs b/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs index 506f43c96..d03ff9cd6 100644 --- a/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs +++ b/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs @@ -15,7 +15,7 @@ pub fn import_plrust_function(args: TokenStream, input: TokenStream) -> TokenStr return TokenStream::from(Error::from(e).write_errors()); } }; - let args = match MacroArgs::from_list(&attr_args) { + let _args = match MacroArgs::from_list(&attr_args) { Ok(v) => v, Err(e) => { return TokenStream::from(e.write_errors()); diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs index 1d60d3754..9d2caa2b6 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/collector_builder.rs @@ -110,7 +110,6 @@ pub(crate) mod tests { use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput; use aws_sdk_sqs::types::builders::MessageBuilder; use mockall::predicate::eq; - use uuid::Uuid; use super::*; diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs index c0e1c861c..2728363b1 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs @@ -1,8 +1,6 @@ //! Convert S3 events for the database. //! 
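The hunk continuing below drops the `std::cmp::Ordering` import left unused once the hand-written `Ord` implementation went away. The derived ordering it relies on compares struct fields top to bottom, which is why the earlier patch in this series moved `sequencer` to the first field of `FlatS3EventMessage`; a minimal sketch with invented values:

```rust
// derive(Ord) compares fields in declaration order, so the first field
// dominates the ordering.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Event {
    sequencer: Option<String>,
    bucket: String,
}

fn main() {
    let a = Event { sequencer: Some("0055A".to_string()), bucket: "z".to_string() };
    let b = Event { sequencer: Some("0055B".to_string()), bucket: "a".to_string() };

    // `sequencer` is compared first, so `a < b` even though `a.bucket > b.bucket`.
    assert!(a < b);
}
```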
-use std::cmp::Ordering; - use aws_sdk_s3::types::StorageClass as AwsStorageClass; use chrono::{DateTime, ParseError, Utc}; use itertools::Itertools; @@ -210,7 +208,7 @@ impl FlatS3EventMessages { /// the logic in this example: /// https://github.com/aws-samples/amazon-s3-endedupe/blob/bd906412c2b4ca26eee6312e3ac99120790b9de9/endedupe/app.py#L79-L83 pub fn dedup(self) -> Self { - let mut messages = self.into_inner(); + let messages = self.into_inner(); Self( messages @@ -591,7 +589,7 @@ pub(crate) mod tests { pub(crate) fn expected_events() -> Events { let events = expected_flat_events().sort_and_dedup(); - events.try_into().unwrap() + events.into() } pub(crate) fn expected_event_record() -> String { From 05882af96aa56340905a9079c6b14578cf273caa Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Thu, 18 Jan 2024 16:29:38 +1100 Subject: [PATCH 05/17] refactor(filemanager): move e_tag to s3_object, add bucket and key to s3_object --- .../database/migrations/0002_add_s3_object_table.sql | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql index 811f16a0e..b2ff28d9f 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -4,6 +4,12 @@ create type storage_class as enum ('DeepArchive', 'Glacier', 'GlacierIr', 'Intel create table s3_object( -- The object id. object_id uuid references object (object_id) primary key, + -- Duplicate the bucket here because it is useful for conflicts. This must match the bucket in object. + bucket text references object(bucket) not null, + -- Duplicate the key here because it is useful for conflicts. This must match the key in object. + key text references object(bucket) not null, + -- An S3-specific e_tag, if it is present. + e_tag text default null, -- The S3 storage class of the object. storage_class storage_class not null, -- A sequencer value for when the object was created. Used to synchronise out of order and duplicate events. From ff8a2c3ae3201792f1729ba4618947a5dc74dfff Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Thu, 18 Jan 2024 16:36:06 +1100 Subject: [PATCH 06/17] feat(filemanager): add sequencer check constraint to s3_object --- .../database/migrations/0002_add_s3_object_table.sql | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql index b2ff28d9f..10d5edc5f 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -19,5 +19,8 @@ create table s3_object( -- Record whether the event that generated this object was ever out of order, useful for debugging. event_out_of_order boolean not null default false, -- Record the number of duplicate events received for this object, useful for debugging. - number_duplicate_events integer not null default 0 + number_duplicate_events integer not null default 0, + + -- The deleted sequencer must be greater than the created sequencer. 
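The check constraint added on the next line compares the two sequencers as text. That is reasonable when the strings are equal length, since S3 sequencers are hexadecimal; AWS's guidance is to right-pad the shorter value with zeros when lengths differ, an edge case this simple comparison does not attempt. An illustration with invented values:

```sql
-- Equal-length hexadecimal sequencers order correctly as text.
select '0055AED6DCD90281E6' > '0055AED6DCD90281E5'; -- true
-- Different lengths should be right-padded with zeros before comparing.
select rpad('0055AED6DCD9', 18, '0') < '0055AED6DCD90281E5'; -- true
```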
+ check (deleted_sequencer is null or created_sequencer is null or deleted_sequencer > created_sequencer) ); \ No newline at end of file From 94b15d1db5463dd8d99afe98b0327afc67875cc6 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Thu, 18 Jan 2024 16:50:59 +1100 Subject: [PATCH 07/17] feat(filemanager): add unique constraints --- .../database/migrations/0002_add_s3_object_table.sql | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql index 10d5edc5f..88d206805 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -21,6 +21,9 @@ create table s3_object( -- Record the number of duplicate events received for this object, useful for debugging. number_duplicate_events integer not null default 0, + -- The sequencers should be unique with the bucket and key, otherwise this is a duplicate event. + constraint created_sequencer_unique unique (bucket, key, created_sequencer), + constraint deleted_sequencer_unique unique (bucket, key, deleted_sequencer), -- The deleted sequencer must be greater than the created sequencer. - check (deleted_sequencer is null or created_sequencer is null or deleted_sequencer > created_sequencer) + constraint deleted_greater_than_created check (deleted_sequencer is null or created_sequencer is null or deleted_sequencer > created_sequencer) ); \ No newline at end of file From d865b2472b61ff8517f5056e06b439c3a8633985 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Fri, 19 Jan 2024 09:18:23 +1100 Subject: [PATCH 08/17] feat(filemanager): remove key and bucket reference from s3_object --- lib/workload/stateful/filemanager/database/Dockerfile | 2 +- .../database/migrations/0002_add_s3_object_table.sql | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/workload/stateful/filemanager/database/Dockerfile b/lib/workload/stateful/filemanager/database/Dockerfile index 66923807e..be54f22f6 100644 --- a/lib/workload/stateful/filemanager/database/Dockerfile +++ b/lib/workload/stateful/filemanager/database/Dockerfile @@ -1,3 +1,3 @@ -FROM postgres:16 +FROM postgres:15 COPY migrations/ /docker-entrypoint-initdb.d/ diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql index 88d206805..3a865d987 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -5,9 +5,9 @@ create table s3_object( -- The object id. object_id uuid references object (object_id) primary key, -- Duplicate the bucket here because it is useful for conflicts. This must match the bucket in object. - bucket text references object(bucket) not null, + bucket text not null, -- Duplicate the key here because it is useful for conflicts. This must match the key in object. - key text references object(bucket) not null, + key text not null, -- An S3-specific e_tag, if it is present. e_tag text default null, -- The S3 storage class of the object. @@ -23,7 +23,5 @@ create table s3_object( -- The sequencers should be unique with the bucket and key, otherwise this is a duplicate event. 
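The two unique constraints that follow also lean on Postgres treating nulls as distinct within unique constraints by default: rows whose sequencer column is null never conflict with one another, so only events that actually carry a sequencer take part in duplicate detection. A throwaway demonstration with a hypothetical table:

```sql
create table sequencer_demo (bucket text, key text, sequencer text,
    unique (bucket, key, sequencer));

-- Both succeed: null is distinct from null under a unique constraint.
insert into sequencer_demo values ('bucket', 'key', null);
insert into sequencer_demo values ('bucket', 'key', null);

insert into sequencer_demo values ('bucket', 'key', '0055AED6DCD90281E5');
-- Repeating the previous insert would raise a unique violation.
```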
constraint created_sequencer_unique unique (bucket, key, created_sequencer), - constraint deleted_sequencer_unique unique (bucket, key, deleted_sequencer), - -- The deleted sequencer must be greater than the created sequencer. - constraint deleted_greater_than_created check (deleted_sequencer is null or created_sequencer is null or deleted_sequencer > created_sequencer) + constraint deleted_sequencer_unique unique (bucket, key, deleted_sequencer) ); \ No newline at end of file From ee325fe8ee24e9d80110c4bb3ca10c6967791ec5 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Fri, 19 Jan 2024 18:17:47 +1100 Subject: [PATCH 09/17] refactor(filemanager): update inserts with sequencer values --- .../ingester/aws/insert_s3_created_objects.sql | 11 +++++++++++ .../ingester/aws/insert_s3_deleted_objects.sql | 11 +++++++++++ .../queries/ingester/aws/insert_s3_objects.sql | 6 ------ .../database/queries/ingester/insert_objects.sql | 10 +++++----- 4 files changed, 27 insertions(+), 11 deletions(-) create mode 100644 lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql create mode 100644 lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql delete mode 100644 lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_objects.sql diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql new file mode 100644 index 000000000..a4b5e1395 --- /dev/null +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql @@ -0,0 +1,11 @@ +-- Bulk insert of s3 objects. +insert into s3_object (object_id, bucket, key, e_tag, storage_class, created_sequencer) +values ( + unnest($1::uuid[]), + unnest($2::text[]), + unnest($3::text[]), + unnest($4::text[]), + unnest($5::storage_class[]), + unnest($6::text[]) +) on conflict on constraint created_sequencer_unique do update + set number_duplicate_events = s3_object.number_duplicate_events + 1; \ No newline at end of file diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql new file mode 100644 index 000000000..6f5e85b1d --- /dev/null +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql @@ -0,0 +1,11 @@ +-- Bulk insert of s3 objects. +insert into s3_object (object_id, bucket, key, e_tag, storage_class, deleted_sequencer) +values ( + unnest($1::uuid[]), + unnest($2::text[]), + unnest($3::text[]), + unnest($4::text[]), + unnest($5::storage_class[]), + unnest($6::text[]) +) on conflict on constraint deleted_sequencer_unique do update + set number_duplicate_events = s3_object.number_duplicate_events + 1; \ No newline at end of file diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_objects.sql deleted file mode 100644 index f61e3cc75..000000000 --- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_objects.sql +++ /dev/null @@ -1,6 +0,0 @@ --- Bulk insert of s3 objects. 
-insert into s3_object (object_id, storage_class) -values ( - unnest($1::uuid[]), - unnest($2::storage_class[]) -); \ No newline at end of file diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql index 111140153..6d558e33f 100644 --- a/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql +++ b/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql @@ -1,12 +1,12 @@ -- Bulk insert of objects -insert into object (object_id, bucket, key, size, hash, created_date, last_modified_date, portal_run_id) +insert into object (object_id, bucket, key, size, checksum, created_date, last_modified_date, portal_run_id) values ( unnest($1::uuid[]), - unnest($2::varchar[]), - unnest($3::varchar[]), + unnest($2::text[]), + unnest($3::text[]), unnest($4::int[]), - unnest($5::varchar[]), + unnest($5::text[]), unnest($6::timestamptz[]), unnest($7::timestamptz[]), - unnest($8::varchar[]) + unnest($8::text[]) ); \ No newline at end of file From 3ed4bad44bc2b060b9344f09321763663cd11914 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 22 Jan 2024 09:21:12 +1100 Subject: [PATCH 10/17] refactor(filemanager): move more fields to s3_object, update queries --- .../migrations/0001_add_object_table.sql | 15 +-------- .../migrations/0002_add_s3_object_table.sql | 15 +++++++-- .../aws/insert_s3_created_objects.sql | 19 +++++++++--- .../aws/insert_s3_deleted_objects.sql | 31 ++++++++++++++----- .../ingester/{ => aws}/update_deleted.sql | 6 ++-- .../queries/ingester/insert_objects.sql | 13 +++----- .../filemanager/src/database/aws/ingester.rs | 21 +++++++------ 7 files changed, 70 insertions(+), 50 deletions(-) rename lib/workload/stateful/filemanager/database/queries/ingester/{ => aws}/update_deleted.sql (59%) diff --git a/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql index c61a8f116..05ecf9248 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql @@ -2,21 +2,8 @@ create table object ( -- The unique id for this object. object_id uuid not null default gen_random_uuid() primary key, - -- The bucket location. - bucket text not null, - -- The name of the object. - key text not null, -- The size of the object. size integer default null, -- A unique identifier for the object, if it is present. - checksum text default null, - -- When this object was created. - created_date timestamptz not null default now(), - -- When this object was last modified. - last_modified_date timestamptz not null default now(), - -- When this object was deleted, a null value means that the object has not yet been deleted. - deleted_date timestamptz default null, - -- The date of the object and its id combined. - portal_run_id text not null - -- provenance - history of all objects and how they move? 
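All of the reworked queries in these patches share the `unnest` bulk-insert shape: each bind parameter is a Postgres array, and `unnest` zips the arrays row-wise, so any number of rows costs a single round trip. A minimal sqlx sketch of the same shape against the trimmed `object` table (the function name is invented, and sqlx's `postgres` and `uuid` features are assumed):

```rust
use sqlx::PgPool;
use uuid::Uuid;

// Inserts every object in one statement; `ids` and `sizes` must be equal length.
async fn bulk_insert_objects(
    pool: &PgPool,
    ids: Vec<Uuid>,
    sizes: Vec<Option<i32>>,
) -> sqlx::Result<()> {
    sqlx::query(
        "insert into object (object_id, size)
         values (unnest($1::uuid[]), unnest($2::int[]))",
    )
    .bind(&ids)
    .bind(&sizes)
    .execute(pool)
    .await?;

    Ok(())
}
```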
+ checksum text default null ); diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql index 3a865d987..5f2bd79ac 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -4,10 +4,21 @@ create type storage_class as enum ('DeepArchive', 'Glacier', 'GlacierIr', 'Intel create table s3_object( -- The object id. object_id uuid references object (object_id) primary key, - -- Duplicate the bucket here because it is useful for conflicts. This must match the bucket in object. + + -- General fields + -- The bucket of the object. bucket text not null, - -- Duplicate the key here because it is useful for conflicts. This must match the key in object. + -- The key of the object. key text not null, + -- When this object was created. + created_date timestamptz not null default now(), + -- When this object was deleted, a null value means that the object has not yet been deleted. + deleted_date timestamptz default null, + -- provenance - history of all objects and how they move? + + -- AWS-specific fields + -- The AWS last modified value. + last_modified_date timestamptz default null, -- An S3-specific e_tag, if it is present. e_tag text default null, -- The S3 storage class of the object. diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql index a4b5e1395..ec6d7e8cd 100644 --- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql @@ -1,11 +1,22 @@ -- Bulk insert of s3 objects. -insert into s3_object (object_id, bucket, key, e_tag, storage_class, created_sequencer) +insert into s3_object ( + object_id, + bucket, + key, + created_date, + last_modified_date, + e_tag, + storage_class, + created_sequencer +) values ( unnest($1::uuid[]), unnest($2::text[]), unnest($3::text[]), - unnest($4::text[]), - unnest($5::storage_class[]), - unnest($6::text[]) + unnest($4::timestamptz[]), + unnest($5::timestamptz[]), + unnest($6::text[]), + unnest($7::storage_class[]), + unnest($8::text[]) ) on conflict on constraint created_sequencer_unique do update set number_duplicate_events = s3_object.number_duplicate_events + 1; \ No newline at end of file diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql index 6f5e85b1d..068375b7a 100644 --- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql @@ -1,11 +1,26 @@ -- Bulk insert of s3 objects. -insert into s3_object (object_id, bucket, key, e_tag, storage_class, deleted_sequencer) +insert into s3_object ( + object_id, + bucket, + key, + -- We default the created date to a value event if this is a deleted event, + -- as we are expecting this to get updated. 
+ created_date, + deleted_date, + last_modified_date, + e_tag, + storage_class, + deleted_sequencer +) values ( - unnest($1::uuid[]), - unnest($2::text[]), - unnest($3::text[]), - unnest($4::text[]), - unnest($5::storage_class[]), - unnest($6::text[]) + unnest($1::uuid[]), + unnest($2::text[]), + unnest($3::text[]), + unnest($4::timestamptz[]), + unnest($5::timestamptz[]), + unnest($6::timestamptz[]), + unnest($7::text[]), + unnest($8::storage_class[]), + unnest($9::text[]) ) on conflict on constraint deleted_sequencer_unique do update - set number_duplicate_events = s3_object.number_duplicate_events + 1; \ No newline at end of file + set number_duplicate_events = s3_object.number_duplicate_events + 1; diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/update_deleted.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql similarity index 59% rename from lib/workload/stateful/filemanager/database/queries/ingester/update_deleted.sql rename to lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql index 30f7df943..5480f76cf 100644 --- a/lib/workload/stateful/filemanager/database/queries/ingester/update_deleted.sql +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql @@ -1,9 +1,9 @@ --- Update the deleted time of s3 objects. -update object +-- Update the deleted time of objects. +update s3_object set deleted_date = data.deleted_time from (select unnest($1::varchar[]) as key, unnest($2::varchar[]) as bucket, unnest($3::timestamptz[]) as deleted_time ) as data -where object.key = data.key and object.bucket = data.bucket; \ No newline at end of file +where s3_object.key = data.key and s3_object.bucket = data.bucket; diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql index 6d558e33f..7f1bf0fc9 100644 --- a/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql +++ b/lib/workload/stateful/filemanager/database/queries/ingester/insert_objects.sql @@ -1,12 +1,7 @@ -- Bulk insert of objects -insert into object (object_id, bucket, key, size, checksum, created_date, last_modified_date, portal_run_id) +insert into object (object_id, size, checksum) values ( unnest($1::uuid[]), - unnest($2::text[]), - unnest($3::text[]), - unnest($4::int[]), - unnest($5::text[]), - unnest($6::timestamptz[]), - unnest($7::timestamptz[]), - unnest($8::text[]) -); \ No newline at end of file + unnest($2::int[]), + unnest($3::text[]) +); diff --git a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs index c9b9e727c..99502d155 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs @@ -38,13 +38,13 @@ impl Ingester { trace!(object_created = ?object_created, "ingesting object created events"); let TransposedS3EventMessages { + sequencers, object_ids, event_times, buckets, keys, sizes, e_tags, - portal_run_ids, storage_classes, last_modified_dates, .. 
@@ -53,21 +53,22 @@ impl Ingester { query_file!( "../database/queries/ingester/insert_objects.sql", &object_ids, - &buckets, - &keys, &sizes as &[Option], - &e_tags as &[Option], - &event_times, - &last_modified_dates as &[Option>], - &portal_run_ids + &vec![None; sizes.len()] as &[Option], ) .execute(&self.client.pool) .await?; query_file!( - "../database/queries/ingester/aws/insert_s3_objects.sql", + "../database/queries/ingester/aws/insert_s3_created_objects.sql", &object_ids, - &storage_classes as &[Option] + &buckets, + &keys, + &event_times, + &last_modified_dates as &[Option>], + &e_tags as &[Option], + &storage_classes as &[Option], + &sequencers as &[Option] ) .execute(&self.client.pool) .await?; @@ -81,7 +82,7 @@ impl Ingester { } = object_removed; query_file!( - "../database/queries/ingester/update_deleted.sql", + "../database/queries/ingester/aws/update_deleted.sql", &keys, &buckets, &event_times From 8c3c5754cf36a5c9693e7936be5c3d2b1fc98e32 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 22 Jan 2024 09:38:49 +1100 Subject: [PATCH 11/17] test(filemanager): fix tests according to new schema --- .../filemanager/src/database/aws/ingester.rs | 62 +++++++++++-------- .../filemanager/src/handlers/aws.rs | 18 ++++-- 2 files changed, 49 insertions(+), 31 deletions(-) diff --git a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs index 99502d155..d99d43c6d 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs @@ -128,12 +128,17 @@ pub(crate) mod tests { let ingester = test_ingester(pool); ingester.ingest_events(events).await.unwrap(); - let result = sqlx::query("select * from object") + let object_results = sqlx::query("select * from object") .fetch_one(ingester.client.pool()) .await .unwrap(); - assert_created(result); + let s3_object_results = sqlx::query("select * from s3_object") + .fetch_one(ingester.client.pool()) + .await + .unwrap(); + + assert_created(object_results, s3_object_results); } #[sqlx::test(migrator = "MIGRATOR")] @@ -143,12 +148,17 @@ pub(crate) mod tests { let ingester = test_ingester(pool); ingester.ingest_events(events).await.unwrap(); - let result = sqlx::query("select * from object") + let object_results = sqlx::query("select * from object") .fetch_one(ingester.client.pool()) .await .unwrap(); - assert_deleted(result); + let s3_object_results = sqlx::query("select * from s3_object") + .fetch_one(ingester.client.pool()) + .await + .unwrap(); + + assert_deleted(object_results, s3_object_results); } #[sqlx::test(migrator = "MIGRATOR")] @@ -158,52 +168,50 @@ pub(crate) mod tests { let ingester = test_ingester(pool); ingester.ingest(EventSourceType::S3(events)).await.unwrap(); - let result = sqlx::query("select * from object") + let object_results = sqlx::query("select * from object") + .fetch_one(ingester.client.pool()) + .await + .unwrap(); + let s3_object_results = sqlx::query("select * from s3_object") .fetch_one(ingester.client.pool()) .await .unwrap(); - assert_deleted(result); + assert_deleted(object_results, s3_object_results); } - pub(crate) fn assert_created(result: PgRow) { - assert_eq!("bucket", result.get::("bucket")); - assert_eq!("key", result.get::("key")); - assert_eq!(0, result.get::("size")); - assert_eq!(EXPECTED_E_TAG, result.get::("hash")); + pub(crate) fn assert_created(object_results: PgRow, s3_object_results: PgRow) { + 
assert_eq!("bucket", s3_object_results.get::("bucket")); + assert_eq!("key", s3_object_results.get::("key")); + assert_eq!(0, object_results.get::("size")); + assert_eq!(EXPECTED_E_TAG, s3_object_results.get::("e_tag")); assert_eq!( DateTime::::default(), - result.get::, _>("created_date") + s3_object_results.get::, _>("created_date") ); assert_eq!( DateTime::::default(), - result.get::, _>("last_modified_date") + s3_object_results.get::, _>("last_modified_date") ); - assert!(result - .get::("portal_run_id") - .starts_with("19700101")); } - pub(crate) fn assert_deleted(result: PgRow) { - assert_eq!("bucket", result.get::("bucket")); - assert_eq!("key", result.get::("key")); - assert_eq!(0, result.get::("size")); - assert_eq!(EXPECTED_E_TAG, result.get::("hash")); + pub(crate) fn assert_deleted(object_results: PgRow, s3_object_results: PgRow) { + assert_eq!("bucket", s3_object_results.get::("bucket")); + assert_eq!("key", s3_object_results.get::("key")); + assert_eq!(0, object_results.get::("size")); + assert_eq!(EXPECTED_E_TAG, s3_object_results.get::("e_tag")); assert_eq!( DateTime::::default(), - result.get::, _>("created_date") + s3_object_results.get::, _>("created_date") ); assert_eq!( DateTime::::default(), - result.get::, _>("last_modified_date") + s3_object_results.get::, _>("last_modified_date") ); assert_eq!( DateTime::::default(), - result.get::, _>("deleted_date") + s3_object_results.get::, _>("deleted_date") ); - assert!(result - .get::("portal_run_id") - .starts_with("19700101")); } fn test_events() -> Events { diff --git a/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs index d75e2e2b3..796a1ab55 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs @@ -110,12 +110,17 @@ mod tests { .await .unwrap(); - let result = sqlx::query("select * from object") + let object_results = sqlx::query("select * from object") .fetch_one(ingester.client().pool()) .await .unwrap(); - assert_deleted(result); + let s3_object_results = sqlx::query("select * from s3_object") + .fetch_one(ingester.client().pool()) + .await + .unwrap(); + + assert_deleted(object_results, s3_object_results); } #[sqlx::test(migrator = "MIGRATOR")] @@ -135,11 +140,16 @@ mod tests { .await .unwrap(); - let result = sqlx::query("select * from object") + let object_results = sqlx::query("select * from object") + .fetch_one(ingester.client().pool()) + .await + .unwrap(); + + let s3_object_results = sqlx::query("select * from s3_object") .fetch_one(ingester.client().pool()) .await .unwrap(); - assert_deleted(result); + assert_deleted(object_results, s3_object_results); } } From c20e946a5216bc438a1cb96398db4993b8cf1b06 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Tue, 23 Jan 2024 09:19:26 +1100 Subject: [PATCH 12/17] test(filemanager): defer initializing foreign key and run inserts in a transaction --- .../migrations/0001_add_object_table.sql | 2 +- .../migrations/0002_add_s3_object_table.sql | 6 +- .../aws/insert_s3_created_objects.sql | 15 +++-- .../aws/insert_s3_deleted_objects.sql | 15 +++-- .../queries/ingester/aws/update_deleted.sql | 2 +- .../filemanager/src/database/aws/ingester.rs | 58 +++++++++++++++---- .../filemanager/filemanager/src/error.rs | 2 + 7 files changed, 72 insertions(+), 28 deletions(-) diff --git a/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql 
b/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql
index 05ecf9248..170fe29ee 100644
--- a/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql
+++ b/lib/workload/stateful/filemanager/database/migrations/0001_add_object_table.sql
@@ -1,7 +1,7 @@
 -- A general object table common across all storage types.
 create table object (
     -- The unique id for this object.
-    object_id uuid not null default gen_random_uuid() primary key,
+    object_id uuid not null primary key default gen_random_uuid(),
     -- The size of the object.
     size integer default null,
     -- A unique identifier for the object, if it is present.
diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql
index 5f2bd79ac..b656e6bc5 100644
--- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql
+++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql
@@ -2,8 +2,10 @@ create type storage_class as enum ('DeepArchive', 'Glacier', 'GlacierIr', 'Intel

 -- An object contained in AWS S3, maps as a one-to-one relationship with the object table.
 create table s3_object(
-    -- The object id.
-    object_id uuid references object (object_id) primary key,
+    -- The s3 object id.
+    s3_object_id uuid not null primary key default gen_random_uuid(),
+    -- This is initially deferred because we want to create an s3_object before an object to check for duplicates/order.
+    object_id uuid references object (object_id) deferrable initially deferred,

     -- General fields
     -- The bucket of the object.
diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql
index ec6d7e8cd..9c5dbb2d6 100644
--- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql
+++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql
@@ -1,5 +1,6 @@
 -- Bulk insert of s3 objects.
 insert into s3_object (
+    s3_object_id,
     object_id,
     bucket,
     key,
@@ -11,12 +12,14 @@ insert into s3_object (
 )
 values (
     unnest($1::uuid[]),
-    unnest($2::text[]),
+    unnest($2::uuid[]),
     unnest($3::text[]),
-    unnest($4::timestamptz[]),
+    unnest($4::text[]),
     unnest($5::timestamptz[]),
-    unnest($6::text[]),
-    unnest($7::storage_class[]),
-    unnest($8::text[])
+    unnest($6::timestamptz[]),
+    unnest($7::text[]),
+    unnest($8::storage_class[]),
+    unnest($9::text[])
 ) on conflict on constraint created_sequencer_unique do update
-    set number_duplicate_events = s3_object.number_duplicate_events + 1;
\ No newline at end of file
+    set number_duplicate_events = s3_object.number_duplicate_events + 1
+    returning object_id, number_duplicate_events;
\ No newline at end of file
diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql
index 068375b7a..ee4d80814 100644
--- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql
+++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql
@@ -1,5 +1,6 @@
 -- Bulk insert of s3 objects.
 insert into s3_object (
+    s3_object_id,
     object_id,
     bucket,
     key,
@@ -14,13 +15,15 @@
 )
 values (
     unnest($1::uuid[]),
-    unnest($2::text[]),
+    unnest($2::uuid[]),
     unnest($3::text[]),
-    unnest($4::timestamptz[]),
+    unnest($4::text[]),
     unnest($5::timestamptz[]),
     unnest($6::timestamptz[]),
-    unnest($7::text[]),
-    unnest($8::storage_class[]),
-    unnest($9::text[])
+    unnest($7::timestamptz[]),
+    unnest($8::text[]),
+    unnest($9::storage_class[]),
+    unnest($10::text[])
 ) on conflict on constraint deleted_sequencer_unique do update
-    set number_duplicate_events = s3_object.number_duplicate_events + 1;
+    set number_duplicate_events = s3_object.number_duplicate_events + 1
+    returning object_id, number_duplicate_events;
\ No newline at end of file
diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql
index 5480f76cf..b6adabf1b 100644
--- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql
+++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/update_deleted.sql
@@ -6,4 +6,4 @@ from (select
     unnest($2::varchar[]) as bucket,
     unnest($3::timestamptz[]) as deleted_time
     ) as data
-where s3_object.key = data.key and s3_object.bucket = data.bucket;
+    where s3_object.key = data.key and s3_object.bucket = data.bucket;
diff --git a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
index d99d43c6d..6436f0541 100644
--- a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
+++ b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
@@ -1,13 +1,14 @@
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
 use sqlx::query_file;
-use tracing::trace;
+use tracing::{debug, trace};

 use crate::database::{Client, Ingest};
 use crate::error::Result;
 use crate::events::aws::StorageClass;
 use crate::events::aws::{Events, TransposedS3EventMessages};
 use crate::events::EventSourceType;
+use uuid::Uuid;

 /// An ingester for S3 events.
 #[derive(Debug)]
@@ -50,17 +51,11 @@ impl Ingester {
             ..
         } = object_created;

-        query_file!(
-            "../database/queries/ingester/insert_objects.sql",
-            &object_ids,
-            &sizes as &[Option<i32>],
-            &vec![None; sizes.len()] as &[Option<String>],
-        )
-        .execute(&self.client.pool)
-        .await?;
+        let mut tx = self.client().pool().begin().await?;

-        query_file!(
+        let mut inserted = query_file!(
             "../database/queries/ingester/aws/insert_s3_created_objects.sql",
+            &vec![Uuid::new_v4(); sizes.len()] as &[Uuid],
             &object_ids,
             &buckets,
             &keys,
@@ -70,9 +65,46 @@ impl Ingester {
             &storage_classes as &[Option<StorageClass>],
             &sequencers as &[Option<String>]
         )
-        .execute(&self.client.pool)
+        .fetch_all(&mut *tx)
         .await?;

+        let (object_ids, sizes): (Vec<_>, Vec<_>) = object_ids
+            .into_iter()
+            .rev()
+            .zip(sizes.into_iter().rev())
+            .flat_map(|(object_id, size)| {
+                // If we cannot find the object in our new ids, this object already exists.
+                let pos = inserted
+                    .iter()
+                    .rposition(|record| record.object_id == Some(object_id))?;
+
+                // We can remove this to avoid searching over it again.
+                let record = inserted.remove(pos);
+                debug!(
+                    object_id = ?record.object_id,
+                    number_duplicate_events = record.number_duplicate_events,
+                    "duplicate event found"
+                );
+
+                // This is a new event so the corresponding object should be inserted.
+                Some((object_id, size))
+            })
+            .unzip();
+
+        // Insert only the non-duplicate events.
+        if !object_ids.is_empty() {
+            debug!(count = object_ids.len(), "inserting objects");
+
+            query_file!(
+                "../database/queries/ingester/insert_objects.sql",
+                &object_ids,
+                &sizes as &[Option<i32>],
+                &vec![None; sizes.len()] as &[Option<String>],
+            )
+            .execute(&mut *tx)
+            .await?;
+        }
+
         trace!(object_removed = ?object_removed, "ingesting object removed events");
         let TransposedS3EventMessages {
             event_times,
@@ -87,9 +119,11 @@
             &buckets,
             &event_times
         )
-        .execute(&self.client.pool)
+        .execute(&mut *tx)
         .await?;

+        tx.commit().await?;
+
         Ok(())
     }

diff --git a/lib/workload/stateful/filemanager/filemanager/src/error.rs b/lib/workload/stateful/filemanager/filemanager/src/error.rs
index 2ad1b2e13..427f0d88a 100644
--- a/lib/workload/stateful/filemanager/filemanager/src/error.rs
+++ b/lib/workload/stateful/filemanager/filemanager/src/error.rs
@@ -39,6 +39,8 @@ pub enum Error {
     S3Error(String),
     #[error("Config error: `{0}`")]
     ConfigError(String),
+    #[error("Ingester error: `{0}`")]
+    IngesterError(String),
 }

 impl From<sqlx::Error> for Error {
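The duplicate-filtering step this patch introduces is easiest to see in isolation. Below is a minimal, standalone sketch of the same technique, assuming plain `u64` ids in place of `Uuid`s and a hand-rolled `Record` in place of the anonymous row type that `query_file!` generates from the `returning object_id, number_duplicate_events` clause, so it runs without any external crates. It illustrates the idea only; it is not the crate's code.

// Stand-in for the row type produced by the `returning` clause.
#[allow(dead_code)]
struct Record {
    object_id: Option<u64>,
    number_duplicate_events: i32,
}

// Keep only the (id, size) pairs whose ids the bulk insert actually returned;
// ids that hit the sequencer conflict are duplicates and are dropped.
fn filter_duplicates(
    object_ids: Vec<u64>,
    sizes: Vec<Option<i32>>,
    mut inserted: Vec<Record>,
) -> (Vec<u64>, Vec<Option<i32>>) {
    object_ids
        .into_iter()
        .rev()
        .zip(sizes.into_iter().rev())
        .flat_map(|(object_id, size)| {
            // `rposition` returning `None` makes `?` drop this pair via `flat_map`.
            let pos = inserted
                .iter()
                .rposition(|record| record.object_id == Some(object_id))?;

            // Remove the matched record so later searches skip it.
            inserted.remove(pos);

            Some((object_id, size))
        })
        .unzip()
}

fn main() {
    // The "database" returned ids 1 and 3; id 2 conflicted and was not returned.
    let inserted = vec![
        Record { object_id: Some(1), number_duplicate_events: 0 },
        Record { object_id: Some(3), number_duplicate_events: 0 },
    ];

    let (ids, sizes) = filter_duplicates(vec![1, 2, 3], vec![Some(0), Some(1), Some(2)], inserted);

    // Iteration is reversed, so the surviving pairs come out in reverse order.
    assert_eq!(ids, vec![3, 1]);
    assert_eq!(sizes, vec![Some(2), Some(0)]);
}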
From 80dd346c17dfcb69ba784f1c3ec0ed59dec02c1d Mon Sep 17 00:00:00 2001
From: Marko Malenic
Date: Tue, 23 Jan 2024 13:39:57 +1100
Subject: [PATCH 13/17] test(filemanager): duplicate events database test

---
 .../filemanager/src/database/aws/ingester.rs  | 80 +++++++++++--------
 .../filemanager/src/events/aws/mod.rs         |  2 +-
 .../filemanager/src/handlers/aws.rs           | 24 +----
 3 files changed, 52 insertions(+), 54 deletions(-)

diff --git a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
index 6436f0541..8ace83a42 100644
--- a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
+++ b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
@@ -76,7 +76,10 @@ impl Ingester {
             .zip(sizes.into_iter().rev())
             .flat_map(|(object_id, size)| {
                 // If we cannot find the object in our new ids, this object already exists.
                 let pos = inserted
                     .iter()
-                    .rposition(|record| record.object_id == Some(object_id))?;
+                    .rposition(|record| {
+                        // This will never be `None`, maybe this is an sqlx bug?
+                        record.object_id == Some(object_id)
+                    })?;

                 // We can remove this to avoid searching over it again.
                 let record = inserted.remove(pos);
@@ -147,7 +150,7 @@ pub(crate) mod tests {
     use crate::database::aws::ingester::Ingester;
     use crate::database::aws::migration::tests::MIGRATOR;
     use crate::database::{Client, Ingest};
-    use crate::events::aws::tests::{expected_events, EXPECTED_E_TAG};
+    use crate::events::aws::tests::{expected_events, EXPECTED_E_TAG, expected_flat_events};
     use crate::events::aws::{Events, StorageClass};
     use crate::events::EventSourceType;
     use chrono::{DateTime, Utc};
@@ -162,15 +165,7 @@
         let ingester = test_ingester(pool);
         ingester.ingest_events(events).await.unwrap();

-        let object_results = sqlx::query("select * from object")
-            .fetch_one(ingester.client.pool())
-            .await
-            .unwrap();
-
-        let s3_object_results = sqlx::query("select * from s3_object")
-            .fetch_one(ingester.client.pool())
-            .await
-            .unwrap();
+        let (object_results, s3_object_results) = fetch_results(&ingester).await;

         assert_created(object_results, s3_object_results);
     }
@@ -182,15 +177,7 @@
         let ingester = test_ingester(pool);
         ingester.ingest_events(events).await.unwrap();

-        let object_results = sqlx::query("select * from object")
-            .fetch_one(ingester.client.pool())
-            .await
-            .unwrap();
-
-        let s3_object_results = sqlx::query("select * from s3_object")
-            .fetch_one(ingester.client.pool())
-            .await
-            .unwrap();
+        let (object_results, s3_object_results) = fetch_results(&ingester).await;

         assert_deleted(object_results, s3_object_results);
     }
@@ -202,16 +189,30 @@
         let ingester = test_ingester(pool);
         ingester.ingest(EventSourceType::S3(events)).await.unwrap();

-        let object_results = sqlx::query("select * from object")
+        let (object_results, s3_object_results) = fetch_results(&ingester).await;
+
+        assert_deleted(object_results, s3_object_results);
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn ingest_duplicates(pool: PgPool) {
+        let ingester = test_ingester(pool);
+        ingester.ingest(EventSourceType::S3(test_events())).await.unwrap();
+        ingester.ingest(EventSourceType::S3(test_events())).await.unwrap();
+
+        let (object_results, s3_object_results) = fetch_results(&ingester).await;
+
+        assert_deleted(object_results, s3_object_results);
+    }
+
+    pub(crate) async fn fetch_results(ingester: &Ingester) -> (PgRow, PgRow) {
+        (sqlx::query("select * from object")
             .fetch_one(ingester.client.pool())
             .await
-            .unwrap();
-        let s3_object_results = sqlx::query("select * from s3_object")
+            .unwrap(), sqlx::query("select * from s3_object")
             .fetch_one(ingester.client.pool())
             .await
-            .unwrap();
-
-        assert_deleted(object_results, s3_object_results);
+            .unwrap())
     }

     pub(crate) fn assert_created(object_results: PgRow, s3_object_results: PgRow) {
@@ -248,17 +249,32 @@
     }

-    fn test_events() -> Events {
-        let mut events = expected_events();
+    fn update_test_events(mut events: Events) -> Events {
+        let update_last_modified = |dates: &mut Vec<Option<DateTime<Utc>>>| {
+            dates.iter_mut().for_each(|last_modified| {
+                *last_modified = Some(DateTime::default());
+            });
+        };
+        let update_storage_class = |classes: &mut Vec<Option<StorageClass>>| {
+            classes.iter_mut().for_each(|storage_class| {
+                *storage_class = Some(StorageClass::Standard);
+            });
+        };

-        events.object_created.last_modified_dates[0] = Some(DateTime::default());
-        events.object_created.storage_classes[0] = Some(StorageClass::Standard);
+        update_last_modified(&mut events.object_created.last_modified_dates);
+        update_storage_class(&mut events.object_created.storage_classes);

-        events.object_removed.last_modified_dates[0] = Some(DateTime::default());
-        events.object_removed.storage_classes[0] = None;
+
+        update_last_modified(&mut events.object_removed.last_modified_dates);
+        update_storage_class(&mut events.object_removed.storage_classes);

         events
     }
+
+    fn test_events() -> Events {
+        update_test_events(expected_events())
+    }
+
     fn test_ingester(pool: PgPool) -> Ingester {
         Ingester::new(Client::new(pool))
     }
diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs
index 2728363b1..8ca872f80 100644
--- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs
+++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs
@@ -141,7 +141,7 @@ impl From<FlatS3EventMessages> for TransposedS3EventMessages {
     }
 }

 /// Group by event types.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Events {
     pub object_created: TransposedS3EventMessages,
     pub object_removed: TransposedS3EventMessages,
diff --git a/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs
index 796a1ab55..f5c1aded2 100644
--- a/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs
+++ b/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs
@@ -89,7 +89,7 @@ pub async fn ingest_event(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::database::aws::ingester::tests::assert_deleted;
+    use crate::database::aws::ingester::tests::{assert_deleted, fetch_results};
     use crate::database::aws::migration::tests::MIGRATOR;
     use crate::events::aws::collecter::tests::{expected_head_object, set_s3_client_expectations};
     use crate::events::aws::collector_builder::tests::set_sqs_client_expectations;
@@ -110,16 +110,7 @@ mod tests {
         .await
         .unwrap();

-        let object_results = sqlx::query("select * from object")
-            .fetch_one(ingester.client().pool())
-            .await
-            .unwrap();
-
-        let s3_object_results = sqlx::query("select * from s3_object")
-            .fetch_one(ingester.client().pool())
-            .await
-            .unwrap();
-
+        let (object_results, s3_object_results) = fetch_results(&ingester).await;
         assert_deleted(object_results, s3_object_results);
     }

@@ -135,16 +126,7 @@
         .await
         .unwrap();

-        let object_results = sqlx::query("select * from object")
-            .fetch_one(ingester.client().pool())
-            .await
-            .unwrap();
-
-        let s3_object_results = sqlx::query("select * from s3_object")
-            .fetch_one(ingester.client().pool())
-            .await
-            .unwrap();
-
+        let (object_results, s3_object_results) = fetch_results(&ingester).await;
         assert_deleted(object_results, s3_object_results);
     }
 }

From 727c3d9a722d1c45c9acf1c8349a65ae52bc2dca Mon Sep 17 00:00:00 2001
From: Marko Malenic
Date: Tue, 23 Jan 2024 14:39:05 +1100
Subject: [PATCH 14/17] test(filemanager): add complex duplicates test

---
 .../filemanager/src/database/aws/ingester.rs  | 120 ++++++++++++++----
 .../filemanager/src/events/aws/mod.rs         |   4 +-
 .../filemanager/src/handlers/aws.rs           |  10 +-
 3 files changed, 106 insertions(+), 28 deletions(-)

diff --git a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
index 8ace83a42..1eb105bd6 100644
--- a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
+++ b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
@@ -74,12 +74,10 @@ impl Ingester {
             .zip(sizes.into_iter().rev())
             .flat_map(|(object_id, size)| {
                 // If we cannot find the object in our new ids, this object already exists.
-                let pos = inserted
-                    .iter()
-                    .rposition(|record| {
-                        // This will never be `None`, maybe this is an sqlx bug?
-                        record.object_id == Some(object_id)
-                    })?;
+                let pos = inserted.iter().rposition(|record| {
+                    // This will never be `None`, maybe this is an sqlx bug?
+                    record.object_id == Some(object_id)
+                })?;

                 // We can remove this to avoid searching over it again.
                 let record = inserted.remove(pos);
@@ -150,12 +148,15 @@ pub(crate) mod tests {
     use crate::database::aws::ingester::Ingester;
     use crate::database::aws::migration::tests::MIGRATOR;
     use crate::database::{Client, Ingest};
-    use crate::events::aws::tests::{expected_events, EXPECTED_E_TAG, expected_flat_events};
-    use crate::events::aws::{Events, StorageClass};
+    use crate::events::aws::tests::{
+        expected_events, expected_flat_events, EXPECTED_E_TAG, EXPECTED_SEQUENCER_CREATED,
+    };
+    use crate::events::aws::{Events, FlatS3EventMessages, StorageClass};
     use crate::events::EventSourceType;
     use chrono::{DateTime, Utc};
     use sqlx::postgres::PgRow;
     use sqlx::{PgPool, Row};
+    use std::ops::Add;

     #[sqlx::test(migrator = "MIGRATOR")]
     async fn ingest_object_created(pool: PgPool) {
@@ -165,7 +168,9 @@
         let (object_results, s3_object_results) = fetch_results(&ingester).await;

-        assert_created(object_results, s3_object_results);
+        assert_eq!(object_results.len(), 1);
+        assert_eq!(s3_object_results.len(), 1);
+        assert_created(&object_results[0], &s3_object_results[0]);
     }

     #[sqlx::test(migrator = "MIGRATOR")]
@@ -177,7 +182,9 @@
         let (object_results, s3_object_results) = fetch_results(&ingester).await;

-        assert_deleted(object_results, s3_object_results);
+        assert_eq!(object_results.len(), 1);
+        assert_eq!(s3_object_results.len(), 1);
+        assert_deleted(&object_results[0], &s3_object_results[0]);
     }

     #[sqlx::test(migrator = "MIGRATOR")]
@@ -189,35 +196,95 @@
         let (object_results, s3_object_results) = fetch_results(&ingester).await;

-        assert_deleted(object_results, s3_object_results);
+        assert_eq!(object_results.len(), 1);
+        assert_eq!(s3_object_results.len(), 1);
+        assert_deleted(&object_results[0], &s3_object_results[0]);
     }

     #[sqlx::test(migrator = "MIGRATOR")]
     async fn ingest_duplicates(pool: PgPool) {
         let ingester = test_ingester(pool);
-        ingester.ingest(EventSourceType::S3(test_events())).await.unwrap();
-        ingester.ingest(EventSourceType::S3(test_events())).await.unwrap();
+        ingester
+            .ingest(EventSourceType::S3(test_events()))
+            .await
+            .unwrap();
+        ingester
+            .ingest(EventSourceType::S3(test_events()))
+            .await
+            .unwrap();

         let (object_results, s3_object_results) = fetch_results(&ingester).await;

-        assert_deleted(object_results, s3_object_results);
+        assert_eq!(object_results.len(), 1);
+        assert_eq!(s3_object_results.len(), 1);
+        assert_eq!(
+            1,
+            s3_object_results[0].get::<i32, _>("number_duplicate_events")
+        );
+        assert_deleted(&object_results[0], &s3_object_results[0]);
     }

+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn ingest_duplicates_complex(pool: PgPool) {
+        let ingester = test_ingester(pool);
+        ingester
+            .ingest(EventSourceType::S3(test_events()))
+            .await
+            .unwrap();
+
+        let event = expected_flat_events().sort_and_dedup().into_inner();
+        let mut event = event[0].clone();
+        event.sequencer = Some(event.sequencer.unwrap().add("7"));
Some(event.sequencer.unwrap().add("7")); + + let mut events = vec![event]; + events.extend(expected_flat_events().sort_and_dedup().into_inner()); + + let events = update_test_events(FlatS3EventMessages(events).into()); + + ingester.ingest(EventSourceType::S3(events)).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_eq!( + 1, + s3_object_results[0].get::("number_duplicate_events") + ); + assert_deleted(&object_results[0], &s3_object_results[0]); + assert_created_with_sequencer( + &object_results[1], + &s3_object_results[1], + &EXPECTED_SEQUENCER_CREATED.to_string().add("7"), + ); + } + + pub(crate) async fn fetch_results(ingester: &Ingester) -> (Vec, Vec) { + ( + sqlx::query("select * from object") + .fetch_all(ingester.client.pool()) + .await + .unwrap(), + sqlx::query("select * from s3_object") + .fetch_all(ingester.client.pool()) + .await + .unwrap(), + ) } - pub(crate) fn assert_created(object_results: PgRow, s3_object_results: PgRow) { + pub(crate) fn assert_created_with_sequencer( + object_results: &PgRow, + s3_object_results: &PgRow, + expected_sequencer: &str, + ) { assert_eq!("bucket", s3_object_results.get::("bucket")); assert_eq!("key", s3_object_results.get::("key")); assert_eq!(0, object_results.get::("size")); assert_eq!(EXPECTED_E_TAG, s3_object_results.get::("e_tag")); + assert_eq!( + expected_sequencer, + s3_object_results.get::("created_sequencer") + ); assert_eq!( DateTime::::default(), s3_object_results.get::, _>("created_date") @@ -230,7 +295,15 @@ pub(crate) mod tests { ); } - pub(crate) fn assert_deleted(object_results: PgRow, s3_object_results: PgRow) { + pub(crate) fn assert_created(object_results: &PgRow, s3_object_results: &PgRow) { + assert_created_with_sequencer( + object_results, + s3_object_results, + EXPECTED_SEQUENCER_CREATED, + ) + } + + pub(crate) fn assert_deleted(object_results: &PgRow, s3_object_results: &PgRow) { assert_eq!("bucket", s3_object_results.get::("bucket")); assert_eq!("key", s3_object_results.get::("key")); assert_eq!(0, object_results.get::("size")); @@ -264,7 +337,6 @@ pub(crate) mod tests { update_last_modified(&mut events.object_created.last_modified_dates); update_storage_class(&mut events.object_created.storage_classes); - update_last_modified(&mut events.object_removed.last_modified_dates); update_storage_class(&mut events.object_removed.storage_classes); diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs index 8ca872f80..5027971d9 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs @@ -296,7 +296,7 @@ impl FlatS3EventMessages { } } -#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone)] pub enum EventType { Created, Removed, @@ -304,7 +304,7 @@ pub enum EventType { } /// A flattened AWS S3 record -#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone)] pub struct FlatS3EventMessage { pub sequencer: Option, pub event_name: String, diff --git a/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs index f5c1aded2..e94edb8f8 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/handlers/aws.rs +++ 
@@ -111,7 +111,10 @@ mod tests {
         .unwrap();

         let (object_results, s3_object_results) = fetch_results(&ingester).await;
-        assert_deleted(object_results, s3_object_results);
+
+        assert_eq!(object_results.len(), 1);
+        assert_eq!(s3_object_results.len(), 1);
+        assert_deleted(&object_results[0], &s3_object_results[0]);
     }

     #[sqlx::test(migrator = "MIGRATOR")]
@@ -132,6 +135,9 @@
         .unwrap();

         let (object_results, s3_object_results) = fetch_results(&ingester).await;
-        assert_deleted(object_results, s3_object_results);
+
+        assert_eq!(object_results.len(), 1);
+        assert_eq!(s3_object_results.len(), 1);
+        assert_deleted(&object_results[0], &s3_object_results[0]);
     }
 }
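The complex-duplicates test in the patch above fabricates a "later" event by appending to an existing sequencer with `Add`. This works because `String` comparison in Rust is byte-wise lexicographic, so a sequencer value always sorts strictly before any extension of itself. A minimal sketch, using a made-up sequencer value rather than one captured from S3:

use std::ops::Add;

fn main() {
    // Hypothetical sequencer in the same hex-string shape S3 uses.
    let created = "0055AED6DC2261C0".to_string();

    // Appending a character yields a value that sorts strictly after the original.
    let later = created.clone().add("7");

    assert!(later > created);
    assert_ne!(later, created);
}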
From a04f108ca0cb225444dc1103c9d35f93f831aa5e Mon Sep 17 00:00:00 2001
From: Marko Malenic
Date: Tue, 23 Jan 2024 14:43:03 +1100
Subject: [PATCH 15/17] refactor(filemanager): remove macros as it's not used

---
 lib/workload/stateful/filemanager/Cargo.lock  | 10 -----
 lib/workload/stateful/filemanager/Cargo.toml  |  1 -
 .../filemanager/filemanager-macros/Cargo.toml | 15 -------
 .../filemanager/filemanager-macros/README.md  |  3 --
 .../filemanager/filemanager-macros/src/lib.rs | 43 -------------------
 5 files changed, 72 deletions(-)
 delete mode 100644 lib/workload/stateful/filemanager/filemanager-macros/Cargo.toml
 delete mode 100644 lib/workload/stateful/filemanager/filemanager-macros/README.md
 delete mode 100644 lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs

diff --git a/lib/workload/stateful/filemanager/Cargo.lock b/lib/workload/stateful/filemanager/Cargo.lock
index 99a34cd56..674ca63e6 100644
--- a/lib/workload/stateful/filemanager/Cargo.lock
+++ b/lib/workload/stateful/filemanager/Cargo.lock
@@ -1125,16 +1125,6 @@ dependencies = [
  "tracing-subscriber",
 ]

-[[package]]
-name = "filemanager-macros"
-version = "0.1.0"
-dependencies = [
- "darling",
- "proc-macro2",
- "quote",
- "syn 2.0.37",
-]
-
 [[package]]
 name = "filemanager-migrate-lambda"
 version = "0.1.0"
diff --git a/lib/workload/stateful/filemanager/Cargo.toml b/lib/workload/stateful/filemanager/Cargo.toml
index cabc395d8..40e4c7565 100644
--- a/lib/workload/stateful/filemanager/Cargo.toml
+++ b/lib/workload/stateful/filemanager/Cargo.toml
@@ -3,7 +3,6 @@ resolver = "2"

 members = [
     "filemanager",
-    "filemanager-macros",
     "filemanager-http-lambda",
     "filemanager-ingest-lambda",
     "filemanager-migrate-lambda"
diff --git a/lib/workload/stateful/filemanager/filemanager-macros/Cargo.toml b/lib/workload/stateful/filemanager/filemanager-macros/Cargo.toml
deleted file mode 100644
index 25d5c6b00..000000000
--- a/lib/workload/stateful/filemanager/filemanager-macros/Cargo.toml
+++ /dev/null
@@ -1,15 +0,0 @@
-[package]
-name = "filemanager-macros"
-version = "0.1.0"
-license.workspace = true
-edition.workspace = true
-authors.workspace = true
-
-[lib]
-proc-macro = true
-
-[dependencies]
-syn = { version = "2.0", features = ["full"] }
-quote = "1.0"
-proc-macro2 = "1.0"
-darling = "0.20"
\ No newline at end of file
diff --git a/lib/workload/stateful/filemanager/filemanager-macros/README.md b/lib/workload/stateful/filemanager/filemanager-macros/README.md
deleted file mode 100644
index 2cdf84aae..000000000
--- a/lib/workload/stateful/filemanager/filemanager-macros/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-# filemanager-macros
-
-This crate contains macro implementations for filemanager.
\ No newline at end of file
diff --git a/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs b/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs
deleted file mode 100644
index d03ff9cd6..000000000
--- a/lib/workload/stateful/filemanager/filemanager-macros/src/lib.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use darling::ast::NestedMeta;
-use darling::{Error, FromMeta};
-use proc_macro::TokenStream;
-use quote::{format_ident, quote};
-use syn::ItemFn;
-
-#[derive(Debug, FromMeta)]
-struct MacroArgs {}
-
-#[proc_macro_attribute]
-pub fn import_plrust_function(args: TokenStream, input: TokenStream) -> TokenStream {
-    let attr_args = match NestedMeta::parse_meta_list(args.into()) {
-        Ok(v) => v,
-        Err(e) => {
-            return TokenStream::from(Error::from(e).write_errors());
-        }
-    };
-    let _args = match MacroArgs::from_list(&attr_args) {
-        Ok(v) => v,
-        Err(e) => {
-            return TokenStream::from(e.write_errors());
-        }
-    };
-
-    let input = syn::parse_macro_input!(input as ItemFn);
-
-    let input_clone = input.clone();
-    let vis = input_clone.vis;
-    let name = format_ident!("{}_migrate", input_clone.sig.ident);
-
-    let method = quote! {
-        #vis fn #name() -> sqlx::migrate::Migrator {
-            todo!();
-        }
-    };
-
-    let tokens = quote! {
-        #input
-        #method
-    };
-
-    tokens.into()
-}

From ff9a5d47a360ad983d223d518ecda9a9cf257214 Mon Sep 17 00:00:00 2001
From: Marko Malenic
Date: Tue, 23 Jan 2024 14:46:20 +1100
Subject: [PATCH 16/17] style(filemanager): formatting

---
 .../database/migrations/0002_add_s3_object_table.sql            | 2 +-
 .../database/queries/ingester/aws/insert_s3_created_objects.sql | 2 +-
 .../database/queries/ingester/aws/insert_s3_deleted_objects.sql | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql
index b656e6bc5..f989a389c 100644
--- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql
+++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql
@@ -37,4 +37,4 @@ create table s3_object(
     -- The sequencers should be unique with the bucket and key, otherwise this is a duplicate event.
constraint created_sequencer_unique unique (bucket, key, created_sequencer), constraint deleted_sequencer_unique unique (bucket, key, deleted_sequencer) -); \ No newline at end of file +); diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql index 9c5dbb2d6..0337af86a 100644 --- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql @@ -22,4 +22,4 @@ values ( unnest($9::text[]) ) on conflict on constraint created_sequencer_unique do update set number_duplicate_events = s3_object.number_duplicate_events + 1 - returning object_id, number_duplicate_events; \ No newline at end of file + returning object_id, number_duplicate_events; diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql index ee4d80814..ec05828d0 100644 --- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql +++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql @@ -26,4 +26,4 @@ values ( unnest($10::text[]) ) on conflict on constraint deleted_sequencer_unique do update set number_duplicate_events = s3_object.number_duplicate_events + 1 - returning object_id, number_duplicate_events; \ No newline at end of file + returning object_id, number_duplicate_events; From 8908ea36609a4fcc1d8bbb85442f9f19013363b5 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Tue, 23 Jan 2024 15:49:46 +1100 Subject: [PATCH 17/17] fix(filemanager): consider version id when de-duplicating as well --- .../migrations/0002_add_s3_object_table.sql | 8 +- .../aws/insert_s3_created_objects.sql | 4 +- .../aws/insert_s3_deleted_objects.sql | 4 +- .../filemanager/src/database/aws/ingester.rs | 56 +++++- .../filemanager/src/events/aws/mod.rs | 173 +++++++++++++++++- 5 files changed, 228 insertions(+), 17 deletions(-) diff --git a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql index f989a389c..fe1ca6176 100644 --- a/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateful/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -25,6 +25,8 @@ create table s3_object( e_tag text default null, -- The S3 storage class of the object. storage_class storage_class not null, + -- The version id of the object, if present. + version_id text default null, -- A sequencer value for when the object was created. Used to synchronise out of order and duplicate events. created_sequencer text default null, -- A sequencer value for when the object was deleted. Used to synchronise out of order and duplicate events. @@ -34,7 +36,7 @@ create table s3_object( -- Record the number of duplicate events received for this object, useful for debugging. number_duplicate_events integer not null default 0, - -- The sequencers should be unique with the bucket and key, otherwise this is a duplicate event. 
-    constraint created_sequencer_unique unique (bucket, key, created_sequencer),
-    constraint deleted_sequencer_unique unique (bucket, key, deleted_sequencer)
+    -- The sequencers should be unique with the bucket, key, and its version, otherwise this is a duplicate event.
+    constraint created_sequencer_unique unique (bucket, key, version_id, created_sequencer),
+    constraint deleted_sequencer_unique unique (bucket, key, version_id, deleted_sequencer)
 );
diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql
index 0337af86a..3e00bf96c 100644
--- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql
+++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql
@@ -8,6 +8,7 @@ insert into s3_object (
     last_modified_date,
     e_tag,
     storage_class,
+    version_id,
     created_sequencer
 )
 values (
@@ -19,7 +20,8 @@ values (
     unnest($6::timestamptz[]),
     unnest($7::text[]),
     unnest($8::storage_class[]),
-    unnest($9::text[])
+    unnest($9::text[]),
+    unnest($10::text[])
 ) on conflict on constraint created_sequencer_unique do update
     set number_duplicate_events = s3_object.number_duplicate_events + 1
     returning object_id, number_duplicate_events;
diff --git a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql
index ec05828d0..f0bf29524 100644
--- a/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql
+++ b/lib/workload/stateful/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql
@@ -11,6 +11,7 @@ insert into s3_object (
     last_modified_date,
     e_tag,
     storage_class,
+    version_id,
     deleted_sequencer
 )
 values (
@@ -23,7 +24,8 @@ values (
     unnest($7::timestamptz[]),
     unnest($8::text[]),
     unnest($9::storage_class[]),
-    unnest($10::text[])
+    unnest($10::text[]),
+    unnest($11::text[])
 ) on conflict on constraint deleted_sequencer_unique do update
     set number_duplicate_events = s3_object.number_duplicate_events + 1
     returning object_id, number_duplicate_events;
diff --git a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
index 1eb105bd6..db8bfd63b 100644
--- a/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
+++ b/lib/workload/stateful/filemanager/filemanager/src/database/aws/ingester.rs
@@ -44,6 +44,7 @@ impl Ingester {
             event_times,
             buckets,
             keys,
+            version_ids,
             sizes,
             e_tags,
             storage_classes,
@@ -63,6 +64,7 @@ impl Ingester {
             &last_modified_dates as &[Option<DateTime<Utc>>],
             &e_tags as &[Option<String>],
             &storage_classes as &[Option<StorageClass>],
+            &version_ids as &[Option<String>],
             &sequencers as &[Option<String>]
         )
         .fetch_all(&mut *tx)
@@ -150,6 +152,7 @@ pub(crate) mod tests {
     use crate::database::{Client, Ingest};
     use crate::events::aws::tests::{
         expected_events, expected_flat_events, EXPECTED_E_TAG, EXPECTED_SEQUENCER_CREATED,
+        EXPECTED_VERSION_ID,
     };
     use crate::events::aws::{Events, FlatS3EventMessages, StorageClass};
     use crate::events::EventSourceType;
@@ -252,13 +255,50 @@
         assert_deleted(&object_results[0], &s3_object_results[0]);
-        assert_created_with_sequencer(
+        assert_created_with(
             &object_results[1],
             &s3_object_results[1],
+            EXPECTED_VERSION_ID,
&EXPECTED_SEQUENCER_CREATED.to_string().add("7"), ); } + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_duplicates_with_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + ingester + .ingest(EventSourceType::S3(test_events())) + .await + .unwrap(); + + let event = expected_flat_events().sort_and_dedup().into_inner(); + let mut event = event[0].clone(); + event.version_id = Some("version_id".to_string()); + + let mut events = vec![event]; + events.extend(expected_flat_events().sort_and_dedup().into_inner()); + + let events = update_test_events(FlatS3EventMessages(events).into()); + + ingester.ingest(EventSourceType::S3(events)).await.unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 2); + assert_eq!(s3_object_results.len(), 2); + assert_eq!( + 1, + s3_object_results[0].get::("number_duplicate_events") + ); + assert_deleted(&object_results[0], &s3_object_results[0]); + assert_created_with( + &object_results[1], + &s3_object_results[1], + "version_id", + EXPECTED_SEQUENCER_CREATED, + ); + } + pub(crate) async fn fetch_results(ingester: &Ingester) -> (Vec, Vec) { ( sqlx::query("select * from object") @@ -272,15 +312,20 @@ pub(crate) mod tests { ) } - pub(crate) fn assert_created_with_sequencer( + pub(crate) fn assert_created_with( object_results: &PgRow, s3_object_results: &PgRow, + expected_version_id: &str, expected_sequencer: &str, ) { assert_eq!("bucket", s3_object_results.get::("bucket")); assert_eq!("key", s3_object_results.get::("key")); assert_eq!(0, object_results.get::("size")); assert_eq!(EXPECTED_E_TAG, s3_object_results.get::("e_tag")); + assert_eq!( + expected_version_id, + s3_object_results.get::("version_id") + ); assert_eq!( expected_sequencer, s3_object_results.get::("created_sequencer") @@ -296,9 +341,10 @@ pub(crate) mod tests { } pub(crate) fn assert_created(object_results: &PgRow, s3_object_results: &PgRow) { - assert_created_with_sequencer( + assert_created_with( object_results, s3_object_results, + EXPECTED_VERSION_ID, EXPECTED_SEQUENCER_CREATED, ) } @@ -306,6 +352,10 @@ pub(crate) mod tests { pub(crate) fn assert_deleted(object_results: &PgRow, s3_object_results: &PgRow) { assert_eq!("bucket", s3_object_results.get::("bucket")); assert_eq!("key", s3_object_results.get::("key")); + assert_eq!( + EXPECTED_VERSION_ID, + s3_object_results.get::("version_id") + ); assert_eq!(0, object_results.get::("size")); assert_eq!(EXPECTED_E_TAG, s3_object_results.get::("e_tag")); assert_eq!( diff --git a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs index 5027971d9..0ae4870b4 100644 --- a/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateful/filemanager/filemanager/src/events/aws/mod.rs @@ -67,6 +67,7 @@ pub struct TransposedS3EventMessages { pub event_names: Vec, pub buckets: Vec, pub keys: Vec, + pub version_ids: Vec>, pub sizes: Vec>, pub e_tags: Vec>, pub sequencers: Vec>, @@ -85,6 +86,7 @@ impl TransposedS3EventMessages { event_names: Vec::with_capacity(capacity), buckets: Vec::with_capacity(capacity), keys: Vec::with_capacity(capacity), + version_ids: Vec::with_capacity(capacity), sizes: Vec::with_capacity(capacity), e_tags: Vec::with_capacity(capacity), sequencers: Vec::with_capacity(capacity), @@ -103,6 +105,7 @@ impl TransposedS3EventMessages { bucket, key, size, + version_id, e_tag, sequencer, portal_run_id, @@ -116,6 +119,7 @@ impl 
         self.event_names.push(event_name);
         self.buckets.push(bucket);
         self.keys.push(key);
+        self.version_ids.push(version_id);
         self.sizes.push(size);
         self.e_tags.push(e_tag);
         self.sequencers.push(sequencer);
@@ -219,18 +223,17 @@ impl FlatS3EventMessages {
                         value.event_name.clone(),
                         value.bucket.clone(),
                         value.key.clone(),
-                        value.size,
-                        value.e_tag.clone(),
+                        value.version_id.clone(),
                         // Note, `last_modified` and `storage_class` are always `None` at this point anyway so don't need
-                        // to be considered.
+                        // to be considered. `size` and `e_tag` should be the same but are unimportant in deduplication.
                     )
                 })
                 .collect(),
         )
     }

-    /// Ordering is implemented so that the sequencer values are considered when the bucket and the
-    /// key are the same.
+    /// Ordering is implemented so that the sequencer values are considered when the bucket, the
+    /// key and the version id are the same.
     ///
     /// Unlike the `dedup` function, this implementation does consider the event time. This means that events
     /// will be ingested in event time order if the sequencer condition is not met.
@@ -242,13 +245,18 @@
             if let (Some(a_sequencer), Some(b_sequencer)) =
                 (a.sequencer.as_ref(), b.sequencer.as_ref())
             {
-                if a.bucket == b.bucket && a.key == b.key {
+                if a.bucket == b.bucket
+                    && a.key == b.key
+                    && a.version_id == b.version_id
+                    && a.event_name == b.event_name
+                {
                     return (
                         a_sequencer,
                         &a.event_time,
                         &a.event_name,
                         &a.bucket,
                         &a.key,
+                        &a.version_id,
                         &a.size,
                         &a.e_tag,
                         &a.storage_class,
@@ -260,6 +268,7 @@
                             &b.event_time,
                             &b.event_name,
                             &b.bucket,
                             &b.key,
+                            &b.version_id,
                             &b.size,
                             &b.e_tag,
                             &b.storage_class,
@@ -274,6 +283,7 @@
                 &a.event_name,
                 &a.bucket,
                 &a.key,
+                &a.version_id,
                 &a.size,
                 &a.e_tag,
                 &a.storage_class,
@@ -285,6 +295,7 @@
                 &b.event_name,
                 &b.bucket,
                 &b.key,
+                &b.version_id,
                 &b.size,
                 &b.e_tag,
                 &b.storage_class,
@@ -310,6 +321,7 @@ pub struct FlatS3EventMessage {
     pub sequencer: Option<String>,
     pub event_name: String,
     pub bucket: String,
     pub key: String,
+    pub version_id: Option<String>,
     pub size: Option<i32>,
     pub e_tag: Option<String>,
     pub storage_class: Option<StorageClass>,
@@ -348,6 +360,84 @@ impl FlatS3EventMessage {
         e_tag.into_iter().for_each(|e_tag| self.e_tag = Some(e_tag));
         self
     }
+
+    /// Set the sequencer value.
+    pub fn with_sequencer(mut self, sequencer: Option<String>) -> Self {
+        self.sequencer = sequencer;
+        self
+    }
+
+    /// Set the event name.
+    pub fn with_event_name(mut self, event_name: String) -> Self {
+        self.event_name = event_name;
+        self
+    }
+
+    /// Set the bucket.
+    pub fn with_bucket(mut self, bucket: String) -> Self {
+        self.bucket = bucket;
+        self
+    }
+
+    /// Set the key.
+    pub fn with_key(mut self, key: String) -> Self {
+        self.key = key;
+        self
+    }
+
+    /// Set the version id.
+    pub fn with_version_id(mut self, version_id: Option<String>) -> Self {
+        self.version_id = version_id;
+        self
+    }
+
+    /// Set the size.
+    pub fn with_size(mut self, size: Option<i32>) -> Self {
+        self.size = size;
+        self
+    }
+
+    /// Set the e_tag.
+    pub fn with_e_tag(mut self, e_tag: Option<String>) -> Self {
+        self.e_tag = e_tag;
+        self
+    }
+
+    /// Set the storage class.
+    pub fn with_storage_class(mut self, storage_class: Option<StorageClass>) -> Self {
+        self.storage_class = storage_class;
+        self
+    }
+
+    /// Set the last modified date.
+    pub fn with_last_modified_date(mut self, last_modified_date: Option<DateTime<Utc>>) -> Self {
+        self.last_modified_date = last_modified_date;
+        self
+    }
+
+    /// Set the object id.
+    pub fn with_object_id(mut self, object_id: Uuid) -> Self {
+        self.object_id = object_id;
+        self
+    }
+
+    /// Set the event time.
+    pub fn with_event_time(mut self, event_time: DateTime<Utc>) -> Self {
+        self.event_time = event_time;
+        self
+    }
+
+    /// Set the portal run id.
+    pub fn with_portal_run_id(mut self, portal_run_id: String) -> Self {
+        self.portal_run_id = portal_run_id;
+        self
+    }
+
+    /// Set the event type.
+    pub fn with_event_type(mut self, event_type: EventType) -> Self {
+        self.event_type = event_type;
+        self
+    }
 }

 /// The basic AWS S3 Event.
@@ -385,6 +475,7 @@ pub struct ObjectRecord {
     pub key: String,
     pub size: Option<i32>,
     pub e_tag: Option<String>,
+    pub version_id: Option<String>,
     pub sequencer: Option<String>,
 }

@@ -411,6 +502,7 @@ impl TryFrom<S3EventMessage> for FlatS3EventMessages {
             key,
             size,
             e_tag,
+            version_id,
             sequencer,
         } = object;

@@ -439,6 +531,7 @@
             size,
             e_tag,
             sequencer,
+            version_id,
             portal_run_id,
             // Head field are fetched later.
             storage_class: None,
@@ -467,6 +560,8 @@ pub(crate) mod tests {
     pub(crate) const EXPECTED_SEQUENCER_DELETED: &str = "0055AED6DCD90281E6"; // pragma: allowlist secret
     pub(crate) const EXPECTED_E_TAG: &str = "d41d8cd98f00b204e9800998ecf8427e"; // pragma: allowlist secret

+    pub(crate) const EXPECTED_VERSION_ID: &str = "096fKKXTRTtl3on89fVO.nfljtsv6qko";
+
     #[test]
     fn test_flat_events() {
         let result = expected_flat_events();
@@ -478,6 +573,7 @@
             "ObjectRemoved:Delete",
             EXPECTED_SEQUENCER_DELETED,
             None,
+            Some(EXPECTED_VERSION_ID.to_string()),
         );

         let second = result.next().unwrap();
@@ -486,6 +582,7 @@
             "ObjectCreated:Put",
             EXPECTED_SEQUENCER_CREATED,
             Some(0),
+            Some(EXPECTED_VERSION_ID.to_string()),
         );

         let third = result.next().unwrap();
@@ -494,6 +591,7 @@
             "ObjectCreated:Put",
             EXPECTED_SEQUENCER_CREATED,
             Some(0),
+            Some(EXPECTED_VERSION_ID.to_string()),
         );
     }

@@ -508,6 +606,43 @@
             "ObjectCreated:Put",
             EXPECTED_SEQUENCER_CREATED,
             Some(0),
+            Some(EXPECTED_VERSION_ID.to_string()),
+        );
+
+        let second = result.next().unwrap();
+        assert_flat_s3_event(
+            second,
+            "ObjectRemoved:Delete",
+            EXPECTED_SEQUENCER_DELETED,
+            None,
+            Some(EXPECTED_VERSION_ID.to_string()),
+        );
+    }
+
+    #[test]
+    fn test_sort_and_dedup_with_version_id() {
+        let result = expected_flat_events();
+
+        let mut result = result.into_inner();
+        result.push(
+            expected_flat_events()
+                .into_inner()
+                .first()
+                .unwrap()
+                .clone()
+                .with_version_id(Some("version_id".to_string())),
+        );
+
+        let result = FlatS3EventMessages(result).sort_and_dedup();
+        let mut result = result.into_inner().into_iter();
+
+        let first = result.next().unwrap();
+        assert_flat_s3_event(
+            first,
+            "ObjectCreated:Put",
+            EXPECTED_SEQUENCER_CREATED,
+            Some(0),
+            Some(EXPECTED_VERSION_ID.to_string()),
         );

         let second = result.next().unwrap();
@@ -516,6 +651,16 @@
             "ObjectRemoved:Delete",
             EXPECTED_SEQUENCER_DELETED,
             None,
+            Some(EXPECTED_VERSION_ID.to_string()),
+        );
+
+        let third = result.next().unwrap();
+        assert_flat_s3_event(
+            third,
+            "ObjectRemoved:Delete",
+            EXPECTED_SEQUENCER_DELETED,
+            None,
+            Some("version_id".to_string()),
         );
     }

@@ -524,11 +669,13 @@
     fn assert_flat_s3_event(
         event: FlatS3EventMessage,
         event_name: &str,
         sequencer: &str,
         size: Option<i32>,
+        version_id: Option<String>,
     ) {
         assert_eq!(event.event_time, DateTime::<Utc>::default());
         assert_eq!(event.event_name, event_name);
         assert_eq!(event.bucket, "bucket");
         assert_eq!(event.key, "key");
+        assert_eq!(event.version_id, version_id);
         assert_eq!(event.size, size);
assert_eq!(event.e_tag, Some(EXPECTED_E_TAG.to_string())); // pragma: allowlist secret assert_eq!(event.sequencer, Some(sequencer.to_string())); @@ -549,6 +696,10 @@ pub(crate) mod tests { assert_eq!(result.object_created.buckets[0], "bucket"); assert_eq!(result.object_created.keys[0], "key"); assert_eq!(result.object_created.sizes[0], Some(0)); + assert_eq!( + result.object_created.version_ids[0], + Some(EXPECTED_VERSION_ID.to_string()) + ); assert_eq!( result.object_created.e_tags[0], Some(EXPECTED_E_TAG.to_string()) @@ -568,6 +719,10 @@ pub(crate) mod tests { assert_eq!(result.object_removed.event_names[0], "ObjectRemoved:Delete"); assert_eq!(result.object_removed.buckets[0], "bucket"); assert_eq!(result.object_removed.keys[0], "key"); + assert_eq!( + result.object_removed.version_ids[0], + Some(EXPECTED_VERSION_ID.to_string()) + ); assert_eq!(result.object_removed.sizes[0], None); assert_eq!( result.object_removed.e_tags[0], @@ -621,7 +776,7 @@ pub(crate) mod tests { "key": "key", "size": 0, "eTag": EXPECTED_E_TAG, - "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", + "versionId": EXPECTED_VERSION_ID, "sequencer": EXPECTED_SEQUENCER_CREATED } }, @@ -650,7 +805,7 @@ pub(crate) mod tests { "key": "key", "size": 0, "eTag": EXPECTED_E_TAG, - "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", + "versionId": EXPECTED_VERSION_ID, "sequencer": EXPECTED_SEQUENCER_CREATED } }); @@ -675,7 +830,7 @@ pub(crate) mod tests { // ObjectRemoved::Delete does not have a size, even though this isn't documented // anywhere. "eTag": EXPECTED_E_TAG, - "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", + "versionId": EXPECTED_VERSION_ID, "sequencer": EXPECTED_SEQUENCER_DELETED } });
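Taken together, these patches settle on a de-duplication key made up of the event name, bucket, key, version id, and sequencer: two events are duplicates only when all five match. The sketch below shows that rule with a `HashSet` and a stand-in key type; the field types mirror `FlatS3EventMessage`, but the struct and helper here are hypothetical and not part of the crate.

use std::collections::HashSet;

// Hypothetical stand-in for the tuple used by `FlatS3EventMessages::dedup`.
#[derive(Hash, PartialEq, Eq)]
struct DedupKey {
    event_name: String,
    bucket: String,
    key: String,
    version_id: Option<String>,
    sequencer: Option<String>,
}

fn key(event_name: &str, version_id: &str, sequencer: &str) -> DedupKey {
    DedupKey {
        event_name: event_name.to_string(),
        bucket: "bucket".to_string(),
        key: "key".to_string(),
        version_id: Some(version_id.to_string()),
        sequencer: Some(sequencer.to_string()),
    }
}

fn main() {
    let mut seen = HashSet::new();

    // Same bucket, key, and sequencer but a different version id: both are kept.
    assert!(seen.insert(key(
        "ObjectCreated:Put",
        "096fKKXTRTtl3on89fVO.nfljtsv6qko",
        "0055AED6DC2261C0",
    )));
    assert!(seen.insert(key("ObjectCreated:Put", "version_id", "0055AED6DC2261C0")));

    // An exact repeat matches on every field and is dropped as a duplicate.
    assert!(!seen.insert(key("ObjectCreated:Put", "version_id", "0055AED6DC2261C0")));
}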