Merge pull request #585 from umccr/feat/filemanager-tagging
feat(filemanager): ingest_id tagging and object move tracking
mmalenic authored Oct 4, 2024
2 parents d2dae68 + 5cf5f71 commit 643b0af
Showing 26 changed files with 812 additions and 237 deletions.
@@ -0,0 +1,2 @@
alter table s3_object add column ingest_id uuid;
create index ingest_id_index on s3_object (ingest_id);
@@ -34,6 +34,8 @@ select
size,
is_delete_marker,
event_type,
ingest_id,
attributes,
0::bigint as "number_reordered"
from input
-- Grab the most recent object in each input group.
@@ -12,7 +12,9 @@ insert into s3_object (
version_id,
sequencer,
is_delete_marker,
event_type
event_type,
ingest_id,
attributes
)
values (
unnest($1::uuid[]),
@@ -27,7 +29,9 @@ values (
unnest($10::text[]),
unnest($11::text[]),
unnest($12::boolean[]),
unnest($13::event_type[])
unnest($13::event_type[]),
unnest($14::uuid[]),
unnest($15::jsonb[])
) on conflict on constraint sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning s3_object_id, number_duplicate_events;
@@ -12,7 +12,9 @@ insert into s3_object (
version_id,
sequencer,
is_delete_marker,
event_type
event_type,
ingest_id,
attributes
)
values (
unnest($1::uuid[]),
@@ -27,7 +29,9 @@ values (
unnest($10::text[]),
unnest($11::text[]),
unnest($12::boolean[]),
unnest($13::event_type[])
unnest($13::event_type[]),
unnest($14::uuid[]),
unnest($15::jsonb[])
) on conflict on constraint sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning s3_object_id, number_duplicate_events;
@@ -18,7 +18,9 @@ with input as (
$10::text[],
$11::text[],
$12::boolean[],
$13::event_type[]
$13::event_type[],
$14::uuid[],
$15::jsonb[]
) as input (
s3_object_id,
bucket,
@@ -32,7 +34,9 @@ with input as (
version_id,
created_sequencer,
is_delete_marker,
event_type
event_type,
ingest_id,
attributes
)
),
-- Then, select the objects that need to be updated.
@@ -51,7 +55,8 @@ current_objects as (
input.e_tag as input_e_tag,
input.storage_class as input_storage_class,
input.is_delete_marker as input_is_delete_marker,
input.event_type as input_event_type
input.event_type as input_event_type,
input.ingest_id as input_ingest_id
from s3_object
-- Grab the relevant values to update with.
join input on
@@ -100,6 +105,7 @@ update as (
is_delete_marker = objects_to_update.input_is_delete_marker,
storage_class = objects_to_update.input_storage_class,
event_type = objects_to_update.input_event_type,
ingest_id = objects_to_update.input_ingest_id,
number_reordered = s3_object.number_reordered +
-- Note the asymmetry between this and the reorder for deleted query.
case when objects_to_update.deleted_sequencer is not null or objects_to_update.sequencer is not null then
@@ -127,6 +133,8 @@ select
number_duplicate_events,
size,
is_delete_marker,
ingest_id,
attributes,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order created event, so return a created event back.
'Created'::event_type as "event_type"
@@ -98,6 +98,8 @@ select
number_duplicate_events,
size,
is_delete_marker,
ingest_id,
attributes,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order deleted event, so return a deleted event back.
'Deleted'::event_type as "event_type"
@@ -20,6 +20,7 @@ export class ApiFunction extends fn.Function {
// See https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/lambda-http#integration-with-api-gateway-stages
// for more info.
AWS_LAMBDA_HTTP_IGNORE_STAGE_IN_PATH: 'true',
...props.environment,
},
...props,
});
@@ -165,11 +165,11 @@ export class Function extends Construct {
/**
* Add policies for 's3:List*' and 's3:Get*' on the buckets to this function's role.
*/
addPoliciesForBuckets(buckets: string[]) {
addPoliciesForBuckets(buckets: string[], additionalActions?: string[]) {
buckets.map((bucket) => {
this.addToPolicy(
new PolicyStatement({
actions: ['s3:ListBucket', 's3:GetObject'],
actions: [...['s3:ListBucket', 's3:GetObject'], ...(additionalActions ?? [])],
resources: [`arn:aws:s3:::${bucket}`, `arn:aws:s3:::${bucket}/*`],
})
);
@@ -3,6 +3,7 @@ import { IQueue } from 'aws-cdk-lib/aws-sqs';
import * as fn from './function';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { DatabaseProps } from './function';
import { FILEMANAGER_SERVICE_NAME } from '../../stack';

/**
* Props for controlling access to buckets.
@@ -35,14 +36,26 @@ export type IngestFunctionProps = fn.FunctionPropsConfigurable & DatabaseProps &
*/
export class IngestFunction extends fn.Function {
constructor(scope: Construct, id: string, props: IngestFunctionProps) {
super(scope, id, { package: 'filemanager-ingest-lambda', ...props });
super(scope, id, {
package: 'filemanager-ingest-lambda',
environment: {
FILEMANAGER_INGESTER_TAG_NAME: 'umccr-org:OrcaBusFileManagerIngestId',
...props.environment,
},
...props,
});

this.addAwsManagedPolicy('service-role/AWSLambdaSQSQueueExecutionRole');

props.eventSources.forEach((source) => {
const eventSource = new SqsEventSource(source);
this.function.addEventSource(eventSource);
});
this.addPoliciesForBuckets(props.buckets);
this.addPoliciesForBuckets(props.buckets, [
's3:GetObjectTagging',
's3:GetObjectVersionTagging',
's3:PutObjectTagging',
's3:PutObjectVersionTagging',
]);
}
}
58 changes: 58 additions & 0 deletions lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md
@@ -0,0 +1,58 @@
# Tracking moved objects

The filemanager tracks records from S3 events, which do not describe how objects move from one location to another. Using
S3 events alone, it's not possible to tell whether an object that has been deleted in one place and created in another is
the same object that has been moved, or two different objects. This is because S3 only tracks `Created` or `Deleted`
events.

To track moved objects, filemanager stores additional information in S3 tags. The tag gets updated when the object
is moved. This allows the filemanager to track how objects move and also allows it to copy attributes when an object
is moved/copied. This process is described [below](#tagging-process).

## Tagging process

The process of tagging is as follows (a sketch in code appears after the list):

* When a record for a `Created` event is ingested, the filemanager queries S3 for the object's tags.
* If the tags can be retrieved, the filemanager looks for the ingest id tag. The tag's key name is configured in the
  environment variable `FILEMANAGER_INGESTER_TAG_NAME`, which the deployment above sets to `umccr-org:OrcaBusFileManagerIngestId`.
* The tag is parsed as a UUID, and stored in the `ingest_id` column of `s3_object` for that record.
* If the tag does not exist, then a new UUID is generated, and the object is tagged on S3 by calling `PutObjectTagging`.
The new tag is also stored in the `ingest_id` column.
* The database is also queried for any records with the same `ingest_id` so that attributes can be copied to the new record.
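
The following is a minimal sketch of this flow, assuming the `get_object_tagging` and `put_object_tagging` client
wrappers added in this PR. `Client` refers to that wrapper; `resolve_ingest_id`, the `Option`-collapsed error handling,
and the direct `Uuid::new_v4()` call (standing in for the codebase's `UuidGenerator::generate()`) are illustrative
assumptions rather than the ingester's actual internals.

```rust
use aws_sdk_s3::types::{Tag, Tagging};
use uuid::Uuid;

/// Find an existing ingest_id tag on the object, or mint and attach a new one.
/// Returns None on any failure, mirroring the rule that an ingest_id only counts
/// once both S3 tagging and the database write succeed.
async fn resolve_ingest_id(
    client: &Client,
    key: &str,
    bucket: &str,
    tag_name: &str,
) -> Option<Uuid> {
    let existing = client
        .get_object_tagging(key, bucket)
        .await
        .ok()?
        .tag_set()
        .to_vec();

    // Reuse the ingest_id if the object already carries one that parses as a UUID.
    if let Some(id) = existing
        .iter()
        .find(|tag| tag.key() == tag_name)
        .and_then(|tag| Uuid::parse_str(tag.value()).ok())
    {
        return Some(id);
    }

    // Otherwise generate a fresh UUID and write it back. PutObjectTagging replaces
    // the whole tag set, so the existing tags are carried through alongside it.
    let ingest_id = Uuid::new_v4();
    let mut tagging = Tagging::builder();
    for tag in existing {
        tagging = tagging.tag_set(tag);
    }
    let tagging = tagging
        .tag_set(
            Tag::builder()
                .key(tag_name)
                .value(ingest_id.to_string())
                .build()
                .ok()?,
        )
        .build()
        .ok()?;
    client.put_object_tagging(key, bucket, tagging).await.ok()?;

    Some(ingest_id)
}
```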

This logic is enabled by default, but it can be switched off using the `FILEMANAGER_INGESTER_TRACK_MOVES` environment
variable. The filemanager API provides a way to query the database for records with a given `ingest_id`.
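
For example, once the `ingest_id` column and `ingest_id_index` from the migration above are in place, a lookup by
`ingest_id` is a cheap indexed query. A minimal `sqlx` sketch, assuming the `s3_object` columns shown in this PR (the
real filemanager exposes this through its API rather than this hypothetical helper):

```rust
use sqlx::PgPool;
use uuid::Uuid;

/// Fetch all records sharing an ingest_id, e.g. to follow an object across moves.
/// The column list is illustrative; the lookup itself is served by ingest_id_index.
async fn records_for_ingest_id(
    pool: &PgPool,
    ingest_id: Uuid,
) -> sqlx::Result<Vec<(Uuid, String, String)>> {
    sqlx::query_as("select s3_object_id, bucket, key from s3_object where ingest_id = $1")
        .bind(ingest_id)
        .fetch_all(pool)
        .await
}
```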

## Design considerations

Object tags on S3 are [limited][s3-tagging] to 10 tags per object, and each tag value can only store 256 Unicode characters.
The filemanager avoids storing a large amount of data by using a UUID as the value of the tag, which is linked to object
records that store attributes and data.

The object tagging process cannot be atomic, so there is a chance for concurrency errors to occur. Tagging can also
fail due to database or network errors. The filemanager only tracks `ingest_id`s if it knows that a tag has been
successfully created on S3 and successfully stored in the database. If tagging fails or is not enabled, the `ingest_id`
column will be null.

The act of tagging the object is what allows it to be tracked, so ideally this is done as soon as possible. Currently, it happens
at ingestion; however, this could have performance implications. Alternative approaches should consider asynchronous tagging.
For example, [`s3:ObjectTagging:*`][s3-tagging-event] events could be used for this purpose.

The object tagging mechanism also doesn't differentiate between moved objects and copied objects with the same tags.
If an object is copied with tags, the `ingest_id` will also be copied and the above logic will apply.

## Alternative designs

Alternatively, S3 object metadata could also be used to track moves using a similar mechanism. However, metadata can
[only be updated][s3-metadata] by deleting and recreating the object. This process would be much more costly, so tagging
was chosen instead.

Another approach is to compare object checksums or etags. However, this is limited if the checksum is not present,
or if the etag was computed using a different part size. It also fails if the checksum is not expected to be the same,
for example, when tracking a set of files that are compressed. Both of these approaches could be used in addition to object tagging
to provide more ways to track moves.

[s3-tagging]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
[s3-tagging-event]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html#supported-notification-event-types
[s3-metadata]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
@@ -6,10 +6,13 @@ use std::result;
use aws_sdk_s3 as s3;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput};
use aws_sdk_s3::operation::get_object_tagging::{GetObjectTaggingError, GetObjectTaggingOutput};
use aws_sdk_s3::operation::head_object::{HeadObjectError, HeadObjectOutput};
use aws_sdk_s3::operation::list_buckets::{ListBucketsError, ListBucketsOutput};
use aws_sdk_s3::operation::put_object_tagging::{PutObjectTaggingError, PutObjectTaggingOutput};
use aws_sdk_s3::presigning::{PresignedRequest, PresigningConfig};
use aws_sdk_s3::types::ChecksumMode::Enabled;
use aws_sdk_s3::types::Tagging;
use chrono::Duration;
use mockall::automock;

@@ -70,6 +73,36 @@ impl Client {
.await
}

/// Execute the `GetObjectTagging` operation.
pub async fn get_object_tagging(
&self,
key: &str,
bucket: &str,
) -> Result<GetObjectTaggingOutput, GetObjectTaggingError> {
self.inner
.get_object_tagging()
.key(key)
.bucket(bucket)
.send()
.await
}

/// Execute the `PutObjectTagging` operation.
pub async fn put_object_tagging(
&self,
key: &str,
bucket: &str,
tagging: Tagging,
) -> Result<PutObjectTaggingOutput, PutObjectTaggingError> {
self.inner
.put_object_tagging()
.key(key)
.bucket(bucket)
.tagging(tagging)
.send()
.await
}

/// Execute the `GetObject` operation and generate a presigned url for the object.
pub async fn presign_url(
&self,
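
A note on composing these wrappers: `PutObjectTagging` replaces an object's entire tag set rather than appending to it,
so adding the ingest id tag safely means carrying the existing tags through. A sketch under that assumption; the
`with_ingest_id_tag` helper and the environment-variable fallback are illustrative, not part of this PR:

```rust
use aws_sdk_s3::error::BuildError;
use aws_sdk_s3::types::{Tag, Tagging};

/// Build a Tagging value containing the existing tags plus the ingest id tag.
fn with_ingest_id_tag(existing: Vec<Tag>, ingest_id: &str) -> Result<Tagging, BuildError> {
    // Tag name as wired up by the CDK above; the fallback default is an assumption.
    let tag_name = std::env::var("FILEMANAGER_INGESTER_TAG_NAME")
        .unwrap_or_else(|_| "umccr-org:OrcaBusFileManagerIngestId".to_string());

    // Carry the current tags through, since PutObjectTagging overwrites them all.
    let mut tagging = Tagging::builder();
    for tag in existing {
        tagging = tagging.tag_set(tag);
    }
    tagging
        .tag_set(Tag::builder().key(tag_name).value(ingest_id).build()?)
        .build()
}
```
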
@@ -53,6 +53,8 @@ impl Ingester {
.bind(&events.sequencers)
.bind(&events.is_delete_markers)
.bind(&events.event_types)
.bind(&events.ingest_ids)
.bind(&events.attributes)
.fetch_all(&mut *tx)
.await?;

@@ -74,6 +76,7 @@ pub(crate) mod tests {
use sqlx::postgres::PgRow;
use sqlx::{Executor, PgPool, Row};
use tokio::time::Instant;
use uuid::Uuid;

use crate::database::aws::migration::tests::MIGRATOR;
use crate::database::{Client, Ingest};
@@ -90,6 +93,7 @@ pub(crate) mod tests {
};
use crate::events::EventSourceType;
use crate::events::EventSourceType::S3;
use crate::uuid::UuidGenerator;

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_object_created(pool: PgPool) {
@@ -101,6 +105,9 @@
let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
.get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_created(&s3_object_results[0]);
}

@@ -147,6 +154,9 @@
let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
.get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_with(
&s3_object_results[0],
Some(i64::MAX),
@@ -183,6 +193,9 @@
let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 2);
assert!(s3_object_results[0]
.get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_eq!(
1,
s3_object_results[0].get::<i64, _>("number_duplicate_events")
@@ -287,6 +300,9 @@
let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 2);
assert!(s3_object_results[0]
.get::<Option<Uuid>, _>("ingest_id")
.is_some());
// Order should be different here.
assert_ingest_events(
&s3_object_results[1],
@@ -1064,7 +1080,9 @@
assert_row(s3_object_results, message, sequencer, event_time);
}

fn update_test_events(mut events: TransposedS3EventMessages) -> TransposedS3EventMessages {
pub(crate) fn update_test_events(
mut events: TransposedS3EventMessages,
) -> TransposedS3EventMessages {
let update_last_modified = |dates: &mut Vec<Option<DateTime<Utc>>>| {
dates.iter_mut().for_each(|last_modified| {
*last_modified = Some(DateTime::default());
@@ -1080,10 +1098,16 @@
*sha256 = Some(EXPECTED_SHA256.to_string());
});
};
let update_ingest_ids = |ingest_ids: &mut Vec<Option<Uuid>>| {
ingest_ids.iter_mut().for_each(|ingest_id| {
*ingest_id = Some(UuidGenerator::generate());
});
};

update_last_modified(&mut events.last_modified_dates);
update_storage_class(&mut events.storage_classes);
update_sha256(&mut events.sha256s);
update_ingest_ids(&mut events.ingest_ids);

events
}