Skip to content

Commit

Permalink
Merge pull request #615 from umccr/feat/filemanager-partitions
Browse files Browse the repository at this point in the history
feat: filemanager partitions
  • Loading branch information
mmalenic authored Oct 25, 2024
2 parents 6f2074f + 5f54470 commit d45b20b
Show file tree
Hide file tree
Showing 27 changed files with 832 additions and 352 deletions.
2 changes: 2 additions & 0 deletions lib/workload/stateless/stacks/filemanager/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ clean: docker-clean
## Database related targets
psql:
@docker compose exec postgres psql filemanager -U filemanager
restore:
@docker compose exec -T postgres pg_restore -U filemanager -d filemanager

## Targets related to top-level database management and S3.
apply-schema:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Creates the `is_current_state` column to separate objects by current and historical records.
--
-- The whole migration runs in a single transaction and performs these steps in order:
--   1. add the column with a default of false,
--   2. backfill true onto the rows that represent the current state,
--   3. switch the column default to true for rows inserted afterwards,
--   4. create the indexes that support lookups on the new column.

begin;

-- Initially, set the `is_current_state` to false to make migrating existing data easier.
-- Every existing row starts as historical, so the backfill below only has to flip rows to true.
alter table s3_object add column is_current_state boolean not null default false;

-- This migrates existing data, first find the current state and update existing records.
with to_update as (
-- Get all records representing the current state.
-- The inner query keeps one row per (bucket, key, version_id): the first row when ordered by
-- `sequencer` descending. The outer filter then keeps that row only if it is a `Created` event
-- that is not a delete marker.
-- NOTE(review): in Postgres `desc` sorts nulls first, so if `sequencer` can be null, a row with a
-- null sequencer would be picked as the latest one for its group - confirm the column's nullability.
select * from (
select distinct on (bucket, key, version_id) * from s3_object
order by bucket, key, version_id, sequencer desc
) as s3_object
where event_type = 'Created' and is_delete_marker = false
)
-- Update `is_current_state` on existing records.
-- Rows are matched back to the table by `s3_object_id`.
update s3_object
set is_current_state = true
from to_update
where s3_object.s3_object_id = to_update.s3_object_id;

-- Then, set the default to true to match new logic using `is_current_state`.
alter table s3_object alter column is_current_state set default true;

-- Create indexes for now, although partitioning will be required later.
create index is_current_state_index on s3_object (is_current_state);
-- This helps the query which resets the current state when ingesting objects.
create index reset_current_state_index on s3_object (bucket, key, version_id, sequencer, is_current_state);

commit;
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
-- Resets the `is_current_state` to false for a set of objects based on the `bucket`, `key`, `version_id`
-- and `sequencer`. This is used to update the current state so that a new object can have its `is_current_state`
-- set to true based on whether it is a `Created` or `Deleted` event.
--
-- Parameters (parallel arrays, unnested together into one input row per position):
--   $1::text[] - bucket
--   $2::text[] - key
--   $3::text[] - version_id
--   $4::text[] - sequencer
--
-- Returns the `s3_object_id` and the resulting `is_current_state` of every row that was updated.

-- Unnest input.
with input as (
select
*
from unnest(
$1::text[],
$2::text[],
$3::text[],
$4::text[]
) as input (
bucket,
key,
version_id,
sequencer
)
),
-- Select objects to update.
-- The lateral subquery is evaluated once per input row, so the window function below ranks only
-- the candidate rows that match that input row's bucket, key and version_id.
-- NOTE(review): if the input contains the same (bucket, key, version_id) more than once, an object
-- can appear in `to_update` several times with different `updated_state` values, and an
-- `update ... from` applies only one of the matching rows - confirm that callers de-duplicate.
to_update as (
select * from input cross join lateral (
select
s3_object_id,
-- This finds the first value in the set which represents the most up-to-date state.
-- If ordered by the sequencer, the first row is the one that needs to have `is_current_state`
-- set to 'true' only for `Created` events, as `Deleted` events are always non-current state.
-- NOTE(review): `desc` sorts nulls first in Postgres, so a candidate row with a null `sequencer`
-- would be ranked first - confirm whether null sequencers can reach this query.
case when row_number() over (order by s3_object.sequencer desc) = 1 then
event_type = 'Created'
-- Set `is_current_state` to 'false' for all other rows, as this is now historical data.
else
false
end as updated_state
from s3_object
where
-- This should be fairly efficient as it's only targeting objects where `is_current_state` is true,
-- or objects with the highest sequencer values (in case of an out-of-order event). This means that
-- although there is a performance impact for running this on ingestion, it should be minimal with
-- the right indexes.
input.bucket = s3_object.bucket and
input.key = s3_object.key and
input.version_id = s3_object.version_id and
-- This is an optimization which prevents querying against all objects in the set.
(
-- Only need to update current objects
s3_object.is_current_state = true or
-- Or objects where there is a newer sequencer than the one being processed.
-- This is required in case an out-of-order event is encountered. This always
-- includes the object being processed as it's required for the above row-logic.
s3_object.sequencer >= input.sequencer
)
) s3_object
)
-- Apply the computed state to the table, matching rows by `s3_object_id`, and report what was written.
update s3_object
set is_current_state = updated_state
from to_update
where s3_object.s3_object_id = to_update.s3_object_id
returning s3_object.s3_object_id, s3_object.is_current_state;
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ select
event_type,
ingest_id,
attributes,
is_current_state,
0::bigint as "number_reordered"
from input
-- Grab the most recent object in each input group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ insert into s3_object (
is_delete_marker,
event_type,
ingest_id,
is_current_state,
attributes
)
values (
Expand All @@ -31,7 +32,8 @@ values (
unnest($12::boolean[]),
unnest($13::event_type[]),
unnest($14::uuid[]),
unnest($15::jsonb[])
unnest($15::boolean[]),
unnest($16::jsonb[])
) on conflict on constraint sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning s3_object_id, number_duplicate_events;
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ select
size,
is_delete_marker,
ingest_id,
is_current_state,
attributes,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order created event, so return a created event back.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ select
size,
is_delete_marker,
ingest_id,
is_current_state,
attributes,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order deleted event, so return a deleted event back.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ export class MigrateFunction extends fn.Function {
constructor(scope: Construct, id: string, props: MigrateFunctionProps) {
super(scope, id, {
package: 'filemanager-migrate-lambda',
timeout: Duration.minutes(2),
// This needs to be higher to account for longer migrations.
timeout: Duration.minutes(15),
...props,
});

Expand Down
12 changes: 12 additions & 0 deletions lib/workload/stateless/stacks/filemanager/docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ Within the application code, out of order events are removed within the [events]
By default, filemanager makes no assumption about the ordering of events, and ingests events in the order that they arrive.
The sequencer value is stored on the `s3_object` table, which allows ordering entries when querying.

### Current vs historical records

Since the filemanager database keeps growing as records are never deleted, the current state of records is stored on an
`is_current_state` column. This column indicates which records represent real objects in S3, and which records are
historical data that represent previously deleted objects.

This value is computed when events are ingested, and automatically kept up to date. This is done at ingestion because
the performance impact of determining this is too great on every API call. This does incur a performance penalty for
ingestion. However, it should be minimal as only other current records need to be considered. This is because records
only need to transition from current to historical (and not the other way around). Records which are current represent
a smaller subset of all records, meaning that only a smaller part of the dataset needs to be queried when ingesting.

#### Paired ingest mode
Ordering events on ingestion can be turned on by setting `PAIRED_INGEST_MODE=true` as an environment variable. This has
a performance cost on ingestion, but it removes the requirement to order events when querying the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ async fn main() -> Result<()> {
.with_key_divisor(key_divisor)
.with_shuffle(shuffle)
.build(state.database_client())
.await;
.await
.unwrap();
}

if args.migrate {
Expand Down
Loading

0 comments on commit d45b20b

Please sign in to comment.