Skip to content

Commit

Permalink
refactor(filemanager): add deserialization tests, simplify from type …
Browse files Browse the repository at this point in the history
…implementations
  • Loading branch information
mmalenic committed Feb 26, 2024
1 parent be6e8a7 commit 53d6fba
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use uuid::Uuid;

use crate::database::{Client, Ingest};
use crate::error::Result;
use crate::events::aws::EventType;
use crate::events::aws::message::EventType;
use crate::events::aws::{Events, TransposedS3EventMessages};
use crate::events::aws::{FlatS3EventMessage, FlatS3EventMessages, StorageClass};
use crate::events::EventSourceType;
Expand Down Expand Up @@ -256,11 +256,11 @@ pub(crate) mod tests {
use crate::database::aws::ingester::Ingester;
use crate::database::aws::migration::tests::MIGRATOR;
use crate::database::{Client, Ingest};
use crate::events::aws::message::EventType::{Created, Deleted};
use crate::events::aws::tests::{
expected_events_simple, expected_flat_events_simple, EXPECTED_E_TAG,
EXPECTED_SEQUENCER_CREATED_ONE, EXPECTED_VERSION_ID,
};
use crate::events::aws::EventType::{Created, Deleted};
use crate::events::aws::{Events, FlatS3EventMessage, FlatS3EventMessages, StorageClass};
use crate::events::EventSourceType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ pub(crate) mod tests {
use crate::database::aws::ingester::tests::{test_events, test_ingester};
use crate::database::aws::ingester::Ingester;
use crate::database::aws::migration::tests::MIGRATOR;
use crate::events::aws::message::EventType;
use crate::events::aws::tests::{
EXPECTED_NEW_SEQUENCER_ONE, EXPECTED_SEQUENCER_CREATED_ONE, EXPECTED_SEQUENCER_CREATED_TWO,
EXPECTED_SEQUENCER_CREATED_ZERO, EXPECTED_VERSION_ID,
};
use crate::events::aws::EventType;
use crate::events::aws::StorageClass;
use crate::events::aws::{Events, FlatS3EventMessage};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,48 @@
use crate::error::Error;
use crate::error::Error::DeserializeError;
use crate::error::Result;
use crate::events::aws::EventType::{Created, Deleted, Other};
use crate::events::aws::{FlatS3EventMessage, FlatS3EventMessages};
use chrono::{DateTime, ParseError, Utc};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::postgres::{PgHasArrayType, PgTypeInfo};
use uuid::Uuid;

#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Clone, Hash, sqlx::Type)]
#[sqlx(type_name = "event_type")]
pub enum EventType {
#[default]
Created,
Deleted,
Other,
}

impl PgHasArrayType for EventType {
fn array_type_info() -> PgTypeInfo {
PgTypeInfo::with_name("_event_type")
}
}

impl From<&str> for EventType {
fn from(value: &str) -> Self {
if value.contains("Object Created") || value.contains("ObjectCreated") {
Self::Created
} else if value.contains("Object Deleted") || value.contains("ObjectRemoved") {
Self::Deleted
} else {
Self::Other
}
}
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum EventMessage {
EventBridge(Record),
SQS(Message),
}

impl TryFrom<EventMessage> for FlatS3EventMessages {
type Error = Error;

fn try_from(message: EventMessage) -> Result<Self> {
impl From<EventMessage> for FlatS3EventMessages {
fn from(message: EventMessage) -> Self {
match message {
EventMessage::EventBridge(record) => record.try_into(),
EventMessage::SQS(message) => message.try_into(),
EventMessage::EventBridge(record) => record.into(),
EventMessage::SQS(message) => message.into(),
}
}
}
Expand All @@ -43,7 +65,7 @@ pub struct Message {
#[serde(rename_all = "kebab-case")]
pub struct Record {
#[serde(alias = "eventTime")]
pub time: String,
pub time: DateTime<Utc>,
#[serde(alias = "eventName")]
pub detail_type: String,
#[serde(alias = "s3")]
Expand Down Expand Up @@ -78,10 +100,8 @@ pub struct Object {
pub sequencer: Option<String>,
}

impl TryFrom<Record> for FlatS3EventMessages {
type Error = Error;

fn try_from(record: Record) -> Result<Self> {
impl From<Record> for FlatS3EventMessages {
fn from(record: Record) -> Self {
let Record {
time,
detail_type,
Expand All @@ -100,23 +120,9 @@ impl TryFrom<Record> for FlatS3EventMessages {
sequencer,
} = object;

let event_time: DateTime<Utc> = time
.parse()
.map_err(|err: ParseError| DeserializeError(err.to_string()))?;

let event_type = if detail_type.contains("Object Created")
|| detail_type.contains("ObjectCreated")
{
Created
} else if detail_type.contains("Object Removed") || detail_type.contains("ObjectRemoved") {
Deleted
} else {
Other
};

Ok(FlatS3EventMessages(vec![FlatS3EventMessage {
FlatS3EventMessages(vec![FlatS3EventMessage {
s3_object_id: Uuid::new_v4(),
event_time: Some(event_time),
event_time: Some(time),
bucket,
key,
size,
Expand All @@ -126,23 +132,70 @@ impl TryFrom<Record> for FlatS3EventMessages {
// Head fields are fetched later.
storage_class: None,
last_modified_date: None,
event_type,
event_type: detail_type.as_str().into(),
number_reordered: 0,
number_duplicate_events: 0,
}]))
}])
}
}

impl TryFrom<Message> for FlatS3EventMessages {
type Error = Error;

fn try_from(message: Message) -> Result<Self> {
Ok(FlatS3EventMessages(message.records.into_iter().try_fold(
vec![],
|mut flattened, record| {
flattened.extend(FlatS3EventMessages::try_from(record)?.into_inner());
Ok::<_, Self::Error>(flattened)
},
)?))
impl From<Message> for FlatS3EventMessages {
fn from(message: Message) -> Self {
FlatS3EventMessages(
message
.records
.into_iter()
.fold(vec![], |mut flattened, record| {
flattened.extend(FlatS3EventMessages::from(record).into_inner());
flattened
}),
)
}
}

#[cfg(test)]
mod tests {
use crate::events::aws::tests::{
assert_flat_s3_event, expected_event_bridge_record, expected_sqs_record,
EXPECTED_SEQUENCER_DELETED_ONE, EXPECTED_VERSION_ID,
};
use crate::events::aws::EventType::Deleted;
use crate::events::aws::FlatS3EventMessages;
use serde_json::json;

#[test]
fn deserialize_sqs_message() {
let record = expected_sqs_record();
let message = json!({
"Records": [record]
})
.to_string();

let result: FlatS3EventMessages = serde_json::from_str(&message).unwrap();
let first_message = result.into_inner().first().unwrap().clone();

assert_flat_s3_event(
first_message,
&Deleted,
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
None,
Some(EXPECTED_VERSION_ID.to_string()),
);
}

#[test]
fn deserialize_event_bridge_message() {
let record = expected_event_bridge_record().to_string();

let result: FlatS3EventMessages = serde_json::from_str(&record).unwrap();
let first_message = result.into_inner().first().unwrap().clone();

assert_flat_s3_event(
first_message,
&Deleted,
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
None,
Some(EXPECTED_VERSION_ID.to_string()),
);
}
}
Loading

0 comments on commit 53d6fba

Please sign in to comment.