feat: de-duplicate events at the database level #88

Merged Jan 24, 2024 (20 commits)

Commits
8443bfa
feat(filemanager): add basic proc macro outline
mmalenic Jan 16, 2024
e172dd7
refactor(filemanager): improve equality checking
mmalenic Jan 16, 2024
0fb20fb
Merge branch 'main' of github.com:umccr/orcabus into feat/event-order…
mmalenic Jan 16, 2024
4f8e9d1
refactor(filemanager): fix test, merge with main
mmalenic Jan 16, 2024
1eed111
feat(filemanager): add additional sequencer for s3_object
mmalenic Jan 18, 2024
05882af
refactor(filemanager): move e_tag to s3_object, add bucket and key to…
mmalenic Jan 18, 2024
ff8a2c3
feat(filemanager): add sequencer check constraint to s3_object
mmalenic Jan 18, 2024
94b15d1
feat(filemanager): add unique constraints
mmalenic Jan 18, 2024
d865b24
feat(filemanager): remove key and bucket reference from s3_object
mmalenic Jan 18, 2024
ee325fe
refactor(filemanager): update inserts with sequencer values
mmalenic Jan 19, 2024
3ed4bad
refactor(filemanager): move more fields to s3_object, update queries
mmalenic Jan 21, 2024
8c3c575
test(filemanager): fix tests according to new schema
mmalenic Jan 21, 2024
c20e946
test(filemanager): defer initializing foreign key and run inserts in …
mmalenic Jan 22, 2024
80dd346
test(filemanager): duplicate events database test
mmalenic Jan 23, 2024
727c3d9
test(filemanager): add complex duplicates test
mmalenic Jan 23, 2024
a04f108
refactor(filemanager): remove macros as its not used
mmalenic Jan 23, 2024
9bcd3f8
Merge branch 'main' of github.com:umccr/orcabus into feat/event-order…
mmalenic Jan 23, 2024
ff9a5d4
style(filemanager): formatting
mmalenic Jan 23, 2024
8908ea3
fix(filemanager): consider version id when de-duplicating as well
mmalenic Jan 23, 2024
661490c
Merge branch 'main' of github.com:umccr/orcabus into feat/event-order…
mmalenic Jan 23, 2024
14 changes: 12 additions & 2 deletions lib/workload/stateful/filemanager/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion lib/workload/stateful/filemanager/database/Dockerfile
@@ -1,3 +1,3 @@
-FROM postgres:16
+FROM postgres:15
 
 COPY migrations/ /docker-entrypoint-initdb.d/
@@ -1,22 +1,9 @@
 -- An general object table common across all storage types.
 create table object (
 -- The unique id for this object.
-object_id uuid not null default gen_random_uuid() primary key,
--- The bucket location.
-bucket varchar(255) not null,
--- The name of the object.
-key varchar(1024) not null,
+object_id uuid not null primary key default gen_random_uuid(),
 -- The size of the object.
-size int default null,
+size integer default null,
 -- A unique identifier for the object, if it is present.
-hash varchar(255) default null,
--- When this object was created.
-created_date timestamptz not null default now(),
--- When this object was last modified.
-last_modified_date timestamptz not null default now(),
--- When this object was deleted, a null value means that the object has not yet been deleted.
-deleted_date timestamptz default null,
--- The date of the object and its id combined.
-portal_run_id varchar(255) not null
--- provenance - history of all objects and how they move?
+checksum text default null
 );
@@ -2,8 +2,41 @@ create type storage_class as enum ('DeepArchive', 'Glacier', 'GlacierIr', 'Intel…
 
 -- An object contain in AWS S3, maps as a one-to-one relationship with the object table.
 create table s3_object(
--- The object id.
-object_id uuid references object (object_id) primary key,
+-- The s3 object id.
+s3_object_id uuid not null primary key default gen_random_uuid(),
+-- This is initially deferred because we want to create an s3_object before an object to check for duplicates/order.
+object_id uuid references object (object_id) deferrable initially deferred,
+
+-- General fields
+-- The bucket of the object.
+bucket text not null,
+-- The key of the object.
+key text not null,
+-- When this object was created.
+created_date timestamptz not null default now(),
+-- When this object was deleted, a null value means that the object has not yet been deleted.
+deleted_date timestamptz default null,
+-- provenance - history of all objects and how they move?
+
+-- AWS-specific fields
+-- The AWS last modified value.
+last_modified_date timestamptz default null,
+-- An S3-specific e_tag, if it is present.
+e_tag text default null,
 -- The S3 storage class of the object.
-storage_class storage_class not null
-);
+storage_class storage_class not null,
+-- The version id of the object, if present.
+version_id text default null,
+-- A sequencer value for when the object was created. Used to synchronise out of order and duplicate events.
+created_sequencer text default null,
+-- A sequencer value for when the object was deleted. Used to synchronise out of order and duplicate events.
+deleted_sequencer text default null,
+-- Record whether the event that generated this object was ever out of order, useful for debugging.
+event_out_of_order boolean not null default false,
+-- Record the number of duplicate events received for this object, useful for debugging.
+number_duplicate_events integer not null default 0,
+
+-- The sequencers should be unique with the bucket, key, and its version, otherwise this is a duplicate event.
+constraint created_sequencer_unique unique (bucket, key, version_id, created_sequencer),
+constraint deleted_sequencer_unique unique (bucket, key, version_id, deleted_sequencer)
+);
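
The deferrable foreign key above is what allows an s3_object row to be written before the object row it points at, so the duplicate and ordering checks can run against s3_object first. A minimal sketch of that ordering, using hypothetical IDs and values ('Standard' is assumed to be one of the storage_class variants truncated in the hunk header above):

begin;

-- The s3_object row is inserted first; the deferred foreign key to object is only checked at commit.
insert into s3_object (s3_object_id, object_id, bucket, key, storage_class, version_id, created_sequencer)
values (
    gen_random_uuid(),
    '018d3f1e-0000-7000-8000-000000000001', -- hypothetical object_id
    'example-bucket',
    'example/key.txt',
    'Standard',
    'v1',
    '0055AED6DCD90281E5' -- hypothetical S3 sequencer value
);

-- The referenced object row is only created once the event has passed the de-duplication checks.
insert into object (object_id, size, checksum)
values ('018d3f1e-0000-7000-8000-000000000001', 42, null);

commit;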
@@ -0,0 +1,27 @@
-- Bulk insert of s3 objects.
insert into s3_object (
s3_object_id,
object_id,
bucket,
key,
created_date,
last_modified_date,
e_tag,
storage_class,
version_id,
created_sequencer
)
values (
unnest($1::uuid[]),
unnest($2::uuid[]),
unnest($3::text[]),
unnest($4::text[]),
unnest($5::timestamptz[]),
unnest($6::timestamptz[]),
unnest($7::text[]),
unnest($8::storage_class[]),
unnest($9::text[]),
unnest($10::text[])
) on conflict on constraint created_sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning object_id, number_duplicate_events;
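
For illustration, a single-row version of the bulk query above with hypothetical values (again assuming 'Standard' is a valid storage_class variant). Run once, it inserts the row; run again with the same bucket, key, version_id, and created_sequencer, it hits created_sequencer_unique and only increments the duplicate counter.

-- A second execution of this exact statement conflicts on created_sequencer_unique,
-- so no new row is created and number_duplicate_events goes from 0 to 1.
insert into s3_object (s3_object_id, bucket, key, storage_class, version_id, created_sequencer)
values (
    gen_random_uuid(),
    'example-bucket',
    'example/key.txt',
    'Standard',
    'v1',
    '0055AED6DCD90281E5' -- hypothetical duplicate sequencer value
)
on conflict on constraint created_sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning s3_object_id, number_duplicate_events;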
@@ -0,0 +1,31 @@
-- Bulk insert of s3 objects.
insert into s3_object (
s3_object_id,
object_id,
bucket,
key,
-- We default the created date to a value even if this is a deleted event,
-- as we are expecting this to get updated.
created_date,
deleted_date,
last_modified_date,
e_tag,
storage_class,
version_id,
deleted_sequencer
)
values (
unnest($1::uuid[]),
unnest($2::uuid[]),
unnest($3::text[]),
unnest($4::text[]),
unnest($5::timestamptz[]),
unnest($6::timestamptz[]),
unnest($7::timestamptz[]),
unnest($8::text[]),
unnest($9::storage_class[]),
unnest($10::text[]),
unnest($11::text[])
) on conflict on constraint deleted_sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning object_id, number_duplicate_events;
Comment on lines +1 to +31 (Member Author):

This isn't being used yet, however I think it will be for #73.

This file was deleted.

@@ -1,9 +1,9 @@
--- Update the deleted time of s3 objects.
-update object
+-- Update the deleted time of objects.
+update s3_object
 set deleted_date = data.deleted_time
 from (select
 unnest($1::varchar[]) as key,
 unnest($2::varchar[]) as bucket,
 unnest($3::timestamptz[]) as deleted_time
 ) as data
-where object.key = data.key and object.bucket = data.bucket;
+where s3_object.key = data.key and s3_object.bucket = data.bucket;
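
For a single hypothetical delete event, the same update reduces to:

-- Mark one object as deleted; the bucket, key, and timestamp here are made up for illustration.
update s3_object
set deleted_date = '2024-01-23T00:00:00+00'::timestamptz
where s3_object.key = 'example/key.txt' and s3_object.bucket = 'example-bucket';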
@@ -1,12 +1,7 @@
 -- Bulk insert of objects
-insert into object (object_id, bucket, key, size, hash, created_date, last_modified_date, portal_run_id)
+insert into object (object_id, size, checksum)
 values (
 unnest($1::uuid[]),
-unnest($2::varchar[]),
-unnest($3::varchar[]),
-unnest($4::int[]),
-unnest($5::varchar[]),
-unnest($6::timestamptz[]),
-unnest($7::timestamptz[]),
-unnest($8::varchar[])
-);
+unnest($2::int[]),
+unnest($3::text[])
+);
1 change: 1 addition & 0 deletions lib/workload/stateful/filemanager/filemanager/Cargo.toml
@@ -32,6 +32,7 @@ mockall = "0.12"
 mockall_double = "0.3"
 lambda_runtime = "0.8"
 aws_lambda_events = "0.12"
+itertools = "0.12"
 
 # AWS
 aws-sdk-sqs = "1"