Skip to content

Commit

Permalink
Merge pull request #161 from umccr/fix/filemanager-uuids
Browse files Browse the repository at this point in the history
fix: filemanager uuids
  • Loading branch information
mmalenic authored Mar 20, 2024
2 parents 24280e8 + 6f95512 commit 169f776
Show file tree
Hide file tree
Showing 10 changed files with 881 additions and 4,615 deletions.
1,341 changes: 692 additions & 649 deletions lib/workload/stateless/filemanager/Cargo.lock

Large diffs are not rendered by default.

3,841 changes: 0 additions & 3,841 deletions lib/workload/stateless/filemanager/filemanager/Cargo.lock

This file was deleted.

2 changes: 1 addition & 1 deletion lib/workload/stateless/filemanager/filemanager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ chrono = { version = "0.4", features = ["serde"] }
dotenvy = "0.15"
thiserror = "1.0"
async-trait = "0.1"
uuid = { version = "1.4", features = ["v4"] }
uuid = { version = "1.7", features = ["v7"] }
mockall = "0.12"
mockall_double = "0.3"
lambda_runtime = "0.8"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::events::aws::message::EventType;
use crate::events::aws::{Events, TransposedS3EventMessages};
use crate::events::aws::{FlatS3EventMessage, FlatS3EventMessages, StorageClass};
use crate::events::EventSourceType;
use crate::uuid::UuidGenerator;

/// An ingester for S3 events.
#[derive(Debug)]
Expand Down Expand Up @@ -134,7 +135,7 @@ impl Ingester {
.await?;

let object_created = Self::reprocess_updated(object_created, updated);
let object_ids = vec![Uuid::new_v4(); object_created.s3_object_ids.len()];
let object_ids = UuidGenerator::generate_n(object_created.s3_object_ids.len());

let mut inserted = query_file_as!(
Insert,
Expand All @@ -159,7 +160,10 @@ impl Ingester {

// Insert only the non duplicate events.
if !object_ids.is_empty() {
debug!(count = object_ids.len(), "inserting objects");
debug!(
object_ids = ?object_ids,
"inserting into object table created events"
);

query_file!(
"../database/queries/ingester/insert_objects.sql",
Expand All @@ -186,7 +190,7 @@ impl Ingester {
.await?;

let object_deleted = Self::reprocess_updated(object_deleted, updated);
let object_ids = vec![Uuid::new_v4(); object_deleted.s3_object_ids.len()];
let object_ids = UuidGenerator::generate_n(object_deleted.s3_object_ids.len());

let mut inserted = query_file_as!(
Insert,
Expand All @@ -213,7 +217,10 @@ impl Ingester {

// Insert only the non duplicate events.
if !object_ids.is_empty() {
debug!(count = object_ids.len(), "inserting objects");
debug!(
object_ids = ?object_ids,
"inserting into object table from deleted events"
);

query_file!(
"../database/queries/ingester/insert_objects.sql",
Expand Down Expand Up @@ -330,6 +337,72 @@ pub(crate) mod tests {
assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID);
}

/// Ingests two sets of created events in a single call and checks that two object
/// results and two s3_object results come back, matching `assert_missing_deleted`.
#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_created_multiple_object_ids(pool: PgPool) {
    let ingester = test_ingester(pool);

    // The base batch carries created events only.
    let mut merged = test_events();
    merged.object_deleted = Default::default();

    // A second batch of test events with its sequencers replaced.
    let second = replace_sequencers(
        test_events(),
        Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
    );

    // Merge events into same ingestion.
    for event in FlatS3EventMessages::from(second.object_created).into_inner() {
        merged.object_created.push(event);
    }

    ingester.ingest(EventSourceType::S3(merged)).await.unwrap();

    let (objects, s3_objects) = fetch_results(&ingester).await;

    assert_eq!(objects.len(), 2);
    assert_eq!(s3_objects.len(), 2);
    assert_missing_deleted(&s3_objects[0], &s3_objects[1], EXPECTED_VERSION_ID);
}

/// Ingests two sets of deleted events in a single call and checks that two object
/// results and two s3_object results come back, matching `assert_missing_created`.
#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_deleted_multiple_object_ids(pool: PgPool) {
    let ingester = test_ingester(pool);

    // The base batch carries deleted events only.
    let mut merged = test_events();
    merged.object_created = Default::default();

    // A second batch of test events with its sequencers replaced.
    let second = replace_sequencers(
        test_events(),
        Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()),
    );

    // Merge events into same ingestion.
    for event in FlatS3EventMessages::from(second.object_deleted).into_inner() {
        merged.object_deleted.push(event);
    }

    ingester.ingest(EventSourceType::S3(merged)).await.unwrap();

    let (objects, s3_objects) = fetch_results(&ingester).await;

    assert_eq!(objects.len(), 2);
    assert_eq!(s3_objects.len(), 2);
    assert_missing_created(&s3_objects[0], &s3_objects[1], EXPECTED_VERSION_ID);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_reordered_duplicates(pool: PgPool) {
let ingester = test_ingester(pool);
Expand Down Expand Up @@ -646,15 +719,10 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 2);
assert_eq!(s3_object_results.len(), 2);
assert_created(&s3_object_results[0]);
assert_with(
assert_missing_deleted(
&s3_object_results[0],
&s3_object_results[1],
Some(0),
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
None,
EXPECTED_VERSION_ID.to_string(),
Some(Default::default()),
None,
EXPECTED_VERSION_ID,
);
}

Expand All @@ -681,15 +749,10 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 2);
assert_eq!(s3_object_results.len(), 2);
assert_created(&s3_object_results[1]);
assert_with(
assert_missing_deleted(
&s3_object_results[1],
&s3_object_results[0],
Some(0),
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
None,
EXPECTED_VERSION_ID.to_string(),
Some(Default::default()),
None,
EXPECTED_VERSION_ID,
);
}

Expand All @@ -715,23 +778,10 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 2);
assert_eq!(s3_object_results.len(), 2);
assert_with(
assert_missing_created(
&s3_object_results[0],
None,
None,
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
EXPECTED_VERSION_ID.to_string(),
None,
Some(Default::default()),
);
assert_with(
&s3_object_results[1],
None,
None,
Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()),
EXPECTED_VERSION_ID.to_string(),
None,
Some(Default::default()),
EXPECTED_VERSION_ID,
);
}

Expand All @@ -758,23 +808,10 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 2);
assert_eq!(s3_object_results.len(), 2);
assert_with(
assert_missing_created(
&s3_object_results[1],
None,
None,
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
EXPECTED_VERSION_ID.to_string(),
None,
Some(Default::default()),
);
assert_with(
&s3_object_results[0],
None,
None,
Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()),
EXPECTED_VERSION_ID.to_string(),
None,
Some(Default::default()),
EXPECTED_VERSION_ID,
);
}

Expand All @@ -800,23 +837,10 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 2);
assert_eq!(s3_object_results.len(), 2);
assert_with(
&s3_object_results[1],
Some(0),
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
None,
FlatS3EventMessage::default_version_id(),
Some(Default::default()),
None,
);
assert_with(
assert_missing_deleted(
&s3_object_results[0],
Some(0),
Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()),
None,
FlatS3EventMessage::default_version_id(),
Some(Default::default()),
None,
&s3_object_results[1],
&FlatS3EventMessage::default_version_id(),
);
}

Expand All @@ -843,23 +867,10 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 2);
assert_eq!(s3_object_results.len(), 2);
assert_with(
assert_missing_deleted(
&s3_object_results[1],
Some(0),
Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()),
None,
FlatS3EventMessage::default_version_id(),
Some(Default::default()),
None,
);
assert_with(
&s3_object_results[0],
Some(0),
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
None,
FlatS3EventMessage::default_version_id(),
Some(Default::default()),
None,
&FlatS3EventMessage::default_version_id(),
);
}

Expand All @@ -885,23 +896,10 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 2);
assert_eq!(s3_object_results.len(), 2);
assert_with(
assert_missing_created(
&s3_object_results[0],
None,
None,
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
FlatS3EventMessage::default_version_id(),
None,
Some(Default::default()),
);
assert_with(
&s3_object_results[1],
None,
None,
Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()),
FlatS3EventMessage::default_version_id(),
None,
Some(Default::default()),
&FlatS3EventMessage::default_version_id(),
);
}

Expand All @@ -928,23 +926,10 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 2);
assert_eq!(s3_object_results.len(), 2);
assert_with(
assert_missing_created(
&s3_object_results[1],
None,
None,
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
FlatS3EventMessage::default_version_id(),
None,
Some(Default::default()),
);
assert_with(
&s3_object_results[0],
None,
None,
Some(EXPECTED_SEQUENCER_DELETED_TWO.to_string()),
FlatS3EventMessage::default_version_id(),
None,
Some(Default::default()),
&FlatS3EventMessage::default_version_id(),
);
}

Expand Down Expand Up @@ -1445,6 +1430,48 @@ pub(crate) mod tests {
);
}

/// Checks a pair of rows with `assert_with`, where the `created` row is expected to
/// carry `EXPECTED_SEQUENCER_CREATED_ONE` and the `deleted` row
/// `EXPECTED_SEQUENCER_DELETED_ONE`. Both rows are checked against `version_id`.
///
/// NOTE(review): the positional meaning of the `assert_with` arguments is not visible
/// here; presumably the `None` slots are the deleted-side fields — confirm against
/// `assert_with`.
fn assert_missing_deleted(created: &PgRow, deleted: &PgRow, version_id: &str) {
    // The two checks differ only in the row and the sequencer passed in.
    let expectations = [
        (created, EXPECTED_SEQUENCER_CREATED_ONE),
        (deleted, EXPECTED_SEQUENCER_DELETED_ONE),
    ];

    for (row, sequencer) in expectations {
        assert_with(
            row,
            Some(0),
            Some(sequencer.to_string()),
            None,
            version_id.to_string(),
            Some(Default::default()),
            None,
        );
    }
}

/// Checks a pair of rows with `assert_with`, where the first row is expected to carry
/// `EXPECTED_SEQUENCER_DELETED_ONE` and the second `EXPECTED_SEQUENCER_DELETED_TWO`.
/// Both rows are checked against `version_id`.
///
/// NOTE(review): the positional meaning of the `assert_with` arguments is not visible
/// here; presumably the leading `None` slots are the created-side fields — confirm
/// against `assert_with`.
fn assert_missing_created(created: &PgRow, deleted: &PgRow, version_id: &str) {
    // The two checks differ only in the row and the sequencer passed in.
    let expectations = [
        (created, EXPECTED_SEQUENCER_DELETED_ONE),
        (deleted, EXPECTED_SEQUENCER_DELETED_TWO),
    ];

    for (row, sequencer) in expectations {
        assert_with(
            row,
            None,
            None,
            Some(sequencer.to_string()),
            version_id.to_string(),
            None,
            Some(Default::default()),
        );
    }
}

fn remove_version_ids(mut events: Events) -> Events {
events
.object_deleted
Expand Down
Loading

0 comments on commit 169f776

Please sign in to comment.