Commit 3388fdc

refactor(filemanager): add correct ingest function permissions and rename ingest_id tag
mmalenic committed Oct 3, 2024
1 parent 6560b5e commit 3388fdc
Showing 22 changed files with 118 additions and 105 deletions.
@@ -0,0 +1,2 @@
+ alter table s3_object add column ingest_id uuid;
+ create index ingest_id_index on s3_object (ingest_id);

This file was deleted.

@@ -34,7 +34,7 @@ select
size,
is_delete_marker,
event_type,
- move_id,
+ ingest_id,
attributes,
0::bigint as "number_reordered"
from input
@@ -13,7 +13,7 @@ insert into s3_object (
sequencer,
is_delete_marker,
event_type,
- move_id,
+ ingest_id,
attributes
)
values (
@@ -13,7 +13,7 @@ insert into s3_object (
sequencer,
is_delete_marker,
event_type,
- move_id,
+ ingest_id,
attributes
)
values (
@@ -35,7 +35,7 @@ with input as (
created_sequencer,
is_delete_marker,
event_type,
- move_id,
+ ingest_id,
attributes
)
),
@@ -56,7 +56,7 @@ current_objects as (
input.storage_class as input_storage_class,
input.is_delete_marker as input_is_delete_marker,
input.event_type as input_event_type,
- input.move_id as input_move_id
+ input.ingest_id as input_ingest_id
from s3_object
-- Grab the relevant values to update with.
join input on
@@ -105,7 +105,7 @@ update as (
is_delete_marker = objects_to_update.input_is_delete_marker,
storage_class = objects_to_update.input_storage_class,
event_type = objects_to_update.input_event_type,
- move_id = objects_to_update.input_move_id,
+ ingest_id = objects_to_update.input_ingest_id,
number_reordered = s3_object.number_reordered +
-- Note the asymmetry between this and the reorder for deleted query.
case when objects_to_update.deleted_sequencer is not null or objects_to_update.sequencer is not null then
@@ -133,7 +133,7 @@ select
number_duplicate_events,
size,
is_delete_marker,
- move_id,
+ ingest_id,
attributes,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order created event, so return a created event back.
@@ -98,7 +98,7 @@ select
number_duplicate_events,
size,
is_delete_marker,
- move_id,
+ ingest_id,
attributes,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order deleted event, so return a deleted event back.
@@ -20,6 +20,7 @@ export class ApiFunction extends fn.Function {
// See https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/lambda-http#integration-with-api-gateway-stages
// for more info.
AWS_LAMBDA_HTTP_IGNORE_STAGE_IN_PATH: 'true',
+ ...props.environment,
},
...props,
});
@@ -165,11 +165,11 @@ export class Function extends Construct {
/**
* Add policies for 's3:List*' and 's3:Get*' on the buckets to this function's role.
*/
- addPoliciesForBuckets(buckets: string[]) {
+ addPoliciesForBuckets(buckets: string[], additionalActions?: string[]) {
buckets.map((bucket) => {
this.addToPolicy(
new PolicyStatement({
- actions: ['s3:ListBucket', 's3:GetObject'],
+ actions: [...['s3:ListBucket', 's3:GetObject'], ...(additionalActions ?? [])],
resources: [`arn:aws:s3:::${bucket}`, `arn:aws:s3:::${bucket}/*`],
})
);
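The optional `additionalActions` parameter lets callers grant extra S3 actions on top of the default `s3:ListBucket` and `s3:GetObject`. A minimal usage sketch, assuming a hypothetical instance of this construct and placeholder bucket names:

```ts
import * as fn from './function'; // path used by sibling constructs in this diff

// Hypothetical instance of the Function construct changed above.
declare const ingestFunction: fn.Function;

// Grants the default read actions plus object tagging actions on both buckets.
ingestFunction.addPoliciesForBuckets(
  ['example-ingest-bucket', 'example-archive-bucket'],
  ['s3:GetObjectTagging', 's3:PutObjectTagging']
);
```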
@@ -3,6 +3,7 @@ import { IQueue } from 'aws-cdk-lib/aws-sqs';
import * as fn from './function';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { DatabaseProps } from './function';
+ import { FILEMANAGER_SERVICE_NAME } from '../../stack';

/**
* Props for controlling access to buckets.
@@ -35,14 +36,26 @@ export type IngestFunctionProps = fn.FunctionPropsConfigurable & DatabaseProps &
*/
export class IngestFunction extends fn.Function {
constructor(scope: Construct, id: string, props: IngestFunctionProps) {
-   super(scope, id, { package: 'filemanager-ingest-lambda', ...props });
+   super(scope, id, {
+     package: 'filemanager-ingest-lambda',
+     environment: {
+       FILEMANAGER_INGESTER_TAG_NAME: 'umccr-org:OrcaBusFileManagerIngestId',
+       ...props.environment,
+     },
+     ...props,
+   });

this.addAwsManagedPolicy('service-role/AWSLambdaSQSQueueExecutionRole');

props.eventSources.forEach((source) => {
const eventSource = new SqsEventSource(source);
this.function.addEventSource(eventSource);
});
-   this.addPoliciesForBuckets(props.buckets);
+   this.addPoliciesForBuckets(props.buckets, [
+     's3:GetObjectTagging',
+     's3:GetObjectVersionTagging',
+     's3:PutObjectTagging',
+     's3:PutObjectVersionTagging',
+   ]);
}
}
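A sketch of how the updated construct might be instantiated from a CDK stack, assuming a hypothetical import path and omitting the database and networking props (from `DatabaseProps` and the configurable function props) that are not shown in this diff; the `environment` entry illustrates that caller-supplied values take precedence over the construct's default tag name:

```ts
import { Construct } from 'constructs';
import { Duration, Stack, StackProps } from 'aws-cdk-lib';
import { Queue } from 'aws-cdk-lib/aws-sqs';
// Hypothetical import path for the construct in this diff.
import { IngestFunction, IngestFunctionProps } from './constructs/functions/ingest';

export class ExampleFilemanagerStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const queue = new Queue(this, 'IngestQueue', {
      visibilityTimeout: Duration.minutes(2),
    });

    new IngestFunction(this, 'IngestFunction', {
      eventSources: [queue],
      buckets: ['example-data-bucket'],
      environment: {
        // Overrides the construct's default 'umccr-org:OrcaBusFileManagerIngestId'.
        FILEMANAGER_INGESTER_TAG_NAME: 'example-org:ExampleIngestId',
      },
      // Database and VPC related props omitted here for brevity.
    } as IngestFunctionProps);
  }
}
```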
44 changes: 21 additions & 23 deletions lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md
@@ -1,49 +1,47 @@
# Tracking moved objects

The filemanager tracks records from S3 events, which do not describe how objects move from one location to another. Using
- S3 events alone, it's impossible to tell whether an object that has been deleted in one place and created in another is
+ S3 events alone, it's not possible to tell whether an object that has been deleted in one place and created in another is
the same object that has been moved, or two different objects. This is because S3 only tracks `Created` or `Deleted`
events.

- To track moved objects, the filemanager has to store additional information on objects, that gets copied when the object
- is moved. The design involves using object tagging to store an identifier on all objects that is copied when the
- object is moved. This id can be used to track how object moves.
-
- When records are ingested, the filemanager first checks if the object contains the tag with the id field. If the tag is
- present, then the object has been moved, and the new record reuses that id. If not, a new id is generated and the object
- is tagged with it. Later, the database can be queried to find all record matching the id. This represents a sequence of moved
- objects.
+ To track moved objects, the filemanager stores additional information in S3 tags, that gets copied when the object
+ is moved. This allows the filemanager to track how objects move and also allows it to copy attributes when an object
+ is moved/copied.

## Tagging process

The process of tagging is:

* When an object record is ingested, the filemanager queries `Created` events for tags.
- * If the tags can be retrieved, the filemanager looks for a tag called `filemanager_id`. The key name can be
+ * If the tags can be retrieved, the filemanager looks for a key called `ingest_id`. The key name can be
configured in the environment variable `FILEMANAGER_INGESTER_TAG_NAME`.
- * The tag is parsed as a UUID, and stored in the `move_id` column of `s3_object` for that record.
+ * The tag is parsed as a UUID, and stored in the `ingest_id` column of `s3_object` for that record.
* If the tag does not exist, then a new UUID is generated, and the object is tagged on S3 by calling `PutObjectTagging`.
-   The new tag is also stored in the `move_id` column.
- * The database is also queried for any records with the same `move_id` so that attributes can be copied to the new record.
+   The new tag is also stored in the `ingest_id` column.
+ * The database is also queried for any records with the same `ingest_id` so that attributes can be copied to the new record.

This logic is enabled by default, but it can be switched off by setting `FILEMANAGER_INGESTER_TRACK_MOVES`. The filemanager
- API provides a way to query the database for records with a given `move_id`.
+ API provides a way to query the database for records with a given `ingest_id`.

## Design considerations

- Object tags on S3 are limited to 10 tags per object, where each tag can only store 258 unicode characters. This means that it
- is not possible a large amount of data or attributes in tags. Instead, filemanager stores a single UUID in the tag, which is
- linked to object records that store the attributes and data.
+ Object tags on S3 are limited to 10 tags per object, and each tag can only store 258 unicode characters. The filemanager
+ avoids storing a large amount of data by using a UUID as the value of the tag, which is linked to object records that
+ store attributes and data.

The object tagging process cannot be atomic, so there is a chance for concurrency errors to occur. Tagging can also
- fail due to database or network errors. The filemanager only tracks `move_id`s if it knows that a tag has been
- successfully created on S3, and successfully stored in the database. If tagging fails, or it's not enabled then the `move_id`
+ fail due to database or network errors. The filemanager only tracks `ingest_id`s if it knows that a tag has been
+ successfully created on S3, and successfully stored in the database. If tagging fails, or it's not enabled, then the `ingest_id`
column will be null.

+ The object tagging mechanism also doesn't differentiate between moved objects and copied objects with the same tags.
+ If an object is copied with tags, the `ingest_id` will also be copied and the above logic will apply.

## Alternative designs

Alternatively, S3 object metadata could also be used to track moves using a similar mechanism. However, metadata can
- only be updated by deleting and recreated the object, so tagging was chosen instead. Another mechanism which could track
- moved objects is to compare object checksums or etags. This works but may also be limited if checksum is not present, or
- if the etag was computed using a different part-size. Both these approaches could be used in addition to object tagging
- to provide the filemanager more ways to track moves.
+ only be updated by deleting and recreated the object. This process would be much more costly so tagging was chosen instead.
+ Another approach is to compare object checksums or etags. However, this would also be limited if the checksum is not present,
+ or if the etag was computed using a different part-size. Both these approaches could be used in addition to object tagging
+ to provide more ways to track moves.
@@ -53,7 +53,7 @@ impl Ingester {
.bind(&events.sequencers)
.bind(&events.is_delete_markers)
.bind(&events.event_types)
- .bind(&events.move_ids)
+ .bind(&events.ingest_ids)
.bind(&events.attributes)
.fetch_all(&mut *tx)
.await?;
@@ -106,7 +106,7 @@ pub(crate) mod tests {

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
- .get::<Option<Uuid>, _>("move_id")
+ .get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_created(&s3_object_results[0]);
}
@@ -155,7 +155,7 @@ pub(crate) mod tests {

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
- .get::<Option<Uuid>, _>("move_id")
+ .get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_with(
&s3_object_results[0],
@@ -194,7 +194,7 @@ pub(crate) mod tests {

assert_eq!(s3_object_results.len(), 2);
assert!(s3_object_results[0]
- .get::<Option<Uuid>, _>("move_id")
+ .get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_eq!(
1,
@@ -301,7 +301,7 @@ pub(crate) mod tests {

assert_eq!(s3_object_results.len(), 2);
assert!(s3_object_results[0]
- .get::<Option<Uuid>, _>("move_id")
+ .get::<Option<Uuid>, _>("ingest_id")
.is_some());
// Order should be different here.
assert_ingest_events(
@@ -1098,16 +1098,16 @@ pub(crate) mod tests {
*sha256 = Some(EXPECTED_SHA256.to_string());
});
};
- let update_move_ids = |move_ids: &mut Vec<Option<Uuid>>| {
-     move_ids.iter_mut().for_each(|move_id| {
-         *move_id = Some(UuidGenerator::generate());
+ let update_ingest_ids = |ingest_ids: &mut Vec<Option<Uuid>>| {
+     ingest_ids.iter_mut().for_each(|ingest_id| {
+         *ingest_id = Some(UuidGenerator::generate());
});
};

update_last_modified(&mut events.last_modified_dates);
update_storage_class(&mut events.storage_classes);
update_sha256(&mut events.sha256s);
- update_move_ids(&mut events.move_ids);
+ update_ingest_ids(&mut events.ingest_ids);

events
}
@@ -104,7 +104,7 @@ impl IngesterPaired {
.bind(&object_created.sequencers)
.bind(&object_created.is_delete_markers)
.bind(vec![Other; object_created.s3_object_ids.len()])
- .bind(&object_created.move_ids)
+ .bind(&object_created.ingest_ids)
.bind(&object_created.attributes)
.fetch_all(&mut *tx)
.await?;
@@ -126,7 +126,7 @@ impl IngesterPaired {
.bind(&object_created.sequencers)
.bind(&object_created.is_delete_markers)
.bind(vec![Other; object_created.s3_object_ids.len()])
- .bind(&object_created.move_ids)
+ .bind(&object_created.ingest_ids)
.bind(&object_created.attributes)
.fetch_all(&mut *tx)
.await?;
@@ -219,7 +219,7 @@ pub(crate) mod tests {

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
- .get::<Option<Uuid>, _>("move_id")
+ .get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_created(&s3_object_results[0]);
}
@@ -273,7 +273,7 @@ pub(crate) mod tests {

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
- .get::<Option<Uuid>, _>("move_id")
+ .get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_with(
&s3_object_results[0],
@@ -313,7 +313,7 @@ pub(crate) mod tests {

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
- .get::<Option<Uuid>, _>("move_id")
+ .get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID);
}
@@ -334,7 +334,7 @@ pub(crate) mod tests {

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
- .get::<Option<Uuid>, _>("move_id")
+ .get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_eq!(
2,
@@ -462,7 +462,7 @@ pub(crate) mod tests {

assert_eq!(s3_object_results.len(), 1);
assert!(s3_object_results[0]
- .get::<Option<Uuid>, _>("move_id")
+ .get::<Option<Uuid>, _>("ingest_id")
.is_some());
assert_eq!(2, s3_object_results[0].get::<i64, _>("number_reordered"));
assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID);
@@ -37,7 +37,7 @@ pub struct Model {
#[sea_orm(column_type = "Text", nullable)]
pub deleted_sequencer: Option<String>,
pub number_reordered: i64,
- pub move_id: Option<Uuid>,
+ pub ingest_id: Option<Uuid>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
@@ -57,7 +57,7 @@ impl Default for Config {
sqs_url: None,
paired_ingest_mode: false,
ingester_track_moves: true,
- ingester_tag_name: "filemanager_id".to_string(),
+ ingester_tag_name: "ingest_id".to_string(),
api_links_url: None,
api_presign_limit: None,
api_presign_expiry: None,