Skip to content

Commit

Permalink
feat(events): add transpose function for s3 events
Browse files Browse the repository at this point in the history
  • Loading branch information
mmalenic committed Sep 13, 2023
1 parent 2926904 commit 91168ae
Showing 1 changed file with 72 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,78 @@ use uuid::Uuid;
pub mod s3;
pub mod sqs;

/// AWS S3 events with fields transposed
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct TransposedS3EventMessages {
pub object_ids: Vec<Uuid>,
pub event_times: Vec<DateTime<Utc>>,
pub event_types: Vec<EventType>,
pub buckets: Vec<String>,
pub keys: Vec<String>,
pub sizes: Vec<i32>,
pub e_tags: Vec<String>,
pub sequencers: Vec<Option<String>>,
pub portal_run_ids: Vec<String>,
pub heads: Vec<Option<HeadObjectOutput>>
}

impl TransposedS3EventMessages {
/// Create a new transposed S3 event messages vector with the given capacity.
pub fn with_capacity(capacity: usize) -> Self {
Self {
object_ids: Vec::with_capacity(capacity),
event_times: Vec::with_capacity(capacity),
event_types: Vec::with_capacity(capacity),
buckets: Vec::with_capacity(capacity),
keys: Vec::with_capacity(capacity),
sizes: Vec::with_capacity(capacity),
e_tags: Vec::with_capacity(capacity),
sequencers: Vec::with_capacity(capacity),
portal_run_ids: Vec::with_capacity(capacity),
heads: Vec::with_capacity(capacity)
}
}

/// Push an S3 event message.
pub fn push(&mut self, message: FlatS3EventMessage) {
let FlatS3EventMessage {
object_id,
event_time,
event_type,
bucket,
key,
size,
e_tag,
sequencer,
portal_run_id,
head
} = message;

self.object_ids.push(object_id);
self.event_times.push(event_time);
self.event_types.push(event_type);
self.buckets.push(bucket);
self.keys.push(key);
self.sizes.push(size);
self.e_tags.push(e_tag);
self.sequencers.push(sequencer);
self.portal_run_ids.push(portal_run_id);
self.heads.push(head);
}
}

impl From<FlatS3EventMessages> for TransposedS3EventMessages {
fn from(messages: FlatS3EventMessages) -> Self {
let messages = messages.into_inner();
let capacity = messages.len();

messages.into_inner().into_iter().fold(TransposedS3EventMessages::with_capacity(capacity), |mut acc, message| {
acc.push(message);
acc
})
}
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(try_from = "S3EventMessage")]
/// Flattened AWS S3 events
Expand Down

0 comments on commit 91168ae

Please sign in to comment.