From 3388fdc33a2f5b7cd11a3ccde5309148a58ae2a3 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Thu, 3 Oct 2024 13:51:34 +1000 Subject: [PATCH] refactor(filemanager): add correct ingest function permissions and rename ingest_id tag --- .../database/migrations/0002_s3_ingest_id.sql | 2 + .../database/migrations/0002_s3_move_id.sql | 1 - .../api/select_existing_by_bucket_key.sql | 2 +- .../aws/insert_s3_created_objects.sql | 2 +- .../ingester/aws/insert_s3_objects.sql | 2 +- .../aws/update_reordered_for_created.sql | 8 +-- .../aws/update_reordered_for_deleted.sql | 2 +- .../deploy/constructs/functions/api.ts | 1 + .../deploy/constructs/functions/function.ts | 4 +- .../deploy/constructs/functions/ingest.ts | 17 +++++- .../stacks/filemanager/docs/MOVED_OBJECTS.md | 44 ++++++++------- .../filemanager/src/database/aws/ingester.rs | 18 +++---- .../src/database/aws/ingester_paired.rs | 14 ++--- .../src/database/entities/s3_object.rs | 2 +- .../stacks/filemanager/filemanager/src/env.rs | 2 +- .../filemanager/src/events/aws/collecter.rs | 54 +++++++++---------- .../filemanager/src/events/aws/inventory.rs | 2 +- .../filemanager/src/events/aws/message.rs | 2 +- .../filemanager/src/events/aws/mod.rs | 20 +++---- .../filemanager/src/queries/list.rs | 2 +- .../filemanager/src/queries/mod.rs | 18 +++---- .../filemanager/src/routes/filter/mod.rs | 4 +- 22 files changed, 118 insertions(+), 105 deletions(-) create mode 100644 lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_ingest_id.sql delete mode 100644 lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_move_id.sql diff --git a/lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_ingest_id.sql b/lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_ingest_id.sql new file mode 100644 index 000000000..b468d1987 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_ingest_id.sql @@ -0,0 +1,2 @@ +alter table s3_object add 
column ingest_id uuid; +create index ingest_id_index on s3_object (ingest_id); diff --git a/lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_move_id.sql b/lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_move_id.sql deleted file mode 100644 index 5c982b3d0..000000000 --- a/lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_move_id.sql +++ /dev/null @@ -1 +0,0 @@ -alter table s3_object add column move_id uuid; \ No newline at end of file diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql b/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql index 3818b4725..cdee98d76 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql @@ -34,7 +34,7 @@ select size, is_delete_marker, event_type, - move_id, + ingest_id, attributes, 0::bigint as "number_reordered" from input diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql index b8bfbdde4..cd02c7b33 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql @@ -13,7 +13,7 @@ insert into s3_object ( sequencer, is_delete_marker, event_type, - move_id, + ingest_id, attributes ) values ( diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql index b8bfbdde4..cd02c7b33 100644 --- 
a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql @@ -13,7 +13,7 @@ insert into s3_object ( sequencer, is_delete_marker, event_type, - move_id, + ingest_id, attributes ) values ( diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql index 793587e76..abe546c19 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql @@ -35,7 +35,7 @@ with input as ( created_sequencer, is_delete_marker, event_type, - move_id, + ingest_id, attributes ) ), @@ -56,7 +56,7 @@ current_objects as ( input.storage_class as input_storage_class, input.is_delete_marker as input_is_delete_marker, input.event_type as input_event_type, - input.move_id as input_move_id + input.ingest_id as input_ingest_id from s3_object -- Grab the relevant values to update with. join input on @@ -105,7 +105,7 @@ update as ( is_delete_marker = objects_to_update.input_is_delete_marker, storage_class = objects_to_update.input_storage_class, event_type = objects_to_update.input_event_type, - move_id = objects_to_update.input_move_id, + ingest_id = objects_to_update.input_ingest_id, number_reordered = s3_object.number_reordered + -- Note the asymmetry between this and the reorder for deleted query. case when objects_to_update.deleted_sequencer is not null or objects_to_update.sequencer is not null then @@ -133,7 +133,7 @@ select number_duplicate_events, size, is_delete_marker, - move_id, + ingest_id, attributes, -- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. 
this update detected an -- out of order created event, so return a created event back. diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql index e13e50083..6f435c01c 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql @@ -98,7 +98,7 @@ select number_duplicate_events, size, is_delete_marker, - move_id, + ingest_id, attributes, -- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an -- out of order deleted event, so return a deleted event back. diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts index 2466dd508..58ee7f088 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts @@ -20,6 +20,7 @@ export class ApiFunction extends fn.Function { // See https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/lambda-http#integration-with-api-gateway-stages // for more info. 
AWS_LAMBDA_HTTP_IGNORE_STAGE_IN_PATH: 'true', + ...props.environment, }, ...props, }); diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts index d4f502751..a186dd8b5 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts @@ -165,11 +165,11 @@ export class Function extends Construct { /** * Add policies for 's3:List*' and 's3:Get*' on the buckets to this function's role. */ - addPoliciesForBuckets(buckets: string[]) { + addPoliciesForBuckets(buckets: string[], additionalActions?: string[]) { buckets.map((bucket) => { this.addToPolicy( new PolicyStatement({ - actions: ['s3:ListBucket', 's3:GetObject'], + actions: [...['s3:ListBucket', 's3:GetObject'], ...(additionalActions ?? [])], resources: [`arn:aws:s3:::${bucket}`, `arn:aws:s3:::${bucket}/*`], }) ); diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts index 9ff5eece4..14c1f6f9b 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts @@ -3,6 +3,7 @@ import { IQueue } from 'aws-cdk-lib/aws-sqs'; import * as fn from './function'; import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; import { DatabaseProps } from './function'; +import { FILEMANAGER_SERVICE_NAME } from '../../stack'; /** * Props for controlling access to buckets. 
@@ -35,7 +36,14 @@ export type IngestFunctionProps = fn.FunctionPropsConfigurable & DatabaseProps & */ export class IngestFunction extends fn.Function { constructor(scope: Construct, id: string, props: IngestFunctionProps) { - super(scope, id, { package: 'filemanager-ingest-lambda', ...props }); + super(scope, id, { + package: 'filemanager-ingest-lambda', + environment: { + FILEMANAGER_INGESTER_TAG_NAME: 'umccr-org:OrcaBusFileManagerIngestId', + ...props.environment, + }, + ...props, + }); this.addAwsManagedPolicy('service-role/AWSLambdaSQSQueueExecutionRole'); @@ -43,6 +51,11 @@ export class IngestFunction extends fn.Function { const eventSource = new SqsEventSource(source); this.function.addEventSource(eventSource); }); - this.addPoliciesForBuckets(props.buckets); + this.addPoliciesForBuckets(props.buckets, [ + 's3:GetObjectTagging', + 's3:GetObjectVersionTagging', + 's3:PutObjectTagging', + 's3:PutObjectVersionTagging', + ]); } } diff --git a/lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md b/lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md index ea4375fa9..4de4ebc13 100644 --- a/lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md +++ b/lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md @@ -1,49 +1,47 @@ # Tracking moved objects The filemanager tracks records from S3 events, which do not describe how objects move from one location to another. Using -S3 events alone, it's impossible to tell whether an object that has been deleted in one place and created in another is +S3 events alone, it's not possible to tell whether an object that has been deleted in one place and created in another is the same object that has been moved, or two different objects. This is because S3 only tracks `Created` or `Deleted` events. -To track moved objects, the filemanager has to store additional information on objects, that gets copied when the object -is moved. 
The design involves using object tagging to store an identifier on all objects that is copied when the -object is moved. This id can be used to track how object moves. - -When records are ingested, the filemanager first checks if the object contains the tag with the id field. If the tag is -present, then the object has been moved, and the new record reuses that id. If not, a new id is generated and the object -is tagged with it. Later, the database can be queried to find all record matching the id. This represents a sequence of moved -objects. +To track moved objects, the filemanager stores additional information in S3 tags, that gets copied when the object +is moved. This allows the filemanager to track how objects move and also allows it to copy attributes when an object +is moved/copied. ## Tagging process The process of tagging is: * When an object record is ingested, the filemanager queries `Created` events for tags. -* If the tags can be retrieved, the filemanager looks for a tag called `filemanager_id`. The key name can be +* If the tags can be retrieved, the filemanager looks for a key called `ingest_id`. The key name can be configured in the environment variable `FILEMANAGER_INGESTER_TAG_NAME`. -* The tag is parsed as a UUID, and stored in the `move_id` column of `s3_object` for that record. +* The tag is parsed as a UUID, and stored in the `ingest_id` column of `s3_object` for that record. * If the tag does not exist, then a new UUID is generated, and the object is tagged on S3 by calling `PutObjectTagging`. - The new tag is also stored in the `move_id` column. -* The database is also queried for any records with the same `move_id` so that attributes can be copied to the new record. + The new tag is also stored in the `ingest_id` column. +* The database is also queried for any records with the same `ingest_id` so that attributes can be copied to the new record. 
This logic is enabled by default, but it can be switched off by setting `FILEMANAGER_INGESTER_TRACK_MOVES`. The filemanager -API provides a way to query the database for records with a given `move_id`. +API provides a way to query the database for records with a given `ingest_id`. ## Design considerations -Object tags on S3 are limited to 10 tags per object, where each tag can only store 258 unicode characters. This means that it -is not possible a large amount of data or attributes in tags. Instead, filemanager stores a single UUID in the tag, which is -linked to object records that store the attributes and data. +Object tags on S3 are limited to 10 tags per object, and each tag is limited to a 128 unicode character key and a 256 unicode character value. The filemanager +avoids storing a large amount of data by using a UUID as the value of the tag, which is linked to object records that +store attributes and data. The object tagging process cannot be atomic, so there is a chance for concurrency errors to occur. Tagging can also -fail due to database or network errors. The filemanager only tracks `move_id`s if it knows that a tag has been -successfully created on S3, and successfully stored in the database. If tagging fails, or it's not enabled then the `move_id` +fail due to database or network errors. The filemanager only tracks `ingest_id`s if it knows that a tag has been +successfully created on S3, and successfully stored in the database. If tagging fails, or it's not enabled, then the `ingest_id` column will be null. +The object tagging mechanism also doesn't differentiate between moved objects and copied objects with the same tags. +If an object is copied with tags, the `ingest_id` will also be copied and the above logic will apply. + ## Alternative designs Alternatively, S3 object metadata could also be used to track moves using a similar mechanism. However, metadata can 
Another mechanism which could track -moved objects is to compare object checksums or etags. This works but may also be limited if checksum is not present, or -if the etag was computed using a different part-size. Both these approaches could be used in addition to object tagging -to provide the filemanager more ways to track moves. +only be updated by deleting and recreated the object. This process would be much more costly so tagging was chosen instead. +Another approach is to compare object checksums or etags. However, this would also be limited if the checksum is not present, +or if the etag was computed using a different part-size. Both these approaches could be used in addition to object tagging +to provide more ways to track moves. diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs index 7833c2a72..b11a16a9e 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs @@ -53,7 +53,7 @@ impl Ingester { .bind(&events.sequencers) .bind(&events.is_delete_markers) .bind(&events.event_types) - .bind(&events.move_ids) + .bind(&events.ingest_ids) .bind(&events.attributes) .fetch_all(&mut *tx) .await?; @@ -106,7 +106,7 @@ pub(crate) mod tests { assert_eq!(s3_object_results.len(), 1); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); assert_created(&s3_object_results[0]); } @@ -155,7 +155,7 @@ pub(crate) mod tests { assert_eq!(s3_object_results.len(), 1); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); assert_with( &s3_object_results[0], @@ -194,7 +194,7 @@ pub(crate) mod tests { assert_eq!(s3_object_results.len(), 2); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); assert_eq!( 1, @@ -301,7 
+301,7 @@ pub(crate) mod tests { assert_eq!(s3_object_results.len(), 2); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); // Order should be different here. assert_ingest_events( @@ -1098,16 +1098,16 @@ pub(crate) mod tests { *sha256 = Some(EXPECTED_SHA256.to_string()); }); }; - let update_move_ids = |move_ids: &mut Vec>| { - move_ids.iter_mut().for_each(|move_id| { - *move_id = Some(UuidGenerator::generate()); + let update_ingest_ids = |ingest_ids: &mut Vec>| { + ingest_ids.iter_mut().for_each(|ingest_id| { + *ingest_id = Some(UuidGenerator::generate()); }); }; update_last_modified(&mut events.last_modified_dates); update_storage_class(&mut events.storage_classes); update_sha256(&mut events.sha256s); - update_move_ids(&mut events.move_ids); + update_ingest_ids(&mut events.ingest_ids); events } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs index 648bafedf..eb5caf786 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs @@ -104,7 +104,7 @@ impl IngesterPaired { .bind(&object_created.sequencers) .bind(&object_created.is_delete_markers) .bind(vec![Other; object_created.s3_object_ids.len()]) - .bind(&object_created.move_ids) + .bind(&object_created.ingest_ids) .bind(&object_created.attributes) .fetch_all(&mut *tx) .await?; @@ -126,7 +126,7 @@ impl IngesterPaired { .bind(&object_created.sequencers) .bind(&object_created.is_delete_markers) .bind(vec![Other; object_created.s3_object_ids.len()]) - .bind(&object_created.move_ids) + .bind(&object_created.ingest_ids) .bind(&object_created.attributes) .fetch_all(&mut *tx) .await?; @@ -219,7 +219,7 @@ pub(crate) mod tests { assert_eq!(s3_object_results.len(), 1); assert!(s3_object_results[0] - 
.get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); assert_created(&s3_object_results[0]); } @@ -273,7 +273,7 @@ pub(crate) mod tests { assert_eq!(s3_object_results.len(), 1); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); assert_with( &s3_object_results[0], @@ -313,7 +313,7 @@ pub(crate) mod tests { assert_eq!(s3_object_results.len(), 1); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } @@ -334,7 +334,7 @@ pub(crate) mod tests { assert_eq!(s3_object_results.len(), 1); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); assert_eq!( 2, @@ -462,7 +462,7 @@ pub(crate) mod tests { assert_eq!(s3_object_results.len(), 1); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); assert_eq!(2, s3_object_results[0].get::("number_reordered")); assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/entities/s3_object.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/entities/s3_object.rs index d3a9865f9..c978c2cc1 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/entities/s3_object.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/entities/s3_object.rs @@ -37,7 +37,7 @@ pub struct Model { #[sea_orm(column_type = "Text", nullable)] pub deleted_sequencer: Option, pub number_reordered: i64, - pub move_id: Option, + pub ingest_id: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation {} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs index 6223ec301..4363710f0 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs +++ 
b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs @@ -57,7 +57,7 @@ impl Default for Config { sqs_url: None, paired_ingest_mode: false, ingester_track_moves: true, - ingester_tag_name: "filemanager_id".to_string(), + ingester_tag_name: "ingest_id".to_string(), api_links_url: None, api_presign_limit: None, api_presign_expiry: None, diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs index 29dc0cb70..e1abc169f 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs @@ -253,7 +253,7 @@ impl<'a> Collecter<'a> { let GetObjectTaggingOutput { mut tag_set, .. } = tagging; - // Check if the object contains the move_id tag. + // Check if the object contains the ingest_id tag. let tag = tag_set .clone() .into_iter() @@ -261,10 +261,10 @@ impl<'a> Collecter<'a> { let Some(tag) = tag else { // If it doesn't, then a new tag needs to be generated. - let move_id = UuidGenerator::generate(); + let ingest_id = UuidGenerator::generate(); let tag = Tag::builder() .key(config.ingester_tag_name()) - .value(move_id) + .value(ingest_id) .build()?; tag_set.push(tag); @@ -280,29 +280,29 @@ impl<'a> Collecter<'a> { warn!("Error received from PutObjectTagging: {}", err); }); - // Only add a move_id to the new record if the tagging was successful. + // Only add an ingest_id to the new record if the tagging was successful. return if result.is_ok() { - Ok(event.with_move_id(Some(move_id))) + Ok(event.with_ingest_id(Some(ingest_id))) } else { Ok(event) }; }; - // The object has a move_id tag. Grab the existing the tag, returning a new record without - // the move_id if the is not valid. 
- let move_id = Uuid::from_str(tag.value()).inspect_err(|err| { - warn!("Failed to parse move_id from tag: {}", err); + // The object has an ingest_id tag. Grab the existing tag, returning a new record without + // the ingest_id if the tag is not valid. + let ingest_id = Uuid::from_str(tag.value()).inspect_err(|err| { + warn!("Failed to parse ingest_id from tag: {}", err); }); - let Ok(move_id) = move_id else { + let Ok(ingest_id) = ingest_id else { return Ok(event); }; // From here, the new record must be a valid, moved object. - let event = event.with_move_id(Some(move_id)); + let event = event.with_ingest_id(Some(ingest_id)); // Get the attributes from the old record to update the new record with. let filter = S3ObjectsFilter { - move_id: Some(move_id), + ingest_id: Some(ingest_id), ..Default::default() }; let moved_object = ListQueryBuilder::new(database_client.connection_ref()) @@ -317,7 +317,7 @@ impl<'a> Collecter<'a> { if let Some(moved_object) = moved_object { Ok(event.with_attributes(moved_object.attributes)) } else { - warn!("Object with move_id {} not found in database", move_id); + warn!("Object with ingest_id {} not found in database", ingest_id); Ok(event) } } @@ -564,14 +564,14 @@ pub(crate) mod tests { let EventSourceType::S3(events) = &mut result.event_type else { panic!(); }; - assert!(events.move_ids[0].is_some()); + assert!(events.ingest_ids[0].is_some()); client.ingest(result.event_type).await.unwrap(); let s3_object_results = s3_object_results(&pool).await; assert_eq!(s3_object_results.len(), 1); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_some()); } @@ -581,9 +581,9 @@ pub(crate) mod tests { let client = Client::from_pool(pool.clone()); let mut collecter = test_collecter(&config, &client).await; - let move_id = UuidGenerator::generate(); + let ingest_id = UuidGenerator::generate(); EntriesBuilder::default() - .with_move_id(move_id) + .with_ingest_id(ingest_id) .with_n(1) .build(&client) .await; @@ -600,8 
+600,8 @@ pub(crate) mod tests { vec![move || { Ok(GetObjectTaggingOutput::builder() .set_tag_set(Some(vec![Tag::builder() - .key("filemanager_id") - .value(move_id.to_string()) + .key("ingest_id") + .value(ingest_id.to_string()) .build() .unwrap()])) .build() @@ -613,19 +613,19 @@ pub(crate) mod tests { let EventSourceType::S3(events) = &mut result.event_type else { panic!(); }; - assert!(events.move_ids[0].is_some()); + assert!(events.ingest_ids[0].is_some()); client.ingest(result.event_type).await.unwrap(); let s3_object_results = s3_object_results(&pool).await; assert_eq!(s3_object_results.len(), 2); assert_eq!( - s3_object_results[0].get::, _>("move_id"), - Some(move_id) + s3_object_results[0].get::, _>("ingest_id"), + Some(ingest_id) ); assert_eq!( - s3_object_results[1].get::, _>("move_id"), - Some(move_id) + s3_object_results[1].get::, _>("ingest_id"), + Some(ingest_id) ); let expected_attributes = json!({ @@ -673,14 +673,14 @@ pub(crate) mod tests { let EventSourceType::S3(events) = &mut result.event_type else { panic!(); }; - assert!(events.move_ids[0].is_none()); + assert!(events.ingest_ids[0].is_none()); client.ingest(result.event_type).await.unwrap(); let s3_object_results = s3_object_results(&pool).await; assert_eq!(s3_object_results.len(), 1); assert!(s3_object_results[0] - .get::, _>("move_id") + .get::, _>("ingest_id") .is_none()); } @@ -761,7 +761,7 @@ pub(crate) mod tests { .with( eq("key"), eq("bucket"), - function(|t: &Tagging| t.tag_set().first().unwrap().key == "filemanager_id"), + function(|t: &Tagging| t.tag_set().first().unwrap().key == "ingest_id"), ) .times(put_tagging_expectations.len()); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs index 584468ae5..8d2aee699 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs +++ 
b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs @@ -528,7 +528,7 @@ impl From for FlatS3EventMessage { // Anything in an inventory report is always a created event. event_type: Created, is_delete_marker: is_delete_marker.unwrap_or_default(), - move_id: None, + ingest_id: None, attributes: None, number_duplicate_events: 0, number_reordered: 0, diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs index 966821aee..771dd1bc0 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs @@ -191,7 +191,7 @@ impl From for FlatS3EventMessage { sha256: None, event_type, is_delete_marker, - move_id: None, + ingest_id: None, attributes: None, number_duplicate_events: 0, number_reordered: 0, diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs index 268349d96..4a6595cad 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs @@ -73,7 +73,7 @@ pub struct TransposedS3EventMessages { pub last_modified_dates: Vec>>, pub event_types: Vec, pub is_delete_markers: Vec, - pub move_ids: Vec>, + pub ingest_ids: Vec>, pub attributes: Vec>, } @@ -95,7 +95,7 @@ impl TransposedS3EventMessages { last_modified_dates: Vec::with_capacity(capacity), event_types: Vec::with_capacity(capacity), is_delete_markers: Vec::with_capacity(capacity), - move_ids: Vec::with_capacity(capacity), + ingest_ids: Vec::with_capacity(capacity), attributes: Vec::with_capacity(capacity), } } @@ -116,7 +116,7 @@ impl TransposedS3EventMessages { last_modified_date, event_type, is_delete_marker, - move_id, + ingest_id, 
attributes, .. } = message; @@ -134,7 +134,7 @@ impl TransposedS3EventMessages { self.last_modified_dates.push(last_modified_date); self.event_types.push(event_type); self.is_delete_markers.push(is_delete_marker); - self.move_ids.push(move_id); + self.ingest_ids.push(ingest_id); self.attributes.push(attributes); } } @@ -173,7 +173,7 @@ impl From for FlatS3EventMessages { messages.last_modified_dates, messages.event_types, messages.is_delete_markers, - messages.move_ids, + messages.ingest_ids, messages.attributes, ) .map( @@ -191,7 +191,7 @@ impl From for FlatS3EventMessages { last_modified_date, event_type, is_delete_marker, - move_id, + ingest_id, attributes, )| { FlatS3EventMessage { @@ -208,7 +208,7 @@ impl From for FlatS3EventMessages { event_time, event_type, is_delete_marker, - move_id, + ingest_id, attributes, number_duplicate_events: 0, number_reordered: 0, @@ -442,7 +442,7 @@ pub struct FlatS3EventMessage { pub event_time: Option>, pub event_type: EventType, pub is_delete_marker: bool, - pub move_id: Option, + pub ingest_id: Option, pub attributes: Option, pub number_duplicate_events: i64, pub number_reordered: i64, @@ -577,8 +577,8 @@ impl FlatS3EventMessage { } /// Set the move id. 
- pub fn with_move_id(mut self, move_id: Option) -> Self { - self.move_id = move_id; + pub fn with_ingest_id(mut self, ingest_id: Option) -> Self { + self.ingest_id = ingest_id; self } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs index 7b2d93878..4e05f03a7 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs @@ -150,7 +150,7 @@ where .is_delete_marker .map(|v| s3_object::Column::IsDeleteMarker.eq(v)), ) - .add_option(filter.move_id.map(|v| s3_object::Column::MoveId.eq(v))); + .add_option(filter.ingest_id.map(|v| s3_object::Column::IngestId.eq(v))); if let Some(attributes) = filter.attributes { let json_conditions = Self::json_conditions( diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs index 0244a0531..a5749a739 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs @@ -41,12 +41,12 @@ impl Entries { shuffle: bool, bucket_divisor: usize, key_divisor: usize, - move_id: Option, + ingest_id: Option, ) -> Vec { let mut output = vec![]; let mut entries: Vec<_> = (0..n) - .map(|index| Self::generate_entry(index, bucket_divisor, key_divisor, move_id)) + .map(|index| Self::generate_entry(index, bucket_divisor, key_divisor, ingest_id)) .collect(); if shuffle { @@ -71,7 +71,7 @@ impl Entries { index: usize, bucket_divisor: usize, key_divisor: usize, - move_id: Option, + ingest_id: Option, ) -> ActiveS3Object { let event = Self::event_type(index); let date = || Set(Some(DateTime::default().add(Days::new(index as u64)))); @@ -84,7 +84,7 @@ impl Entries { ActiveS3Object { s3_object_id: Set(UuidGenerator::generate()), - move_id: 
Set(Some(move_id.unwrap_or_else(UuidGenerator::generate))), + ingest_id: Set(Some(ingest_id.unwrap_or_else(UuidGenerator::generate))), event_type: Set(event.clone()), bucket: Set((index / bucket_divisor).to_string()), key: Set((index / key_divisor).to_string()), @@ -129,7 +129,7 @@ pub struct EntriesBuilder { bucket_divisor: usize, key_divisor: usize, shuffle: bool, - move_id: Option, + ingest_id: Option, } impl EntriesBuilder { @@ -158,8 +158,8 @@ impl EntriesBuilder { } /// Set whether to shuffle. - pub fn with_move_id(mut self, move_id: Uuid) -> Self { - self.move_id = Some(move_id); + pub fn with_ingest_id(mut self, ingest_id: Uuid) -> Self { + self.ingest_id = Some(ingest_id); self } @@ -171,7 +171,7 @@ impl EntriesBuilder { self.shuffle, self.bucket_divisor, self.key_divisor, - self.move_id, + self.ingest_id, ) .await; @@ -189,7 +189,7 @@ impl Default for EntriesBuilder { bucket_divisor: 2, key_divisor: 1, shuffle: false, - move_id: None, + ingest_id: None, } } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs index e3782d345..af96c854b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs @@ -68,8 +68,8 @@ pub struct S3ObjectsFilter { /// Query by the object delete marker. pub(crate) is_delete_marker: Option, #[param(required = false)] - /// Query by the object delete marker. - pub(crate) move_id: Option, + /// Query by the ingest id that objects get tagged with. + pub(crate) ingest_id: Option, #[param(required = false)] /// Query by JSON attributes. Supports nested syntax to access inner /// fields, e.g. `attributes[attribute_id]=...`. This only deserializes