Skip to content

Commit

Permalink
Merge pull request #88 from umccr/feat/event-order-macros
Browse files Browse the repository at this point in the history
feat: de-duplicate events at the database level
  • Loading branch information
mmalenic authored Jan 24, 2024
2 parents 346780c + 661490c commit a74114c
Show file tree
Hide file tree
Showing 15 changed files with 641 additions and 208 deletions.
14 changes: 12 additions & 2 deletions lib/workload/stateful/filemanager/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lib/workload/stateful/filemanager/database/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# NOTE(review): the diff shows both postgres:16 and postgres:15; 15 is taken as
# the post-merge version (presumably to match the managed RDS/Aurora engine
# version available at the time) — TODO confirm against the repository.
FROM postgres:15

# Files in this directory are executed automatically by the postgres entrypoint
# on first start-up, applying all migrations to the fresh database.
COPY migrations/ /docker-entrypoint-initdb.d/
Original file line number Diff line number Diff line change
@@ -1,22 +1,9 @@
-- A general object table common across all storage types.
-- Storage-specific fields (bucket, key, dates, etc.) live in the per-storage
-- tables such as s3_object, which reference this table by object_id.
create table object (
    -- The unique id for this object.
    object_id uuid not null primary key default gen_random_uuid(),
    -- The size of the object, a null value means that the size is unknown.
    size integer default null,
    -- provenance - history of all objects and how they move?
    -- A checksum for the object, if it is present.
    checksum text default null
);
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,41 @@ create type storage_class as enum ('DeepArchive', 'Glacier', 'GlacierIr', 'Intel

-- An object contained in AWS S3, maps as a one-to-one relationship with the object table.
create table s3_object(
    -- The s3 object id.
    s3_object_id uuid not null primary key default gen_random_uuid(),
    -- This is initially deferred because we want to create an s3_object before an object
    -- to check for duplicates/order; the FK is only checked at transaction commit.
    object_id uuid references object (object_id) deferrable initially deferred,

    -- General fields
    -- The bucket of the object.
    bucket text not null,
    -- The key of the object.
    key text not null,
    -- When this object was created.
    created_date timestamptz not null default now(),
    -- When this object was deleted, a null value means that the object has not yet been deleted.
    deleted_date timestamptz default null,
    -- provenance - history of all objects and how they move?

    -- AWS-specific fields
    -- The AWS last modified value.
    last_modified_date timestamptz default null,
    -- An S3-specific e_tag, if it is present.
    e_tag text default null,
    -- The S3 storage class of the object.
    storage_class storage_class not null,
    -- The version id of the object, if present.
    version_id text default null,
    -- A sequencer value for when the object was created. Used to synchronise out of order and duplicate events.
    created_sequencer text default null,
    -- A sequencer value for when the object was deleted. Used to synchronise out of order and duplicate events.
    deleted_sequencer text default null,
    -- Record whether the event that generated this object was ever out of order, useful for debugging.
    event_out_of_order boolean not null default false,
    -- Record the number of duplicate events received for this object, useful for debugging.
    number_duplicate_events integer not null default 0,

    -- The sequencers should be unique with the bucket, key, and its version, otherwise this is a duplicate event.
    constraint created_sequencer_unique unique (bucket, key, version_id, created_sequencer),
    constraint deleted_sequencer_unique unique (bucket, key, version_id, deleted_sequencer)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Bulk insert of s3 objects.
-- The parallel input arrays ($1..$10) are unnested row-wise, so all arrays must
-- have the same length; one s3_object row is produced per array index.
-- A duplicate created event (same bucket, key, version_id and created_sequencer)
-- violates created_sequencer_unique; instead of inserting a second row, the
-- existing row's number_duplicate_events counter is incremented, de-duplicating
-- events at the database level. Positional parameters are bound by external
-- caller code, so their order must not change.
insert into s3_object (
s3_object_id,
object_id,
bucket,
key,
created_date,
last_modified_date,
e_tag,
storage_class,
version_id,
created_sequencer
)
values (
unnest($1::uuid[]),
unnest($2::uuid[]),
unnest($3::text[]),
unnest($4::text[]),
unnest($5::timestamptz[]),
unnest($6::timestamptz[]),
unnest($7::text[]),
unnest($8::storage_class[]),
unnest($9::text[]),
unnest($10::text[])
) on conflict on constraint created_sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
-- Returned for both freshly inserted rows (count 0) and de-duplicated rows.
returning object_id, number_duplicate_events;
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Bulk insert of s3 objects.
-- Deleted-event counterpart of the created-event bulk insert: the parallel
-- input arrays ($1..$11) are unnested row-wise, and a duplicate deleted event
-- (same bucket, key, version_id and deleted_sequencer) increments the existing
-- row's number_duplicate_events counter instead of inserting a second row.
-- Positional parameters are bound by external caller code, so their order must
-- not change.
insert into s3_object (
s3_object_id,
object_id,
bucket,
key,
-- We default the created date to a value even if this is a deleted event,
-- as we are expecting this to get updated.
created_date,
deleted_date,
last_modified_date,
e_tag,
storage_class,
version_id,
deleted_sequencer
)
values (
unnest($1::uuid[]),
unnest($2::uuid[]),
unnest($3::text[]),
unnest($4::text[]),
unnest($5::timestamptz[]),
unnest($6::timestamptz[]),
unnest($7::timestamptz[]),
unnest($8::text[]),
unnest($9::storage_class[]),
unnest($10::text[]),
unnest($11::text[])
) on conflict on constraint deleted_sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
-- Returned for both freshly inserted rows (count 0) and de-duplicated rows.
returning object_id, number_duplicate_events;

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
-- Update the deleted time of objects.
-- The parallel input arrays are unnested row-wise into (key, bucket, deleted_time)
-- tuples; each tuple marks the matching s3_object rows as deleted.
update s3_object
set deleted_date = data.deleted_time
from (select
    unnest($1::varchar[]) as key,
    unnest($2::varchar[]) as bucket,
    unnest($3::timestamptz[]) as deleted_time
) as data
-- Only rows matching on both key and bucket receive the deleted time.
where s3_object.key = data.key and s3_object.bucket = data.bucket;
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
-- Bulk insert of objects.
-- The parallel input arrays ($1..$3) are unnested row-wise, producing one
-- object row per array index; all arrays must have the same length.
insert into object (object_id, size, checksum)
values (
    unnest($1::uuid[]),
    unnest($2::int[]),
    unnest($3::text[])
);
1 change: 1 addition & 0 deletions lib/workload/stateful/filemanager/filemanager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mockall = "0.12"
mockall_double = "0.3"
lambda_runtime = "0.8"
aws_lambda_events = "0.12"
itertools = "0.12"

# AWS
aws-sdk-sqs = "1"
Expand Down
Loading

0 comments on commit a74114c

Please sign in to comment.