From 53d6fbaad6f36f8d19ab9e45eedd3c22c16c8489 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 26 Feb 2024 14:13:59 +1100 Subject: [PATCH] refactor(filemanager): add deserialization tests, simplify from type implementations --- .../filemanager/src/database/aws/ingester.rs | 4 +- .../filemanager/src/database/mod.rs | 2 +- .../filemanager/src/events/aws/message.rs | 143 ++++++++++++------ .../filemanager/src/events/aws/mod.rs | 111 ++++++++------ 4 files changed, 165 insertions(+), 95 deletions(-) diff --git a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs index f2337ad95..2a1cabc46 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs @@ -6,7 +6,7 @@ use uuid::Uuid; use crate::database::{Client, Ingest}; use crate::error::Result; -use crate::events::aws::EventType; +use crate::events::aws::message::EventType; use crate::events::aws::{Events, TransposedS3EventMessages}; use crate::events::aws::{FlatS3EventMessage, FlatS3EventMessages, StorageClass}; use crate::events::EventSourceType; @@ -256,11 +256,11 @@ pub(crate) mod tests { use crate::database::aws::ingester::Ingester; use crate::database::aws::migration::tests::MIGRATOR; use crate::database::{Client, Ingest}; + use crate::events::aws::message::EventType::{Created, Deleted}; use crate::events::aws::tests::{ expected_events_simple, expected_flat_events_simple, EXPECTED_E_TAG, EXPECTED_SEQUENCER_CREATED_ONE, EXPECTED_VERSION_ID, }; - use crate::events::aws::EventType::{Created, Deleted}; use crate::events::aws::{Events, FlatS3EventMessage, FlatS3EventMessages, StorageClass}; use crate::events::EventSourceType; diff --git a/lib/workload/stateless/filemanager/filemanager/src/database/mod.rs b/lib/workload/stateless/filemanager/filemanager/src/database/mod.rs index 529fa75cb..12c21fc8e 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/database/mod.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/database/mod.rs @@ -58,11 +58,11 @@ pub(crate) mod tests { use crate::database::aws::ingester::tests::{test_events, test_ingester}; use crate::database::aws::ingester::Ingester; use crate::database::aws::migration::tests::MIGRATOR; + use crate::events::aws::message::EventType; use crate::events::aws::tests::{ EXPECTED_NEW_SEQUENCER_ONE, EXPECTED_SEQUENCER_CREATED_ONE, EXPECTED_SEQUENCER_CREATED_TWO, EXPECTED_SEQUENCER_CREATED_ZERO, EXPECTED_VERSION_ID, }; - use crate::events::aws::EventType; use crate::events::aws::StorageClass; use crate::events::aws::{Events, FlatS3EventMessage}; diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/aws/message.rs b/lib/workload/stateless/filemanager/filemanager/src/events/aws/message.rs index 916492354..ef521e5b7 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/events/aws/message.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/events/aws/message.rs @@ -1,12 +1,36 @@ -use crate::error::Error; -use crate::error::Error::DeserializeError; -use crate::error::Result; -use crate::events::aws::EventType::{Created, Deleted, Other}; use crate::events::aws::{FlatS3EventMessage, FlatS3EventMessages}; -use chrono::{DateTime, ParseError, Utc}; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; use uuid::Uuid; +#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Clone, Hash, sqlx::Type)] +#[sqlx(type_name = "event_type")] +pub enum EventType { + #[default] + Created, + Deleted, + Other, +} + +impl PgHasArrayType for EventType { + fn array_type_info() -> PgTypeInfo { + PgTypeInfo::with_name("_event_type") + } +} + +impl From<&str> for EventType { + fn from(value: &str) -> Self { + if value.contains("Object Created") || value.contains("ObjectCreated") { + Self::Created + } else if value.contains("Object Deleted") || value.contains("ObjectRemoved") { + Self::Deleted + } else { + Self::Other + } + } +} + #[derive(Debug, Serialize, Deserialize)] #[serde(untagged)] pub enum EventMessage { @@ -14,13 +38,11 @@ pub enum EventMessage { SQS(Message), } -impl TryFrom for FlatS3EventMessages { - type Error = Error; - - fn try_from(message: EventMessage) -> Result { +impl From for FlatS3EventMessages { + fn from(message: EventMessage) -> Self { match message { - EventMessage::EventBridge(record) => record.try_into(), - EventMessage::SQS(message) => message.try_into(), + EventMessage::EventBridge(record) => record.into(), + EventMessage::SQS(message) => message.into(), } } } @@ -43,7 +65,7 @@ pub struct Message { #[serde(rename_all = "kebab-case")] pub struct Record { #[serde(alias = "eventTime")] - pub time: String, + pub time: DateTime, #[serde(alias = "eventName")] pub detail_type: String, #[serde(alias = "s3")] @@ -78,10 +100,8 @@ pub struct Object { pub sequencer: Option, } -impl TryFrom for FlatS3EventMessages { - type Error = Error; - - fn try_from(record: Record) -> Result { +impl From for FlatS3EventMessages { + fn from(record: Record) -> Self { let Record { time, detail_type, @@ -100,23 +120,9 @@ impl TryFrom for FlatS3EventMessages { sequencer, } = object; - let event_time: DateTime = time - .parse() - .map_err(|err: ParseError| DeserializeError(err.to_string()))?; - - let event_type = if detail_type.contains("Object Created") - || detail_type.contains("ObjectCreated") - { - Created - } else if detail_type.contains("Object Removed") || detail_type.contains("ObjectRemoved") { - Deleted - } else { - Other - }; - - Ok(FlatS3EventMessages(vec![FlatS3EventMessage { + FlatS3EventMessages(vec![FlatS3EventMessage { s3_object_id: Uuid::new_v4(), - event_time: Some(event_time), + event_time: Some(time), bucket, key, size, @@ -126,23 +132,70 @@ impl TryFrom for FlatS3EventMessages { // Head fields are fetched later. storage_class: None, last_modified_date: None, - event_type, + event_type: detail_type.as_str().into(), number_reordered: 0, number_duplicate_events: 0, - }])) + }]) } } -impl TryFrom for FlatS3EventMessages { - type Error = Error; - - fn try_from(message: Message) -> Result { - Ok(FlatS3EventMessages(message.records.into_iter().try_fold( - vec![], - |mut flattened, record| { - flattened.extend(FlatS3EventMessages::try_from(record)?.into_inner()); - Ok::<_, Self::Error>(flattened) - }, - )?)) +impl From for FlatS3EventMessages { + fn from(message: Message) -> Self { + FlatS3EventMessages( + message + .records + .into_iter() + .fold(vec![], |mut flattened, record| { + flattened.extend(FlatS3EventMessages::from(record).into_inner()); + flattened + }), + ) + } +} + +#[cfg(test)] +mod tests { + use crate::events::aws::tests::{ + assert_flat_s3_event, expected_event_bridge_record, expected_sqs_record, + EXPECTED_SEQUENCER_DELETED_ONE, EXPECTED_VERSION_ID, + }; + use crate::events::aws::EventType::Deleted; + use crate::events::aws::FlatS3EventMessages; + use serde_json::json; + + #[test] + fn deserialize_sqs_message() { + let record = expected_sqs_record(); + let message = json!({ + "Records": [record] + }) + .to_string(); + + let result: FlatS3EventMessages = serde_json::from_str(&message).unwrap(); + let first_message = result.into_inner().first().unwrap().clone(); + + assert_flat_s3_event( + first_message, + &Deleted, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + None, + Some(EXPECTED_VERSION_ID.to_string()), + ); + } + + #[test] + fn deserialize_event_bridge_message() { + let record = expected_event_bridge_record().to_string(); + + let result: FlatS3EventMessages = serde_json::from_str(&record).unwrap(); + let first_message = result.into_inner().first().unwrap().clone(); + + assert_flat_s3_event( + first_message, + &Deleted, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), + None, + Some(EXPECTED_VERSION_ID.to_string()), + ); } } diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs index 7020adbdb..747412a44 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs @@ -9,6 +9,7 @@ use serde::Deserialize; use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; use uuid::Uuid; +use crate::events::aws::message::EventType; use crate::events::aws::EventType::{Created, Deleted, Other}; pub mod collecter; @@ -231,7 +232,7 @@ impl From for Events { /// Flattened AWS S3 events #[derive(Debug, Deserialize, Eq, PartialEq, Default)] -#[serde(try_from = "EventMessage")] +#[serde(from = "EventMessage")] pub struct FlatS3EventMessages(pub Vec); impl FlatS3EventMessages { @@ -355,21 +356,6 @@ impl FlatS3EventMessages { } } -#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Clone, Hash, sqlx::Type)] -#[sqlx(type_name = "event_type")] -pub enum EventType { - #[default] - Created, - Deleted, - Other, -} - -impl PgHasArrayType for EventType { - fn array_type_info() -> PgTypeInfo { - PgTypeInfo::with_name("_event_type") - } -} - /// A flattened AWS S3 record #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Default)] pub struct FlatS3EventMessage { @@ -525,7 +511,7 @@ pub(crate) mod tests { assert_flat_s3_event( first, &EventType::Deleted, - EXPECTED_SEQUENCER_DELETED_ONE, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, Some(EXPECTED_VERSION_ID.to_string()), ); @@ -534,7 +520,7 @@ pub(crate) mod tests { assert_flat_s3_event( second, &EventType::Created, - EXPECTED_SEQUENCER_CREATED_ONE, + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), Some(0), Some(EXPECTED_VERSION_ID.to_string()), ); @@ -543,7 +529,7 @@ pub(crate) mod tests { assert_flat_s3_event( third, &EventType::Created, - EXPECTED_SEQUENCER_CREATED_ONE, + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), Some(0), Some(EXPECTED_VERSION_ID.to_string()), ); @@ -558,7 +544,7 @@ pub(crate) mod tests { assert_flat_s3_event( first, &EventType::Created, - EXPECTED_SEQUENCER_CREATED_ONE, + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), Some(0), Some(EXPECTED_VERSION_ID.to_string()), ); @@ -567,7 +553,7 @@ pub(crate) mod tests { assert_flat_s3_event( second, &EventType::Deleted, - EXPECTED_SEQUENCER_DELETED_ONE, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, Some(EXPECTED_VERSION_ID.to_string()), ); @@ -594,7 +580,7 @@ pub(crate) mod tests { assert_flat_s3_event( first, &EventType::Created, - EXPECTED_SEQUENCER_CREATED_ONE, + Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()), Some(0), Some(EXPECTED_VERSION_ID.to_string()), ); @@ -603,7 +589,7 @@ pub(crate) mod tests { assert_flat_s3_event( second, &EventType::Deleted, - EXPECTED_SEQUENCER_DELETED_ONE, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, Some(EXPECTED_VERSION_ID.to_string()), ); @@ -612,16 +598,16 @@ pub(crate) mod tests { assert_flat_s3_event( third, &EventType::Deleted, - EXPECTED_SEQUENCER_DELETED_ONE, + Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()), None, Some("version_id".to_string()), ); } - fn assert_flat_s3_event( + pub(crate) fn assert_flat_s3_event( event: FlatS3EventMessage, event_type: &EventType, - sequencer: &str, + sequencer: Option, size: Option, version_id: Option, ) { @@ -632,7 +618,7 @@ pub(crate) mod tests { assert_eq!(event.version_id, version_id); assert_eq!(event.size, size); assert_eq!(event.e_tag, Some(EXPECTED_E_TAG.to_string())); // pragma: allowlist secret - assert_eq!(event.sequencer, Some(sequencer.to_string())); + assert_eq!(event.sequencer, sequencer); assert_eq!(event.storage_class, None); assert_eq!(event.last_modified_date, None); } @@ -699,7 +685,7 @@ pub(crate) mod tests { fn expected_flat_events(records: String) -> FlatS3EventMessages { let events: Message = serde_json::from_str(&records).unwrap(); - events.try_into().unwrap() + events.into() } fn expected_events(records: String) -> Events { @@ -731,12 +717,47 @@ pub(crate) mod tests { records.to_string() } - pub(crate) fn expected_sqs_event_message() -> Value { - let object = json!({ - "eventTime": "1970-01-01T00:00:00.000Z", + /// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html + pub(crate) fn expected_event_bridge_record() -> Value { + json!({ + "version": "0", + "id": "2ee9cc15-d022-99ea-1fb8-1b1bac4850f9", + "detail-type": "Object Deleted", + "source": "aws.s3", + "account": "111122223333", + "time": "1970-01-01T00:00:00.000Z", + "region": "ca-central-1", + "resources": [ + "arn:aws:s3:::bucket" + ], + "detail": { + "version": "0", + "bucket": { + "name": "bucket" + }, + "object": { + "key": "key", + "etag": EXPECTED_E_TAG, + "version-id": EXPECTED_VERSION_ID, + "sequencer": EXPECTED_SEQUENCER_DELETED_ONE, + }, + "request-id": "C3D13FE58DE4C810", // pragma: allowlist secret + "requester": "123456789012", + "source-ip-address": "127.0.0.1", + "reason": "DeleteObject", + "deletion-type": "Delete Marker Created" + } + }) + } + + /// https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html + pub(crate) fn expected_sqs_record() -> Value { + json!({ "eventVersion": "2.2", "eventSource": "aws:s3", "awsRegion": "us-west-2", + "eventTime": "1970-01-01T00:00:00.000Z", + "eventName": "ObjectRemoved:Delete", "userIdentity": { "principalId": "123456789012" }, @@ -751,17 +772,17 @@ pub(crate) mod tests { "s3SchemaVersion": "1.0", "configurationId": "testConfigRule", "bucket": { - "name": "bucket", - "ownerIdentity": { - "principalId":"123456789012" - }, - "arn": "arn:aws:s3:::bucket" + "name": "bucket", + "ownerIdentity": { + "principalId":"123456789012" + }, + "arn": "arn:aws:s3:::bucket" }, "object": { - "key": "key", - "eTag": EXPECTED_E_TAG, - "versionId": EXPECTED_VERSION_ID, - "sequencer": EXPECTED_SEQUENCER_CREATED_ONE + "key": "key", + "eTag": EXPECTED_E_TAG, + "versionId": EXPECTED_VERSION_ID, + "sequencer": EXPECTED_SEQUENCER_DELETED_ONE, } }, "glacierEventData": { @@ -770,19 +791,15 @@ pub(crate) mod tests { "lifecycleRestoreStorageClass": "Standard" } } - }); - - let mut object_deleted = object.clone(); - object_deleted["eventName"] = json!("ObjectRemoved:Delete"); - - object_deleted + }) } pub(crate) fn expected_event_record_full() -> String { - let object = expected_sqs_event_message(); + let object = expected_sqs_record(); let mut object_created_one = object.clone(); object_created_one["eventName"] = json!("ObjectCreated:Put"); + object_created_one["s3"]["object"]["sequencer"] = json!(EXPECTED_SEQUENCER_CREATED_ONE); object_created_one["s3"]["object"]["size"] = json!(0); let object_created_one_duplicate = object_created_one.clone();