Skip to content

Commit

Permalink
refactor(filemanager): match AWS behaviour when ingesting versioned d…
Browse files Browse the repository at this point in the history
…elete markers
  • Loading branch information
mmalenic committed Sep 22, 2024
1 parent c387d2e commit 46109e5
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ pub async fn generate_openapi(out_dir: &Path) -> Result<()> {
let model_ident: Ident = parse_quote! { Model };
for path in read_dir(out_dir)? {
let path = path?.path();

if path.extension() != Some("rs".as_ref()) {
continue;
}

let content = read_to_string(&path)?;

let mut tokens = parse_file(&content).map_err(|err| OpenAPIGeneration(err.to_string()))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,41 +105,17 @@ pub(crate) mod tests {
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_object_created_delete_marker(pool: PgPool) {
let events: FlatS3EventMessages = FlatS3EventMessages::from(test_events_delete_marker())
.0
.into_iter()
.filter(|event| event.event_type == Created)
.collect();
async fn ingest_object_delete_marker(pool: PgPool) {
let events: FlatS3EventMessages = FlatS3EventMessages::from(test_events_delete_marker());

let ingester = test_ingester(pool);
ingester.ingest(S3(events.into())).await.unwrap();

let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 1);

let message = expected_message(Some(0), EXPECTED_VERSION_ID.to_string(), true, Created);
assert_row(
&s3_object_results[0],
message,
Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()),
Some(Default::default()),
);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_object_removed_delete_marker(pool: PgPool) {
let events = test_events_delete_marker();

let ingester = test_ingester(pool);
ingester.ingest(S3(events)).await.unwrap();

let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 2);

let message = expected_message(Some(0), EXPECTED_VERSION_ID.to_string(), true, Created);
let message = expected_message(Some(0), EXPECTED_VERSION_ID.to_string(), true, Deleted);
assert_row(
&s3_object_results[0],
message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,45 +218,32 @@ pub(crate) mod tests {

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_object_created_delete_marker(pool: PgPool) {
let mut events = test_events_delete_marker();
events.object_deleted = Default::default();
let events = test_events_delete_marker();

let ingester = test_ingester(pool);
ingester.ingest(S3Paired(events)).await.unwrap();

let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 1);
assert_eq!(s3_object_results.len(), 2);

let message = expected_message(Some(0), EXPECTED_VERSION_ID.to_string(), true);
assert_row(
&s3_object_results[0],
message,
None,
Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()),
None,
Some(Default::default()),
None,
);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_object_removed_delete_marker(pool: PgPool) {
let events = test_events_delete_marker();

let ingester = test_ingester(pool);
ingester.ingest(S3Paired(events)).await.unwrap();

let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 1);

let message = expected_message(Some(0), EXPECTED_VERSION_ID.to_string(), true);
let message = expected_message(None, EXPECTED_VERSION_ID.to_string(), false);
assert_row(
&s3_object_results[0],
&s3_object_results[1],
message,
Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()),
None,
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
Some(Default::default()),
None,
Some(Default::default()),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl From<EventTypeData> for EventTypeDeleteMarker {
.is_some_and(|d| d.contains("Delete Marker Created")))
|| e.contains("ObjectRemoved:DeleteMarkerCreated") =>
{
Self::new(EventType::Created, true)
Self::new(EventType::Deleted, true)
}
// Regular deleted event.
e if e.contains("Object Deleted") || e.contains("ObjectRemoved") => {
Expand Down Expand Up @@ -347,7 +347,7 @@ mod tests {

assert_flat_s3_event(
first_message,
&Created,
&Deleted,
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
None,
EXPECTED_VERSION_ID.to_string(),
Expand All @@ -369,7 +369,7 @@ mod tests {

assert_flat_s3_event(
first_message,
&Created,
&Deleted,
Some(EXPECTED_SEQUENCER_DELETED_ONE.to_string()),
None,
EXPECTED_VERSION_ID.to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ where
/// select distinct on (bucket, key, version_id) * from s3_object
/// order by bucket, key, version_id, sequencer desc
/// ) as s3_object
/// where event_type = 'Created';
/// where event_type = 'Created' and is_delete_marker = false;
/// ```
///
/// This finds all distinct objects within a (bucket, key, version_id) grouping such that they
Expand All @@ -205,7 +205,8 @@ where
QuerySelect::query(&mut self.select)
.from_clear()
.from_subquery(subquery, Alias::new("s3_object"))
.and_where(s3_object::Column::EventType.eq("Created"));
.and_where(s3_object::Column::EventType.eq("Created"))
.and_where(s3_object::Column::IsDeleteMarker.eq(false));

self.trace_query("current_state");

Expand Down

0 comments on commit 46109e5

Please sign in to comment.