Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(filemanager): resetting current state should include old verisons #616

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ alter table s3_object add column is_current_state boolean not null default false
with to_update as (
-- Get all records representing the current state.
select * from (
select distinct on (bucket, key, version_id) * from s3_object
order by bucket, key, version_id, sequencer desc
select distinct on (bucket, key) * from s3_object
order by bucket, key, sequencer desc
) as s3_object
where event_type = 'Created' and is_delete_marker = false
)
Expand All @@ -26,6 +26,6 @@ alter table s3_object alter column is_current_state set default true;
-- Create an indexes for now, although partitioning will be required later.
create index is_current_state_index on s3_object (is_current_state);
-- This helps the query which resets the current state when ingesting objects.
create index reset_current_state_index on s3_object (bucket, key, version_id, sequencer, is_current_state);
create index reset_current_state_index on s3_object (bucket, key, sequencer, is_current_state);

commit;
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ to_update as (
-- the right indexes.
input.bucket = s3_object.bucket and
input.key = s3_object.key and
input.version_id = s3_object.version_id and
-- Note that we need to fetch all versions of an object, because when a record is updated,
-- previous versions need to have their `is_current_state` set to false.
-- This is an optimization which prevents querying against all objects in the set.
(
-- Only need to update current objects
Expand Down
13 changes: 11 additions & 2 deletions lib/workload/stateless/stacks/filemanager/docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,19 @@ historical data that represent previously deleted objects.
This value is computed when events are ingested, and automatically kept up to date. This is done at ingestion because
the performance impact of determining this is too great on every API call. This does incur a performance penalty for
ingestion. However, it should be minimal as only other current records need to be considered. This is because records
only need to transition from current to historical (and not the other way around). Records which are current represent
a smaller subset of all records, meaning that only a smaller part of the dataset needs to be queried when ingesting.
only need to transition from current to historical (and not the other way around).

For example, consider a `Created` event `"A"` for a given key. `"A"` starts with `is_current_state` set to true.
Then, another `Created` event `"B"` comes in for the same key, which represents overwriting that object. Now, `"B"`
has `is_current_state` set to true. The ingester needs to flip `is_current_state` to false on `"A"` as it's no longer current.
This also applies if a new version of an object is created.

Since records which are current represent a smaller subset of all records, only a smaller part of the database needs to be
queried to do this logic. This is currently performant enough to happen at ingestion, however if it becomes an issue,
it could be performed asynchronously in a different process.

#### Paired ingest mode

Ordering events on ingestion can be turned on by setting `PAIRED_INGEST_MODE=true` as an environment variable. This has
a performance cost on ingestion, but it removes the requirment to order events when querying the database.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,44 @@ pub(crate) mod tests {
);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_objects_reset_current_state_versioned(pool: PgPool) {
let ingester = test_ingester(pool);

let mut events_one = test_events(Some(Created));
events_one.sequencers[0] = Some(EXPECTED_SEQUENCER_CREATED_TWO.to_string());
events_one.version_ids[0] = "2".to_string();
// Previous version of the object.
let mut events_two = test_events(Some(Created));
events_two.version_ids[0] = "1".to_string();

// Out of order.
ingester.ingest(S3(events_one)).await.unwrap();
ingester.ingest(S3(events_two)).await.unwrap();

let s3_object_results = fetch_results_ordered(&ingester).await;

assert_eq!(s3_object_results.len(), 2);
assert_with(
&s3_object_results[0],
Some(0),
Some(EXPECTED_SEQUENCER_CREATED_ONE.to_string()),
"1".to_string(),
Some(Default::default()),
Created,
false,
);
assert_with(
&s3_object_results[1],
Some(0),
Some(EXPECTED_SEQUENCER_CREATED_TWO.to_string()),
"2".to_string(),
Some(Default::default()),
Created,
true,
);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_permutations_small_without_version_id(pool: PgPool) {
let event_permutations = vec![
Expand Down