diff --git a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs index 4ac3fa454..bd8cb8ab7 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs @@ -4,7 +4,7 @@ use futures::StreamExt; use sqlx::{Executor, Postgres, QueryBuilder}; use uuid::Uuid; use crate::database::DbClient; -use crate::events::aws::{EventType, FlatS3EventMessage, FlatS3EventMessages}; +use crate::events::aws::{EventType, FlatS3EventMessage, FlatS3EventMessages, TransposedS3EventMessages}; use crate::events::aws::s3::S3; use crate::error::Result; @@ -31,22 +31,9 @@ impl Ingester { } pub async fn ingest_events(&self, events: FlatS3EventMessages) -> Result<()> { - let mut query_builder_object: QueryBuilder = QueryBuilder::new( - "INSERT INTO object (object_id, bucket, key, size, hash, created_date, last_modified_date, deleted_date, portal_run_id) ", - ); - let mut query_builder_s3_object: QueryBuilder = QueryBuilder::new( - "INSERT INTO s3_object (object_id, storage_class) ", - ); - - let mut update_object: QueryBuilder = QueryBuilder::new( - r#" - update object - set deleted_date = data.new_value - from - (select unnest(?) as key, unnest(?) as new_value) as data - where "table".key = data.key; - "#, - ); + let transposed_events = TransposedS3EventMessages::from(events); + + transposed_events.event_times.into_iter().group let mut events = join_all(events.into_inner().into_iter().map(|event| async move { let head = self.s3.head(&event.key, &event.bucket).await?; diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs index e7b10f03d..d251ee6c0 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::collections::{BTreeSet, HashSet}; use aws_sdk_s3::operation::head_object::HeadObjectOutput; +use aws_sdk_s3::types::StorageClass; use uuid::Uuid; pub mod s3; @@ -16,14 +17,15 @@ pub mod sqs; pub struct TransposedS3EventMessages { pub object_ids: Vec, pub event_times: Vec>, - pub event_types: Vec, + pub event_names: Vec, pub buckets: Vec, pub keys: Vec, pub sizes: Vec, pub e_tags: Vec, pub sequencers: Vec>, pub portal_run_ids: Vec, - pub heads: Vec> + pub storage_classes: Vec>, + pub last_modified_dates: Vec>> } impl TransposedS3EventMessages { @@ -32,14 +34,15 @@ impl TransposedS3EventMessages { Self { object_ids: Vec::with_capacity(capacity), event_times: Vec::with_capacity(capacity), - event_types: Vec::with_capacity(capacity), + event_names: Vec::with_capacity(capacity), buckets: Vec::with_capacity(capacity), keys: Vec::with_capacity(capacity), sizes: Vec::with_capacity(capacity), e_tags: Vec::with_capacity(capacity), sequencers: Vec::with_capacity(capacity), portal_run_ids: Vec::with_capacity(capacity), - heads: Vec::with_capacity(capacity) + storage_classes: Vec::with_capacity(capacity), + last_modified_dates: Vec::with_capacity(capacity) } } @@ -48,26 +51,28 @@ impl TransposedS3EventMessages { let FlatS3EventMessage { object_id, event_time, - event_type, + event_name, bucket, key, size, e_tag, sequencer, portal_run_id, - head + storage_class, + last_modified_date, } = message; self.object_ids.push(object_id); self.event_times.push(event_time); - self.event_types.push(event_type); + self.event_names.push(event_name); self.buckets.push(bucket); self.keys.push(key); self.sizes.push(size); self.e_tags.push(e_tag); self.sequencers.push(sequencer); self.portal_run_ids.push(portal_run_id); - self.heads.push(head); + self.storage_classes.push(storage_class); + self.last_modified_dates.push(last_modified_date); } } @@ -83,7 +88,38 @@ impl From for TransposedS3EventMessages { } } -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] +/// Group by event types. +pub struct GroupByEventType { + object_created: FlatS3EventMessages, + object_removed: FlatS3EventMessages, + other: FlatS3EventMessages +} + +impl From for GroupByEventType { + fn from(messages: FlatS3EventMessages) -> Self { + let mut object_created = FlatS3EventMessages::default(); + let mut object_removed = FlatS3EventMessages::default(); + let mut other = FlatS3EventMessages::default(); + + messages.into_inner().into_iter().for_each(|message| { + if message.event_name.contains("ObjectCreated") { + object_created.0.push(message); + } else if message.event_name.contains("ObjectRemoved") { + object_removed.0.push(message); + } else { + other.0.push(message); + } + }); + + Self { + object_created, + object_removed, + other + } + } +} + +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Default)] #[serde(try_from = "S3EventMessage")] /// Flattened AWS S3 events pub struct FlatS3EventMessages(pub Vec); @@ -166,32 +202,32 @@ impl PartialOrd for FlatS3EventMessage { } } -/// An S3 event type. -pub enum EventType { - ObjectCreated, - ObjectRemoved, - Other, -} - /// A flattened AWS S3 record #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] pub struct FlatS3EventMessage { pub object_id: Uuid, pub event_time: DateTime, - pub event_type: EventType, + pub event_name: String, pub bucket: String, pub key: String, pub size: i32, pub e_tag: String, pub sequencer: Option, pub portal_run_id: String, - pub head: Option + pub storage_class: Option, + pub last_modified_date: Option> } impl FlatS3EventMessage { - /// Update the head. - pub fn with_head(mut self, head: Option) -> Self { - self.head = head; + /// Update the storage class. + pub fn with_storage_class(mut self, storage_class: Option) -> Self { + self.storage_class = storage_class; + self + } + + /// Update the last modified date. + pub fn with_last_modified_date(mut self, last_modified_date: Option>) -> Self { + self.last_modified_date = last_modified_date; self } } @@ -234,19 +270,6 @@ pub struct ObjectRecord { pub sequencer: Option, } -impl Record { - /// Parses the event name into an event type. - pub fn parse_event_type(&self) -> EventType { - if self.event_name.contains("ObjectCreated") { - EventType::ObjectCreated - } else if self.event_name.contains("ObjectRemoved") { - EventType::ObjectRemoved - } else { - EventType::Other - } - } -} - impl TryFrom for FlatS3EventMessages { type Error = Error; @@ -256,12 +279,10 @@ impl TryFrom for FlatS3EventMessages { .records .into_iter() .map(|record| { - let event_type = record.parse_event_type(); - let Record { event_time, + event_name, s3, - .. } = record; let S3Record { bucket, object } = s3; @@ -284,15 +305,16 @@ impl TryFrom for FlatS3EventMessages { Ok(FlatS3EventMessage { object_id, event_time, - event_type, + event_name, bucket, key, size, e_tag, sequencer, portal_run_id, - // Head is optionally fetched later. - head: None, + // Head field are optionally fetched later. + storage_class: None, + last_modified_date: None }) }) .collect::>>()?, diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/aws/s3.rs b/lib/workload/stateless/filemanager/filemanager/src/events/aws/s3.rs index d1ca553bc..4466de723 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/events/aws/s3.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/events/aws/s3.rs @@ -3,7 +3,7 @@ use crate::database::CloudObject::S3 as S3CloudObject; use crate::database::Object; use crate::error::Error::{ConfigError, S3Error}; use crate::error::Result; -use crate::events::aws::{BucketRecord, ObjectRecord, S3EventMessage, S3Record}; +use crate::events::aws::{BucketRecord, FlatS3EventMessage, FlatS3EventMessages, ObjectRecord, S3EventMessage, S3Record}; use crate::file::File; use aws_sdk_s3::operation::head_object::{HeadObjectError, HeadObjectOutput}; use aws_sdk_s3::Client; @@ -68,4 +68,19 @@ impl S3 { None } } + + pub async fn update_events(&self, events: FlatS3EventMessages) -> Result { + join_all(events.into_inner().into_iter().map(|event| async move { + let HeadObjectOutput { + storage_class, + last_modified, + .. + } = self.head(&event.key, &event.bucket).await?; + + let event = event.with_storage_class(storage_class); + let event = event.with_last_modified_date(Self::convert_datetime(last_modified)); + + Ok(event) + })).await.into_iter().collect() + } } diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/aws/sqs.rs b/lib/workload/stateless/filemanager/filemanager/src/events/aws/sqs.rs index d454ff6f7..e10b9b4db 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/events/aws/sqs.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/events/aws/sqs.rs @@ -45,12 +45,12 @@ impl SQS { .await .map_err(|err| SQSReceiveError(err.into_service_error().to_string()))?; - let event_messages: FlatS3EventMessages = join_all( + let event_messages: FlatS3EventMessages = rcv_message_output .messages .unwrap_or_default() .into_iter() - .map(|message| async move { + .map(|message| { trace!(message = ?message, "got the message"); if let Some(body) = message.body() { @@ -58,10 +58,7 @@ impl SQS { } else { Err(SQSReceiveError("No body in SQS message".to_string())) } - }), - ) - .await - .into_iter() + }) .collect::>>()? .into();