Skip to content

Commit

Permalink
feat(events): add group by function
Browse files Browse the repository at this point in the history
  • Loading branch information
mmalenic committed Sep 13, 2023
1 parent 91168ae commit 938d235
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::StreamExt;
use sqlx::{Executor, Postgres, QueryBuilder};
use uuid::Uuid;
use crate::database::DbClient;
use crate::events::aws::{EventType, FlatS3EventMessage, FlatS3EventMessages};
use crate::events::aws::{EventType, FlatS3EventMessage, FlatS3EventMessages, TransposedS3EventMessages};
use crate::events::aws::s3::S3;
use crate::error::Result;

Expand All @@ -31,22 +31,9 @@ impl Ingester {
}

pub async fn ingest_events(&self, events: FlatS3EventMessages) -> Result<()> {
let mut query_builder_object: QueryBuilder<Postgres> = QueryBuilder::new(
"INSERT INTO object (object_id, bucket, key, size, hash, created_date, last_modified_date, deleted_date, portal_run_id) ",
);
let mut query_builder_s3_object: QueryBuilder<Postgres> = QueryBuilder::new(
"INSERT INTO s3_object (object_id, storage_class) ",
);

let mut update_object: QueryBuilder<Postgres> = QueryBuilder::new(
r#"
update object
set deleted_date = data.new_value
from
(select unnest(?) as key, unnest(?) as new_value) as data
where "table".key = data.key;
"#,
);
let transposed_events = TransposedS3EventMessages::from(events);

transposed_events.event_times.into_iter().group

let mut events = join_all(events.into_inner().into_iter().map(|event| async move {
let head = self.s3.head(&event.key, &event.bucket).await?;
Expand Down
102 changes: 62 additions & 40 deletions lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashSet};
use aws_sdk_s3::operation::head_object::HeadObjectOutput;
use aws_sdk_s3::types::StorageClass;
use uuid::Uuid;

pub mod s3;
Expand All @@ -16,14 +17,15 @@ pub mod sqs;
pub struct TransposedS3EventMessages {
pub object_ids: Vec<Uuid>,
pub event_times: Vec<DateTime<Utc>>,
pub event_types: Vec<EventType>,
pub event_names: Vec<String>,
pub buckets: Vec<String>,
pub keys: Vec<String>,
pub sizes: Vec<i32>,
pub e_tags: Vec<String>,
pub sequencers: Vec<Option<String>>,
pub portal_run_ids: Vec<String>,
pub heads: Vec<Option<HeadObjectOutput>>
pub storage_classes: Vec<Option<StorageClass>>,
pub last_modified_dates: Vec<Option<DateTime<Utc>>>
}

impl TransposedS3EventMessages {
Expand All @@ -32,14 +34,15 @@ impl TransposedS3EventMessages {
Self {
object_ids: Vec::with_capacity(capacity),
event_times: Vec::with_capacity(capacity),
event_types: Vec::with_capacity(capacity),
event_names: Vec::with_capacity(capacity),
buckets: Vec::with_capacity(capacity),
keys: Vec::with_capacity(capacity),
sizes: Vec::with_capacity(capacity),
e_tags: Vec::with_capacity(capacity),
sequencers: Vec::with_capacity(capacity),
portal_run_ids: Vec::with_capacity(capacity),
heads: Vec::with_capacity(capacity)
storage_classes: Vec::with_capacity(capacity),
last_modified_dates: Vec::with_capacity(capacity)
}
}

Expand All @@ -48,26 +51,28 @@ impl TransposedS3EventMessages {
let FlatS3EventMessage {
object_id,
event_time,
event_type,
event_name,
bucket,
key,
size,
e_tag,
sequencer,
portal_run_id,
head
storage_class,
last_modified_date,
} = message;

self.object_ids.push(object_id);
self.event_times.push(event_time);
self.event_types.push(event_type);
self.event_names.push(event_name);
self.buckets.push(bucket);
self.keys.push(key);
self.sizes.push(size);
self.e_tags.push(e_tag);
self.sequencers.push(sequencer);
self.portal_run_ids.push(portal_run_id);
self.heads.push(head);
self.storage_classes.push(storage_class);
self.last_modified_dates.push(last_modified_date);
}
}

Expand All @@ -83,7 +88,38 @@ impl From<FlatS3EventMessages> for TransposedS3EventMessages {
}
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
/// Group by event types.
pub struct GroupByEventType {
object_created: FlatS3EventMessages,
object_removed: FlatS3EventMessages,
other: FlatS3EventMessages
}

impl From<FlatS3EventMessages> for GroupByEventType {
fn from(messages: FlatS3EventMessages) -> Self {
let mut object_created = FlatS3EventMessages::default();
let mut object_removed = FlatS3EventMessages::default();
let mut other = FlatS3EventMessages::default();

messages.into_inner().into_iter().for_each(|message| {
if message.event_name.contains("ObjectCreated") {
object_created.0.push(message);
} else if message.event_name.contains("ObjectRemoved") {
object_removed.0.push(message);
} else {
other.0.push(message);
}
});

Self {
object_created,
object_removed,
other
}
}
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
#[serde(try_from = "S3EventMessage")]
/// Flattened AWS S3 events
pub struct FlatS3EventMessages(pub Vec<FlatS3EventMessage>);
Expand Down Expand Up @@ -166,32 +202,32 @@ impl PartialOrd for FlatS3EventMessage {
}
}

/// An S3 event type.
pub enum EventType {
ObjectCreated,
ObjectRemoved,
Other,
}

/// A flattened AWS S3 record
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct FlatS3EventMessage {
pub object_id: Uuid,
pub event_time: DateTime<Utc>,
pub event_type: EventType,
pub event_name: String,
pub bucket: String,
pub key: String,
pub size: i32,
pub e_tag: String,
pub sequencer: Option<String>,
pub portal_run_id: String,
pub head: Option<HeadObjectOutput>
pub storage_class: Option<StorageClass>,
pub last_modified_date: Option<DateTime<Utc>>
}

impl FlatS3EventMessage {
/// Update the head.
pub fn with_head(mut self, head: Option<HeadObjectOutput>) -> Self {
self.head = head;
/// Update the storage class.
pub fn with_storage_class(mut self, storage_class: Option<StorageClass>) -> Self {
self.storage_class = storage_class;
self
}

/// Update the last modified date.
pub fn with_last_modified_date(mut self, last_modified_date: Option<DateTime<Utc>>) -> Self {
self.last_modified_date = last_modified_date;
self
}
}
Expand Down Expand Up @@ -234,19 +270,6 @@ pub struct ObjectRecord {
pub sequencer: Option<String>,
}

impl Record {
/// Parses the event name into an event type.
pub fn parse_event_type(&self) -> EventType {
if self.event_name.contains("ObjectCreated") {
EventType::ObjectCreated
} else if self.event_name.contains("ObjectRemoved") {
EventType::ObjectRemoved
} else {
EventType::Other
}
}
}

impl TryFrom<S3EventMessage> for FlatS3EventMessages {
type Error = Error;

Expand All @@ -256,12 +279,10 @@ impl TryFrom<S3EventMessage> for FlatS3EventMessages {
.records
.into_iter()
.map(|record| {
let event_type = record.parse_event_type();

let Record {
event_time,
event_name,
s3,
..
} = record;

let S3Record { bucket, object } = s3;
Expand All @@ -284,15 +305,16 @@ impl TryFrom<S3EventMessage> for FlatS3EventMessages {
Ok(FlatS3EventMessage {
object_id,
event_time,
event_type,
event_name,
bucket,
key,
size,
e_tag,
sequencer,
portal_run_id,
// Head is optionally fetched later.
head: None,
// Head field are optionally fetched later.
storage_class: None,
last_modified_date: None
})
})
.collect::<Result<Vec<FlatS3EventMessage>>>()?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::database::CloudObject::S3 as S3CloudObject;
use crate::database::Object;
use crate::error::Error::{ConfigError, S3Error};
use crate::error::Result;
use crate::events::aws::{BucketRecord, ObjectRecord, S3EventMessage, S3Record};
use crate::events::aws::{BucketRecord, FlatS3EventMessage, FlatS3EventMessages, ObjectRecord, S3EventMessage, S3Record};
use crate::file::File;
use aws_sdk_s3::operation::head_object::{HeadObjectError, HeadObjectOutput};
use aws_sdk_s3::Client;
Expand Down Expand Up @@ -68,4 +68,19 @@ impl S3 {
None
}
}

pub async fn update_events(&self, events: FlatS3EventMessages) -> Result<FlatS3EventMessage> {
join_all(events.into_inner().into_iter().map(|event| async move {
let HeadObjectOutput {
storage_class,
last_modified,
..
} = self.head(&event.key, &event.bucket).await?;

let event = event.with_storage_class(storage_class);
let event = event.with_last_modified_date(Self::convert_datetime(last_modified));

Ok(event)
})).await.into_iter().collect()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,20 @@ impl SQS {
.await
.map_err(|err| SQSReceiveError(err.into_service_error().to_string()))?;

let event_messages: FlatS3EventMessages = join_all(
let event_messages: FlatS3EventMessages =
rcv_message_output
.messages
.unwrap_or_default()
.into_iter()
.map(|message| async move {
.map(|message| {
trace!(message = ?message, "got the message");

if let Some(body) = message.body() {
serde_json::from_str(body).map_err(|err| DeserializeError(err.to_string()))
} else {
Err(SQSReceiveError("No body in SQS message".to_string()))
}
}),
)
.await
.into_iter()
})
.collect::<Result<Vec<FlatS3EventMessages>>>()?
.into();

Expand Down

0 comments on commit 938d235

Please sign in to comment.