feat(filemanager): ingest_id tagging and object move tracking #585

Merged · 8 commits · Oct 4, 2024
@@ -0,0 +1,2 @@
alter table s3_object add column ingest_id uuid;
create index ingest_id_index on s3_object (ingest_id);
@@ -34,6 +34,8 @@ select
size,
is_delete_marker,
event_type,
ingest_id,
attributes,
0::bigint as "number_reordered"
from input
-- Grab the most recent object in each input group.
@@ -12,7 +12,9 @@ insert into s3_object (
version_id,
sequencer,
is_delete_marker,
event_type
event_type,
ingest_id,
attributes
)
values (
unnest($1::uuid[]),
@@ -27,7 +29,9 @@ values (
unnest($10::text[]),
unnest($11::text[]),
unnest($12::boolean[]),
unnest($13::event_type[])
unnest($13::event_type[]),
unnest($14::uuid[]),
unnest($15::jsonb[])
) on conflict on constraint sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning s3_object_id, number_duplicate_events;
@@ -12,7 +12,9 @@ insert into s3_object (
version_id,
sequencer,
is_delete_marker,
event_type
event_type,
ingest_id,
attributes
)
values (
unnest($1::uuid[]),
@@ -27,7 +29,9 @@ values (
unnest($10::text[]),
unnest($11::text[]),
unnest($12::boolean[]),
unnest($13::event_type[])
unnest($13::event_type[]),
unnest($14::uuid[]),
unnest($15::jsonb[])
) on conflict on constraint sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning s3_object_id, number_duplicate_events;
@@ -18,7 +18,9 @@ with input as (
$10::text[],
$11::text[],
$12::boolean[],
$13::event_type[]
$13::event_type[],
$14::uuid[],
$15::jsonb[]
) as input (
s3_object_id,
bucket,
@@ -32,7 +34,9 @@ with input as (
version_id,
created_sequencer,
is_delete_marker,
event_type
event_type,
ingest_id,
attributes
)
),
-- Then, select the objects that need to be updated.
@@ -51,7 +55,8 @@ current_objects as (
input.e_tag as input_e_tag,
input.storage_class as input_storage_class,
input.is_delete_marker as input_is_delete_marker,
input.event_type as input_event_type
input.event_type as input_event_type,
input.ingest_id as input_ingest_id
from s3_object
-- Grab the relevant values to update with.
join input on
@@ -100,6 +105,7 @@ update as (
is_delete_marker = objects_to_update.input_is_delete_marker,
storage_class = objects_to_update.input_storage_class,
event_type = objects_to_update.input_event_type,
ingest_id = objects_to_update.input_ingest_id,
number_reordered = s3_object.number_reordered +
-- Note the asymmetry between this and the reorder for deleted query.
case when objects_to_update.deleted_sequencer is not null or objects_to_update.sequencer is not null then
@@ -127,6 +133,8 @@ select
number_duplicate_events,
size,
is_delete_marker,
ingest_id,
attributes,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order created event, so return a created event back.
'Created'::event_type as "event_type"
@@ -98,6 +98,8 @@ select
number_duplicate_events,
size,
is_delete_marker,
ingest_id,
attributes,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order deleted event, so return a deleted event back.
'Deleted'::event_type as "event_type"
@@ -20,6 +20,7 @@ export class ApiFunction extends fn.Function {
// See https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/lambda-http#integration-with-api-gateway-stages
// for more info.
AWS_LAMBDA_HTTP_IGNORE_STAGE_IN_PATH: 'true',
...props.environment,
},
...props,
});
@@ -165,11 +165,11 @@ export class Function extends Construct {
/**
* Add policies for 's3:List*' and 's3:Get*' on the buckets to this function's role.
*/
addPoliciesForBuckets(buckets: string[]) {
addPoliciesForBuckets(buckets: string[], additionalActions?: string[]) {
buckets.map((bucket) => {
this.addToPolicy(
new PolicyStatement({
actions: ['s3:ListBucket', 's3:GetObject'],
actions: [...['s3:ListBucket', 's3:GetObject'], ...(additionalActions ?? [])],
resources: [`arn:aws:s3:::${bucket}`, `arn:aws:s3:::${bucket}/*`],
})
);
@@ -3,6 +3,7 @@ import { IQueue } from 'aws-cdk-lib/aws-sqs';
import * as fn from './function';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { DatabaseProps } from './function';
import { FILEMANAGER_SERVICE_NAME } from '../../stack';

/**
* Props for controlling access to buckets.
@@ -35,14 +36,26 @@ export type IngestFunctionProps = fn.FunctionPropsConfigurable & DatabaseProps &
*/
export class IngestFunction extends fn.Function {
constructor(scope: Construct, id: string, props: IngestFunctionProps) {
super(scope, id, { package: 'filemanager-ingest-lambda', ...props });
super(scope, id, {
package: 'filemanager-ingest-lambda',
environment: {
FILEMANAGER_INGESTER_TAG_NAME: 'umccr-org:OrcaBusFileManagerIngestId',
...props.environment,
},
...props,
});

this.addAwsManagedPolicy('service-role/AWSLambdaSQSQueueExecutionRole');

props.eventSources.forEach((source) => {
const eventSource = new SqsEventSource(source);
this.function.addEventSource(eventSource);
});
this.addPoliciesForBuckets(props.buckets);
this.addPoliciesForBuckets(props.buckets, [
's3:GetObjectTagging',
's3:GetObjectVersionTagging',
's3:PutObjectTagging',
's3:PutObjectVersionTagging',
]);
}
}
lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md (58 additions, 0 deletions)

@@ -0,0 +1,58 @@
# Tracking moved objects

The filemanager tracks records from S3 events, which do not describe how objects move from one location to another. Using
S3 events alone, it's not possible to tell whether an object that has been deleted in one place and created in another is
the same object that has been moved, or two different objects. This is because S3 only tracks `Created` or `Deleted`
events.

To track moved objects, filemanager stores additional information in S3 tags. The tag gets updated when the object
is moved. This allows the filemanager to track how objects move and also allows it to copy attributes when an object
is moved/copied. This process is described [below](#tagging-process).

## Tagging process

The process of tagging is as follows (a code sketch follows the list):

* When an object record is ingested, the filemanager queries S3 for the object's tags on `Created` events.
* If the tags can be retrieved, the filemanager looks for a key called `ingest_id`. The key name can be
configured in the environment variable `FILEMANAGER_INGESTER_TAG_NAME`.
* If the tag exists, its value is parsed as a UUID and stored in the `ingest_id` column of `s3_object` for that record.
* If the tag does not exist, a new UUID is generated, and the object is tagged on S3 by calling `PutObjectTagging`.
The new tag is also stored in the `ingest_id` column.
* The database is also queried for any records with the same `ingest_id` so that attributes can be copied to the new record.
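
A minimal sketch of this flow, assuming the `Client` tagging wrappers added in this PR; `resolve_ingest_id`, the
`anyhow` error handling, and the inline UUID generation are illustrative only, and attribute copying is elided:

```rust
use aws_sdk_s3::types::{Tag, Tagging};
use uuid::Uuid;

/// Sketch only: resolve an object's ingest id on ingestion, tagging the
/// object on S3 if it is untagged. `Client` is the filemanager S3 wrapper;
/// `tag_name` comes from `FILEMANAGER_INGESTER_TAG_NAME`.
async fn resolve_ingest_id(
    client: &Client,
    bucket: &str,
    key: &str,
    tag_name: &str,
) -> anyhow::Result<Uuid> {
    let tags = client.get_object_tagging(key, bucket).await?;

    // Reuse the existing ingest id if the object already carries the tag.
    if let Some(tag) = tags.tag_set().iter().find(|tag| tag.key() == tag_name) {
        return Ok(Uuid::parse_str(tag.value())?);
    }

    // Otherwise mint a new UUID and tag the object by calling `PutObjectTagging`.
    // Note: `build()` returns a `Result` in recent SDK versions.
    let ingest_id = Uuid::new_v4();
    let tagging = Tagging::builder()
        .tag_set(
            Tag::builder()
                .key(tag_name)
                .value(ingest_id.to_string())
                .build()?,
        )
        .build()?;
    client.put_object_tagging(key, bucket, tagging).await?;

    // The caller stores this UUID in the `ingest_id` column and copies
    // attributes from any existing records sharing it.
    Ok(ingest_id)
}
```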

This logic is enabled by default, but it can be switched off by setting `FILEMANAGER_INGESTER_TRACK_MOVES`. The filemanager
API provides a way to query the database for records with a given `ingest_id`.
Member:

Tagging is currently part of the ingestion process, right? So there's a possibility that this may slow down ingestion
and become an issue under heavy load? Not for now, but if that should become the case, we could think of an async
tagging strategy. Given the option to disable tagging (or tagging failing or missing for other reasons), it would be
great to think about an async tagging option.

@mmalenic (Member, Author), Oct 4, 2024:

Yeah, I agree with this; it could potentially slow things down as it's done on ingestion. It's a bit tricky because the
act of tagging the object conveys the information of the move, so ideally this would be done as soon as possible (i.e.
on ingestion). Anything async would extend the window in which the object isn't tagged, meaning that a move can't be
tracked. In practice this probably wouldn't make a difference if the object isn't moved as soon as it's created.

There are `s3:ObjectTagging:*` events which might be a good fit for this; I'll look into them.

Member:

This is always a challenging tradeoff. Let's give it a shot!

Member:

Yes, tricky, but that's a general "issue" with event-based systems: there's an inevitable delay/asynchronicity.

And I am not saying we should implement that now. An open ticket or a comment in the code to keep track of it is
perfectly fine.

To compensate for potential concurrency issues, the mentioned support for checksums, name matches, etc. could be used,
at least to some extent. All future considerations... all good for now!

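The `ingest_id` lookup itself is simple; a hedged `sqlx` sketch of the kind of query the API and the attribute-copying
step rely on (the helper is illustrative, not the actual implementation; the new `ingest_id_index` makes the lookup cheap):

```rust
use sqlx::PgPool;
use uuid::Uuid;

/// Illustrative only: fetch the object records sharing an ingest id, e.g. to
/// follow an object across moves or to copy attributes onto a new record.
/// Assumes sqlx with the `postgres` and `uuid` features enabled.
async fn records_with_ingest_id(
    pool: &PgPool,
    ingest_id: Uuid,
) -> sqlx::Result<Vec<(Uuid, String, String)>> {
    sqlx::query_as("select s3_object_id, bucket, key from s3_object where ingest_id = $1")
        .bind(ingest_id)
        .fetch_all(pool)
        .await
}
```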

## Design considerations

Object tags on S3 are [limited][s3-tagging] to 10 tags per object, and each tag value can only store 256 Unicode
characters. The filemanager avoids storing a large amount of data by using a UUID as the value of the tag, which is
linked to object records that store attributes and data.

The object tagging process cannot be atomic, so there is a chance for concurrency errors to occur. Tagging can also
fail due to database or network errors. The filemanager only tracks `ingest_id`s if it knows that a tag has been
successfully created on S3 and successfully stored in the database. If tagging fails, or if it's not enabled, the
`ingest_id` column will be null.

The act of tagging the object allows it to be tracked; ideally this is done as soon as possible. Currently, this happens
at ingestion; however, this could have performance implications. Alternative approaches should consider asynchronous tagging.
For example, [`s3:ObjectTagging:*`][s3-tagging-event] events could be used for this purpose.

The object tagging mechanism also doesn't differentiate between moved objects and copied objects with the same tags.
If an object is copied with tags, the `ingest_id` will also be copied and the above logic will apply.

## Alternative designs
Member:

Nice doc!

I'd probably add another small note on the checksum approach: it can't be used if the checksums are not expected to be
the same, e.g. with compression, which is a big use case for us.


Alternatively, S3 object metadata could also be used to track moves using a similar mechanism. However, metadata can
[only be updated][s3-metadata] by deleting and recreating the object. This process would be much more costly, so tagging
was chosen instead.

Another approach is to compare object checksums or ETags. However, this would be limited if the checksum is not present,
or if the ETag was computed using a different part size. It also fails if the checksum is not expected to be the same,
for example, when tracking a set of files that are compressed. Both of these approaches could be used in addition to
object tagging to provide more ways to track moves.
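
For instance, a multipart ETag is a hash of the per-part hashes with a part-count suffix, so identical bytes uploaded
with different part sizes yield different ETags. A small illustrative sketch (not filemanager code) of detecting the
multipart form:

```rust
/// Multipart ETags take the form "<md5-of-part-md5s>-<part-count>", while
/// single-part ETags are a plain MD5, so comparing ETags across uploads with
/// different part sizes is unreliable.
fn multipart_part_count(e_tag: &str) -> Option<u32> {
    let (_, count) = e_tag.trim_matches('"').rsplit_once('-')?;
    count.parse().ok()
}

fn main() {
    // A multipart upload with 5 parts.
    assert_eq!(
        multipart_part_count("\"9bb58f26192e4ba00f01e2e7b136bbd8-5\""),
        Some(5)
    );
    // A single-part upload has no part count.
    assert_eq!(
        multipart_part_count("\"9bb58f26192e4ba00f01e2e7b136bbd8\""),
        None
    );
}
```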

[s3-tagging]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
[s3-tagging-event]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html#supported-notification-event-types
[s3-metadata]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
@@ -6,10 +6,13 @@ use std::result;
use aws_sdk_s3 as s3;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput};
use aws_sdk_s3::operation::get_object_tagging::{GetObjectTaggingError, GetObjectTaggingOutput};
use aws_sdk_s3::operation::head_object::{HeadObjectError, HeadObjectOutput};
use aws_sdk_s3::operation::list_buckets::{ListBucketsError, ListBucketsOutput};
use aws_sdk_s3::operation::put_object_tagging::{PutObjectTaggingError, PutObjectTaggingOutput};
use aws_sdk_s3::presigning::{PresignedRequest, PresigningConfig};
use aws_sdk_s3::types::ChecksumMode::Enabled;
use aws_sdk_s3::types::Tagging;
use chrono::Duration;
use mockall::automock;

@@ -70,6 +73,36 @@ impl Client {
.await
}

/// Execute the `GetObjectTagging` operation.
pub async fn get_object_tagging(
&self,
key: &str,
bucket: &str,
) -> Result<GetObjectTaggingOutput, GetObjectTaggingError> {
self.inner
.get_object_tagging()
.key(key)
.bucket(bucket)
.send()
.await
}

/// Execute the `PutObjectTagging` operation.
pub async fn put_object_tagging(
&self,
key: &str,
bucket: &str,
tagging: Tagging,
) -> Result<PutObjectTaggingOutput, PutObjectTaggingError> {
self.inner
.put_object_tagging()
.key(key)
.bucket(bucket)
.tagging(tagging)
.send()
.await
}

/// Execute the `GetObject` operation and generate a presigned url for the object.
pub async fn presign_url(
&self,
@@ -53,6 +53,8 @@ impl Ingester {
.bind(&events.sequencers)
.bind(&events.is_delete_markers)
.bind(&events.event_types)
.bind(&events.ingest_ids)
.bind(&events.attributes)
.fetch_all(&mut *tx)
.await?;

@@ -74,6 +76,7 @@ pub(crate) mod tests {
use sqlx::postgres::PgRow;
use sqlx::{Executor, PgPool, Row};
use tokio::time::Instant;
use uuid::Uuid;

use crate::database::aws::migration::tests::MIGRATOR;
use crate::database::{Client, Ingest};
@@ -90,6 +93,7 @@ pub(crate) mod tests {
};
use crate::events::EventSourceType;
use crate::events::EventSourceType::S3;
use crate::uuid::UuidGenerator;

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_object_created(pool: PgPool) {
@@ -101,6 +105,9 @@
let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
.get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_created(&s3_object_results[0]);
}

@@ -147,6 +154,9 @@
let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
.get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_with(
&s3_object_results[0],
Some(i64::MAX),
@@ -183,6 +193,9 @@
let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 2);
assert!(s3_object_results[0]
.get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_eq!(
1,
s3_object_results[0].get::<i64, _>("number_duplicate_events")
@@ -287,6 +300,9 @@
let s3_object_results = fetch_results(&ingester).await;

assert_eq!(s3_object_results.len(), 2);
assert!(s3_object_results[0]
.get::<Option<Uuid>, _>("ingest_id")
.is_some());
// Order should be different here.
assert_ingest_events(
&s3_object_results[1],
@@ -1064,7 +1080,9 @@
assert_row(s3_object_results, message, sequencer, event_time);
}

fn update_test_events(mut events: TransposedS3EventMessages) -> TransposedS3EventMessages {
pub(crate) fn update_test_events(
mut events: TransposedS3EventMessages,
) -> TransposedS3EventMessages {
let update_last_modified = |dates: &mut Vec<Option<DateTime<Utc>>>| {
dates.iter_mut().for_each(|last_modified| {
*last_modified = Some(DateTime::default());
@@ -1080,10 +1098,16 @@
*sha256 = Some(EXPECTED_SHA256.to_string());
});
};
let update_ingest_ids = |ingest_ids: &mut Vec<Option<Uuid>>| {
ingest_ids.iter_mut().for_each(|ingest_id| {
*ingest_id = Some(UuidGenerator::generate());
});
};

update_last_modified(&mut events.last_modified_dates);
update_storage_class(&mut events.storage_classes);
update_sha256(&mut events.sha256s);
update_ingest_ids(&mut events.ingest_ids);

events
}