Skip to content

Commit

Permalink
Merge pull request #616 from umccr/feat/filemanager-partitions
Browse files Browse the repository at this point in the history
fix(filemanager): resetting current state should include old verisons
  • Loading branch information
mmalenic authored Oct 25, 2024
2 parents d45b20b + 4a24bfb commit e4ebd26
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ alter table s3_object add column is_current_state boolean not null default false
with to_update as (
-- Get all records representing the current state.
select * from (
select distinct on (bucket, key, version_id) * from s3_object
order by bucket, key, version_id, sequencer desc
select distinct on (bucket, key) * from s3_object
order by bucket, key, sequencer desc
) as s3_object
where event_type = 'Created' and is_delete_marker = false
)
Expand All @@ -26,6 +26,6 @@ alter table s3_object alter column is_current_state set default true;
-- Create an indexes for now, although partitioning will be required later.
create index is_current_state_index on s3_object (is_current_state);
-- This helps the query which resets the current state when ingesting objects.
create index reset_current_state_index on s3_object (bucket, key, version_id, sequencer, is_current_state);
create index reset_current_state_index on s3_object (bucket, key, sequencer, is_current_state);

commit;
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ to_update as (
-- the right indexes.
input.bucket = s3_object.bucket and
input.key = s3_object.key and
input.version_id = s3_object.version_id and
-- Note that we need to fetch all versions of an object, because when a record is updated,
-- previous versions need to have their `is_current_state` set to false.
-- This is an optimization which prevents querying against all objects in the set.
(
-- Only need to update current objects
Expand Down
13 changes: 11 additions & 2 deletions lib/workload/stateless/stacks/filemanager/docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,19 @@ historical data that represent previously deleted objects.
This value is computed when events are ingested, and automatically kept up to date. This is done at ingestion because
the performance impact of determining this is too great on every API call. This does incur a performance penalty for
ingestion. However, it should be minimal as only other current records need to be considered. This is because records
only need to transition from current to historical (and not the other way around). Records which are current represent
a smaller subset of all records, meaning that only a smaller part of the dataset needs to be queried when ingesting.
only need to transition from current to historical (and not the other way around).

For example, consider a `Created` event `"A"` for a given key. `"A"` starts with `is_current_state` set to true.
Then, another `Created` event `"B"` comes in for the same key, which represents overwriting that object. Now, `"B"`
has `is_current_state` set to true. The ingester needs to flip `is_current_state` to false on `"A"` as it's no longer current.
This also applies if a new version of an object is created.

Since records which are current represent a smaller subset of all records, only a smaller part of the database needs to be
queried to do this logic. This is currently performant enough to happen at ingestion, however if it becomes an issue,
it could be performed asynchronously in a different process.

#### Paired ingest mode

Ordering events on ingestion can be turned on by setting `PAIRED_INGEST_MODE=true` as an environment variable. This has
a performance cost on ingestion, but it removes the requirment to order events when querying the database.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,44 @@ pub(crate) mod tests {
);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_objects_reset_current_state_versioned(pool: PgPool) {
let ingester = test_ingester(pool);

let mut events_one = test_events(Some(Created));
events_one.sequencers[0] = Some(EXPECTED_SEQUENCER_CREATED_TWO.to_string());
events_one.version_ids[0] = "2".to_string();
// Previous version of the object.
let mut events_two = test_events(Some(Created));
events_two.version_ids[0] = "1".to_string();

// Out of order.
ingester.ingest(S3(events_one)).await.unwrap();
ingester.ingest(S3(events_two)).await.unwrap();

let s3_object_results = fetch_results_ordered(&ingester).await;

assert_eq!(s3_object_results.len(), 2);
assert_with(
&s3_object_results[0],
Some(0),
Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()),
"1".to_string(),
Some(Default::default()),
Created,
false,
);
assert_with(
&s3_object_results[1],
Some(0),
Some(EXPECTED_SEQUENCER_CREATED_TWO.to_string()),
"2".to_string(),
Some(Default::default()),
Created,
true,
);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_permutations_small_without_version_id(pool: PgPool) {
let event_permutations = vec![
Expand Down

0 comments on commit e4ebd26

Please sign in to comment.