diff --git a/lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_ingest_id.sql b/lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_ingest_id.sql new file mode 100644 index 000000000..b468d1987 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/database/migrations/0002_s3_ingest_id.sql @@ -0,0 +1,2 @@ +alter table s3_object add column ingest_id uuid; +create index ingest_id_index on s3_object (ingest_id); diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql b/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql index 16e5909a7..cdee98d76 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql @@ -34,6 +34,8 @@ select size, is_delete_marker, event_type, + ingest_id, + attributes, 0::bigint as "number_reordered" from input -- Grab the most recent object in each input group. diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql index a182e1bef..cd02c7b33 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql @@ -12,7 +12,9 @@ insert into s3_object ( version_id, sequencer, is_delete_marker, - event_type + event_type, + ingest_id, + attributes ) values ( unnest($1::uuid[]), @@ -27,7 +29,9 @@ values ( unnest($10::text[]), unnest($11::text[]), unnest($12::boolean[]), - unnest($13::event_type[]) + unnest($13::event_type[]), + unnest($14::uuid[]), + unnest($15::jsonb[]) ) on conflict on constraint sequencer_unique do update set number_duplicate_events = s3_object.number_duplicate_events + 1 returning s3_object_id, number_duplicate_events; diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql index a182e1bef..cd02c7b33 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql @@ -12,7 +12,9 @@ insert into s3_object ( version_id, sequencer, is_delete_marker, - event_type + event_type, + ingest_id, + attributes ) values ( unnest($1::uuid[]), @@ -27,7 +29,9 @@ values ( unnest($10::text[]), unnest($11::text[]), unnest($12::boolean[]), - unnest($13::event_type[]) + unnest($13::event_type[]), + unnest($14::uuid[]), + unnest($15::jsonb[]) ) on conflict on constraint sequencer_unique do update set number_duplicate_events = s3_object.number_duplicate_events + 1 returning s3_object_id, number_duplicate_events; diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql index f136c4731..abe546c19 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql @@ -18,7 
+18,9 @@ with input as ( $10::text[], $11::text[], $12::boolean[], - $13::event_type[] + $13::event_type[], + $14::uuid[], + $15::jsonb[] ) as input ( s3_object_id, bucket, @@ -32,7 +34,9 @@ with input as ( version_id, created_sequencer, is_delete_marker, - event_type + event_type, + ingest_id, + attributes ) ), -- Then, select the objects that need to be updated. @@ -51,7 +55,8 @@ current_objects as ( input.e_tag as input_e_tag, input.storage_class as input_storage_class, input.is_delete_marker as input_is_delete_marker, - input.event_type as input_event_type + input.event_type as input_event_type, + input.ingest_id as input_ingest_id from s3_object -- Grab the relevant values to update with. join input on @@ -100,6 +105,7 @@ update as ( is_delete_marker = objects_to_update.input_is_delete_marker, storage_class = objects_to_update.input_storage_class, event_type = objects_to_update.input_event_type, + ingest_id = objects_to_update.input_ingest_id, number_reordered = s3_object.number_reordered + -- Note the asymmetry between this and the reorder for deleted query. case when objects_to_update.deleted_sequencer is not null or objects_to_update.sequencer is not null then @@ -127,6 +133,8 @@ select number_duplicate_events, size, is_delete_marker, + ingest_id, + attributes, -- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an -- out of order created event, so return a created event back. 'Created'::event_type as "event_type" diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql index 41f720502..6f435c01c 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql @@ -98,6 +98,8 @@ select number_duplicate_events, size, is_delete_marker, + ingest_id, + attributes, -- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an -- out of order deleted event, so return a deleted event back. 'Deleted'::event_type as "event_type" diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts index 2466dd508..58ee7f088 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts @@ -20,6 +20,7 @@ export class ApiFunction extends fn.Function { // See https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/lambda-http#integration-with-api-gateway-stages // for more info. AWS_LAMBDA_HTTP_IGNORE_STAGE_IN_PATH: 'true', + ...props.environment, }, ...props, }); diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts index d4f502751..a186dd8b5 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts @@ -165,11 +165,11 @@ export class Function extends Construct { /** * Add policies for 's3:List*' and 's3:Get*' on the buckets to this function's role. 
*/ - addPoliciesForBuckets(buckets: string[]) { + addPoliciesForBuckets(buckets: string[], additionalActions?: string[]) { buckets.map((bucket) => { this.addToPolicy( new PolicyStatement({ - actions: ['s3:ListBucket', 's3:GetObject'], + actions: ['s3:ListBucket', 's3:GetObject', ...(additionalActions ?? [])], resources: [`arn:aws:s3:::${bucket}`, `arn:aws:s3:::${bucket}/*`], }) ); diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts index 9ff5eece4..14c1f6f9b 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts @@ -3,6 +3,7 @@ import { IQueue } from 'aws-cdk-lib/aws-sqs'; import * as fn from './function'; import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; import { DatabaseProps } from './function'; +import { FILEMANAGER_SERVICE_NAME } from '../../stack'; /** * Props for controlling access to buckets. */ @@ -35,7 +36,14 @@ export type IngestFunctionProps = fn.FunctionPropsConfigurable & DatabaseProps & */ export class IngestFunction extends fn.Function { constructor(scope: Construct, id: string, props: IngestFunctionProps) { - super(scope, id, { package: 'filemanager-ingest-lambda', ...props }); + super(scope, id, { + package: 'filemanager-ingest-lambda', + environment: { + FILEMANAGER_INGESTER_TAG_NAME: 'umccr-org:OrcaBusFileManagerIngestId', + ...props.environment, + }, + ...props, + }); this.addAwsManagedPolicy('service-role/AWSLambdaSQSQueueExecutionRole'); @@ -43,6 +51,11 @@ export class IngestFunction extends fn.Function { const eventSource = new SqsEventSource(source); this.function.addEventSource(eventSource); }); - this.addPoliciesForBuckets(props.buckets); + this.addPoliciesForBuckets(props.buckets, [ + 's3:GetObjectTagging', + 's3:GetObjectVersionTagging', + 's3:PutObjectTagging', + 's3:PutObjectVersionTagging', + ]); } } diff --git a/lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md b/lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md new file mode 100644 index 000000000..3a656fcf3 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/docs/MOVED_OBJECTS.md @@ -0,0 +1,58 @@ +# Tracking moved objects + +The filemanager tracks records from S3 events, which do not describe how objects move from one location to another. Using +S3 events alone, it's not possible to tell whether an object that has been deleted in one place and created in another is +the same object that has been moved, or two different objects. This is because S3 only emits `Created` and `Deleted` +events. + +To track moved objects, the filemanager stores additional information in S3 tags. The tag is created when the object is +first ingested, and it travels with the object when it is moved. This allows the filemanager to track how objects move and also allows it to copy attributes when an object +is moved/copied. This process is described [below](#tagging-process). + +## Tagging process + +The process of tagging is: + +* When an object record is ingested, the filemanager queries S3 for the object's tags on `Created` events. +* If the tags can be retrieved, the filemanager looks for a key called `ingest_id`. The key name can be + configured via the environment variable `FILEMANAGER_INGESTER_TAG_NAME`. +* The tag is parsed as a UUID, and stored in the `ingest_id` column of `s3_object` for that record. 
+* If the tag does not exist, then a new UUID is generated, and the object is tagged on S3 by calling `PutObjectTagging`. + The new tag is also stored in the `ingest_id` column. +* The database is also queried for any records with the same `ingest_id` so that attributes can be copied to the new record. + +This logic is enabled by default, but it can be switched off by setting `FILEMANAGER_INGESTER_TRACK_MOVES` to `false`. The filemanager +API provides a way to query the database for records with a given `ingest_id`. A sketch of this flow is shown in the [example](#example) below. + +## Design considerations + +Object tags on S3 are [limited][s3-tagging] to 10 tags per object, and each tag value can only store 256 Unicode characters. +The filemanager avoids storing a large amount of data by using a UUID as the value of the tag, which is linked to object +records that store attributes and data. + +The object tagging process cannot be atomic, so there is a chance for concurrency errors to occur. Tagging can also +fail due to database or network errors. The filemanager only tracks `ingest_id`s if it knows that a tag has been +successfully created on S3, and successfully stored in the database. If tagging fails, or it's not enabled, then the `ingest_id` +column will be null. + +The act of tagging the object is what allows it to be tracked, so ideally this is done as soon as possible. Currently, this happens +at ingestion; however, this could have performance implications. Alternative approaches should consider asynchronous tagging. +For example, [`s3:ObjectTagging:*`][s3-tagging-event] events could be used for this purpose. + +The object tagging mechanism also doesn't differentiate between moved objects and copied objects with the same tags. +If an object is copied with tags, the `ingest_id` will also be copied and the above logic will apply. + +## Alternative designs + +Alternatively, S3 object metadata could also be used to track moves using a similar mechanism. However, metadata can +[only be updated][s3-metadata] by deleting and recreating the object. This process would be much more costly, so tagging +was chosen instead. + +Another approach is to compare object checksums or etags. However, this would also be limited if the checksum is not present, +or if the etag was computed using a different part size. It also fails if the checksum is not expected to be the same, +for example, when tracking a set of files that are compressed. Both of these approaches could be used in addition to object tagging +to provide more ways to track moves. 
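+
+## Example
+
+The following is a minimal sketch of the tagging flow described above, using the AWS Rust SDK directly rather than
+the filemanager's client wrapper and ingester. `resolve_ingest_id` and its parameters are illustrative names only
+(not part of the filemanager API), and the `uuid` crate's `v4` feature is assumed:
+
+```rust
+use std::str::FromStr;
+
+use aws_sdk_s3::types::{Tag, Tagging};
+use aws_sdk_s3::Client;
+use uuid::Uuid;
+
+/// Fetch the ingest id tag from an object, creating and storing a new one if it is missing.
+/// Returns `None` if any S3 call fails, in which case the `ingest_id` column stays null.
+async fn resolve_ingest_id(client: &Client, bucket: &str, key: &str, tag_name: &str) -> Option<Uuid> {
+    // Read the current tag set, e.g. when processing a `Created` event.
+    let tagging = client
+        .get_object_tagging()
+        .bucket(bucket)
+        .key(key)
+        .send()
+        .await
+        .ok()?;
+    let mut tag_set = tagging.tag_set().to_vec();
+
+    // If the object already carries the tag, it has been moved or copied: reuse the existing id.
+    if let Some(tag) = tag_set.iter().find(|tag| tag.key() == tag_name) {
+        return Uuid::from_str(tag.value()).ok();
+    }
+
+    // Otherwise generate a fresh id and write it back with `PutObjectTagging`.
+    let ingest_id = Uuid::new_v4();
+    let tag = Tag::builder()
+        .key(tag_name)
+        .value(ingest_id.to_string())
+        .build()
+        .ok()?;
+    tag_set.push(tag);
+
+    client
+        .put_object_tagging()
+        .bucket(bucket)
+        .key(key)
+        .tagging(Tagging::builder().set_tag_set(Some(tag_set)).build().ok()?)
+        .send()
+        .await
+        .ok()
+        .map(|_| ingest_id)
+}
+```
+
+A failure from `GetObjectTagging` or `PutObjectTagging` simply yields no id here, mirroring the design
+considerations above: the event is still ingested, just with a null `ingest_id`.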
+ +[s3-tagging]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html +[s3-tagging-event]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html#supported-notification-event-types +[s3-metadata]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/clients/aws/s3.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/clients/aws/s3.rs index 30a26b833..6e1ff8f05 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/clients/aws/s3.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/clients/aws/s3.rs @@ -6,10 +6,13 @@ use std::result; use aws_sdk_s3 as s3; use aws_sdk_s3::error::SdkError; use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput}; +use aws_sdk_s3::operation::get_object_tagging::{GetObjectTaggingError, GetObjectTaggingOutput}; use aws_sdk_s3::operation::head_object::{HeadObjectError, HeadObjectOutput}; use aws_sdk_s3::operation::list_buckets::{ListBucketsError, ListBucketsOutput}; +use aws_sdk_s3::operation::put_object_tagging::{PutObjectTaggingError, PutObjectTaggingOutput}; use aws_sdk_s3::presigning::{PresignedRequest, PresigningConfig}; use aws_sdk_s3::types::ChecksumMode::Enabled; +use aws_sdk_s3::types::Tagging; use chrono::Duration; use mockall::automock; @@ -70,6 +73,36 @@ impl Client { .await } + /// Execute the `GetObjectTagging` operation. + pub async fn get_object_tagging( + &self, + key: &str, + bucket: &str, + ) -> Result { + self.inner + .get_object_tagging() + .key(key) + .bucket(bucket) + .send() + .await + } + + /// Execute the `PutObjectTagging` operation. + pub async fn put_object_tagging( + &self, + key: &str, + bucket: &str, + tagging: Tagging, + ) -> Result { + self.inner + .put_object_tagging() + .key(key) + .bucket(bucket) + .tagging(tagging) + .send() + .await + } + /// Execute the `GetObject` operation and generate a presigned url for the object. 
pub async fn presign_url( &self, diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs index ed784d1d0..b11a16a9e 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs @@ -53,6 +53,8 @@ impl Ingester { .bind(&events.sequencers) .bind(&events.is_delete_markers) .bind(&events.event_types) + .bind(&events.ingest_ids) + .bind(&events.attributes) .fetch_all(&mut *tx) .await?; @@ -74,6 +76,7 @@ pub(crate) mod tests { use sqlx::postgres::PgRow; use sqlx::{Executor, PgPool, Row}; use tokio::time::Instant; + use uuid::Uuid; use crate::database::aws::migration::tests::MIGRATOR; use crate::database::{Client, Ingest}; @@ -90,6 +93,7 @@ pub(crate) mod tests { }; use crate::events::EventSourceType; use crate::events::EventSourceType::S3; + use crate::uuid::UuidGenerator; #[sqlx::test(migrator = "MIGRATOR")] async fn ingest_object_created(pool: PgPool) { @@ -101,6 +105,9 @@ pub(crate) mod tests { let s3_object_results = fetch_results(&ingester).await; assert_eq!(s3_object_results.len(), 1); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); assert_created(&s3_object_results[0]); } @@ -147,6 +154,9 @@ pub(crate) mod tests { let s3_object_results = fetch_results(&ingester).await; assert_eq!(s3_object_results.len(), 1); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); assert_with( &s3_object_results[0], Some(i64::MAX), @@ -183,6 +193,9 @@ pub(crate) mod tests { let s3_object_results = fetch_results(&ingester).await; assert_eq!(s3_object_results.len(), 2); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); assert_eq!( 1, s3_object_results[0].get::("number_duplicate_events") @@ -287,6 +300,9 @@ pub(crate) mod tests { let s3_object_results = fetch_results(&ingester).await; assert_eq!(s3_object_results.len(), 2); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); // Order should be different here. 
assert_ingest_events( &s3_object_results[1], @@ -1064,7 +1080,9 @@ pub(crate) mod tests { assert_row(s3_object_results, message, sequencer, event_time); } - fn update_test_events(mut events: TransposedS3EventMessages) -> TransposedS3EventMessages { + pub(crate) fn update_test_events( + mut events: TransposedS3EventMessages, + ) -> TransposedS3EventMessages { let update_last_modified = |dates: &mut Vec>>| { dates.iter_mut().for_each(|last_modified| { *last_modified = Some(DateTime::default()); @@ -1080,10 +1098,16 @@ pub(crate) mod tests { *sha256 = Some(EXPECTED_SHA256.to_string()); }); }; + let update_ingest_ids = |ingest_ids: &mut Vec>| { + ingest_ids.iter_mut().for_each(|ingest_id| { + *ingest_id = Some(UuidGenerator::generate()); + }); + }; update_last_modified(&mut events.last_modified_dates); update_storage_class(&mut events.storage_classes); update_sha256(&mut events.sha256s); + update_ingest_ids(&mut events.ingest_ids); events } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs index aaab78780..eb5caf786 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs @@ -104,6 +104,8 @@ impl IngesterPaired { .bind(&object_created.sequencers) .bind(&object_created.is_delete_markers) .bind(vec![Other; object_created.s3_object_ids.len()]) + .bind(&object_created.ingest_ids) + .bind(&object_created.attributes) .fetch_all(&mut *tx) .await?; @@ -124,6 +126,8 @@ impl IngesterPaired { .bind(&object_created.sequencers) .bind(&object_created.is_delete_markers) .bind(vec![Other; object_created.s3_object_ids.len()]) + .bind(&object_created.ingest_ids) + .bind(&object_created.attributes) .fetch_all(&mut *tx) .await?; @@ -179,13 +183,8 @@ impl IngesterPaired { pub(crate) mod tests { use std::ops::Add; - use chrono::{DateTime, Utc}; - use itertools::Itertools; - use sqlx::postgres::PgRow; - use sqlx::{Executor, PgPool, Row}; - use tokio::time::Instant; - use crate::database::aws::ingester::tests::fetch_results; + use crate::database::aws::ingester::tests::update_test_events; use crate::database::aws::migration::tests::MIGRATOR; use crate::database::{Client, Ingest}; use crate::events::aws::message::default_version_id; @@ -199,9 +198,15 @@ pub(crate) mod tests { EXPECTED_SEQUENCER_DELETED_ONE, EXPECTED_SEQUENCER_DELETED_TWO, EXPECTED_SHA256, EXPECTED_VERSION_ID, }; - use crate::events::aws::{Events, FlatS3EventMessage, FlatS3EventMessages, StorageClass}; + use crate::events::aws::{Events, FlatS3EventMessage, FlatS3EventMessages}; use crate::events::EventSourceType; use crate::events::EventSourceType::S3Paired; + use chrono::{DateTime, Utc}; + use itertools::Itertools; + use sqlx::postgres::PgRow; + use sqlx::{Executor, PgPool, Row}; + use tokio::time::Instant; + use uuid::Uuid; #[sqlx::test(migrator = "MIGRATOR")] async fn ingest_object_created(pool: PgPool) { @@ -213,6 +218,9 @@ pub(crate) mod tests { let s3_object_results = fetch_results(&ingester).await; assert_eq!(s3_object_results.len(), 1); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); assert_created(&s3_object_results[0]); } @@ -264,6 +272,9 @@ pub(crate) mod tests { let s3_object_results = fetch_results(&ingester).await; assert_eq!(s3_object_results.len(), 1); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); 
assert_with( &s3_object_results[0], Some(i64::MAX), @@ -301,6 +312,9 @@ pub(crate) mod tests { let s3_object_results = fetch_results(&ingester).await; assert_eq!(s3_object_results.len(), 1); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } @@ -319,6 +333,9 @@ pub(crate) mod tests { let s3_object_results = fetch_results(&ingester).await; assert_eq!(s3_object_results.len(), 1); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); assert_eq!( 2, s3_object_results[0].get::("number_duplicate_events") @@ -444,6 +461,9 @@ pub(crate) mod tests { let s3_object_results = fetch_results(&ingester).await; assert_eq!(s3_object_results.len(), 1); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); assert_eq!(2, s3_object_results[0].get::("number_reordered")); assert_ingest_events(&s3_object_results[0], EXPECTED_VERSION_ID); } @@ -512,7 +532,7 @@ pub(crate) mod tests { let mut events = vec![event]; events.extend(expected_flat_events_simple().sort_and_dedup().into_inner()); - let events = update_test_events(FlatS3EventMessages(events).into()); + let events = update_test_events_paired(FlatS3EventMessages(events).into()); // This also checks to make sure that the update duplicate constraint succeeds. ingester .ingest(EventSourceType::S3Paired(events)) @@ -682,7 +702,7 @@ pub(crate) mod tests { let mut events = vec![event]; events.extend(expected_flat_events_simple().sort_and_dedup().into_inner()); - let events = update_test_events(FlatS3EventMessages(events).into()); + let events = update_test_events_paired(FlatS3EventMessages(events).into()); ingester .ingest(EventSourceType::S3Paired(events)) @@ -1613,30 +1633,9 @@ pub(crate) mod tests { ); } - fn update_test_events(mut events: Events) -> Events { - let update_last_modified = |dates: &mut Vec>>| { - dates.iter_mut().for_each(|last_modified| { - *last_modified = Some(DateTime::default()); - }); - }; - let update_storage_class = |classes: &mut Vec>| { - classes.iter_mut().for_each(|storage_class| { - *storage_class = Some(StorageClass::Standard); - }); - }; - let update_sha256 = |sha256s: &mut Vec>| { - sha256s.iter_mut().for_each(|sha256| { - *sha256 = Some(EXPECTED_SHA256.to_string()); - }); - }; - - update_last_modified(&mut events.object_created.last_modified_dates); - update_storage_class(&mut events.object_created.storage_classes); - update_sha256(&mut events.object_created.sha256s); - - update_last_modified(&mut events.object_deleted.last_modified_dates); - update_storage_class(&mut events.object_deleted.storage_classes); - update_sha256(&mut events.object_deleted.sha256s); + fn update_test_events_paired(mut events: Events) -> Events { + events.object_created = update_test_events(events.object_created); + events.object_deleted = update_test_events(events.object_deleted); events } @@ -1655,11 +1654,11 @@ pub(crate) mod tests { } pub(crate) fn test_events() -> Events { - update_test_events(expected_events_simple()) + update_test_events_paired(expected_events_simple()) } pub(crate) fn test_events_delete_marker() -> Events { - update_test_events(expected_events_simple_delete_marker()) + update_test_events_paired(expected_events_simple_delete_marker()) } pub(crate) fn test_created_events() -> Events { diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/entities/s3_object.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/entities/s3_object.rs index a914a730a..c978c2cc1 100644 --- 
a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/entities/s3_object.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/entities/s3_object.rs @@ -37,6 +37,7 @@ pub struct Model { #[sea_orm(column_type = "Text", nullable)] pub deleted_sequencer: Option, pub number_reordered: i64, + pub ingest_id: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation {} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs index 037c770e9..48fcee8e1 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs @@ -152,6 +152,7 @@ pub trait Migrate { #[cfg(test)] pub(crate) mod tests { use chrono::{DateTime, Utc}; + use sea_orm::prelude::Json; use sqlx::{query, PgPool, Row}; use crate::database::aws::migration::tests::MIGRATOR; @@ -240,6 +241,8 @@ pub(crate) mod tests { .bind(vec![EXPECTED_SEQUENCER_CREATED_ONE.to_string()]) .bind(vec![false]) .bind(vec![event_type]) + .bind(vec![UuidGenerator::generate()]) + .bind(vec![None::]) .fetch_all(pool) .await .unwrap(); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs index 9e1591943..4363710f0 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs @@ -16,6 +16,7 @@ use crate::error::Result; /// Configuration environment variables for filemanager. #[serde_as] #[derive(Debug, Clone, Deserialize, Eq, PartialEq)] +#[serde(default)] pub struct Config { pub(crate) database_url: Option, pub(crate) pgpassword: Option, @@ -24,8 +25,12 @@ pub struct Config { pub(crate) pguser: Option, #[serde(rename = "filemanager_sqs_url")] pub(crate) sqs_url: Option, - #[serde(default, rename = "filemanager_paired_ingest_mode")] + #[serde(rename = "filemanager_paired_ingest_mode")] pub(crate) paired_ingest_mode: bool, + #[serde(rename = "filemanager_ingester_track_moves")] + pub(crate) ingester_track_moves: bool, + #[serde(rename = "filemanager_ingester_tag_name")] + pub(crate) ingester_tag_name: String, #[serde(default, rename = "filemanager_api_links_url")] pub(crate) api_links_url: Option, #[serde(rename = "filemanager_api_presign_limit")] @@ -35,15 +40,9 @@ pub struct Config { pub(crate) api_presign_expiry: Option, #[serde(rename = "filemanager_api_cors_allow_origins")] pub(crate) api_cors_allow_origins: Option>, - #[serde( - default = "default_allow_methods", - rename = "filemanager_api_cors_allow_methods" - )] + #[serde(rename = "filemanager_api_cors_allow_methods")] pub(crate) api_cors_allow_methods: Vec, - #[serde( - default = "default_allow_headers", - rename = "filemanager_api_cors_allow_headers" - )] + #[serde(rename = "filemanager_api_cors_allow_headers")] pub(crate) api_cors_allow_headers: Vec, } @@ -57,30 +56,24 @@ impl Default for Config { pguser: None, sqs_url: None, paired_ingest_mode: false, + ingester_track_moves: true, + ingester_tag_name: "ingest_id".to_string(), api_links_url: None, api_presign_limit: None, api_presign_expiry: None, api_cors_allow_origins: None, - api_cors_allow_methods: default_allow_methods(), - api_cors_allow_headers: default_allow_headers(), + api_cors_allow_methods: vec![ + Method::GET.to_string(), + Method::HEAD.to_string(), + Method::OPTIONS.to_string(), + 
Method::POST.to_string(), + Method::PATCH.to_string(), + ], + api_cors_allow_headers: vec![AUTHORIZATION.to_string()], } } } -fn default_allow_methods() -> Vec { - vec![ - Method::GET.to_string(), - Method::HEAD.to_string(), - Method::OPTIONS.to_string(), - Method::POST.to_string(), - Method::PATCH.to_string(), - ] -} - -fn default_allow_headers() -> Vec { - vec![AUTHORIZATION.to_string()] -} - impl Config { /// Load environment variables into a `Config` struct. pub fn load() -> Result { @@ -133,6 +126,16 @@ impl Config { self.paired_ingest_mode } + /// Whether the ingester should track moves. + pub fn ingester_track_moves(&self) -> bool { + self.ingester_track_moves + } + + /// Get the ingester tag name. + pub fn ingester_tag_name(&self) -> &str { + &self.ingester_tag_name + } + /// Get the presigned size limit. pub fn api_links_url(&self) -> Option<&Url> { self.api_links_url.as_ref() @@ -192,6 +195,8 @@ mod tests { ("PGUSER", "user"), ("FILEMANAGER_SQS_URL", "url"), ("FILEMANAGER_PAIRED_INGEST_MODE", "true"), + ("FILEMANAGER_INGESTER_TRACK_MOVES", "false"), + ("FILEMANAGER_INGESTER_TAG_NAME", "tag"), ("FILEMANAGER_API_LINKS_URL", "https://localhost:8000"), ("FILEMANAGER_API_PRESIGN_LIMIT", "123"), ("FILEMANAGER_API_PRESIGN_EXPIRY", "60"), @@ -217,6 +222,8 @@ mod tests { pguser: Some("user".to_string()), sqs_url: Some("url".to_string()), paired_ingest_mode: true, + ingester_track_moves: false, + ingester_tag_name: "tag".to_string(), api_links_url: Some("https://localhost:8000".parse().unwrap()), api_presign_limit: Some(123), api_presign_expiry: Some(Duration::seconds(60)), diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs index 4a1f47082..7b16d71cd 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs @@ -4,7 +4,6 @@ use std::{io, result}; use sea_orm::{DbErr, RuntimeErr}; -use sqlx::migrate::MigrateError; use thiserror::Error; use url::ParseError; use uuid::Uuid; @@ -16,8 +15,6 @@ pub type Result = result::Result; pub enum Error { #[error("database error: `{0}`")] DatabaseError(DbErr), - #[error("SQL migrate error: `{0}`")] - MigrateError(String), #[error("SQS error: `{0}`")] SQSError(String), #[error("serde error: `{0}`")] @@ -26,8 +23,8 @@ pub enum Error { ConfigError(String), #[error("credential generator error: `{0}`")] CredentialGeneratorError(String), - #[error("S3 inventory error: `{0}`")] - S3InventoryError(String), + #[error("S3 error: `{0}`")] + S3Error(String), #[error("{0}")] IoError(#[from] io::Error), #[error("numerical operation overflowed")] @@ -48,6 +45,9 @@ pub enum Error { PresignedUrlError(String), #[error("configuring API: `{0}`")] ApiConfigurationError(String), + #[cfg(feature = "migrate")] + #[error("SQL migrate error: `{0}`")] + MigrateError(String), } impl From for Error { @@ -62,12 +62,6 @@ impl From for Error { } } -impl From for Error { - fn from(err: MigrateError) -> Self { - Self::DatabaseError(DbErr::Migration(err.to_string())) - } -} - impl From for Error { fn from(err: serde_json::Error) -> Self { Self::SerdeError(err.to_string()) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs index ce560e57b..e1abc169f 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs +++ 
b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs @@ -1,29 +1,35 @@ //! Definition and implementation of the aws Collecter. //! -use async_trait::async_trait; -use aws_sdk_s3::operation::head_object::HeadObjectOutput; -use aws_sdk_s3::primitives; -use aws_sdk_s3::types::StorageClass::Standard; -use chrono::{DateTime, Utc}; -use futures::future::join_all; -use futures::TryFutureExt; -use mockall_double::double; -use tracing::{trace, warn}; - -#[double] -use crate::clients::aws::s3::Client; #[double] use crate::clients::aws::s3::Client as S3Client; #[double] use crate::clients::aws::sqs::Client as SQSClient; +use crate::database; use crate::env::Config; -use crate::error::Error::{SQSError, SerdeError}; -use crate::error::Result; +use crate::error::Error::{S3Error, SQSError, SerdeError}; +use crate::error::{Error, Result}; use crate::events::aws::{ EventType, FlatS3EventMessage, FlatS3EventMessages, StorageClass, TransposedS3EventMessages, }; use crate::events::{Collect, EventSource, EventSourceType}; +use crate::queries::list::ListQueryBuilder; +use crate::routes::filter::S3ObjectsFilter; +use crate::uuid::UuidGenerator; +use async_trait::async_trait; +use aws_sdk_s3::error::BuildError; +use aws_sdk_s3::operation::get_object_tagging::GetObjectTaggingOutput; +use aws_sdk_s3::operation::head_object::HeadObjectOutput; +use aws_sdk_s3::primitives; +use aws_sdk_s3::types::StorageClass::Standard; +use aws_sdk_s3::types::{Tag, Tagging}; +use chrono::{DateTime, Utc}; +use futures::future::join_all; +use futures::TryFutureExt; +use mockall_double::double; +use std::str::FromStr; +use tracing::{trace, warn}; +use uuid::Uuid; /// Build an AWS collector struct. #[derive(Default, Debug)] @@ -58,11 +64,16 @@ impl CollecterBuilder { } /// Build a collector using the raw events. - pub async fn build(self, raw_events: FlatS3EventMessages, config: &Config) -> Collecter<'_> { + pub async fn build<'a>( + self, + raw_events: FlatS3EventMessages, + config: &'a Config, + client: &'a database::Client, + ) -> Collecter<'a> { if let Some(s3_client) = self.s3_client { - Collecter::new(s3_client, raw_events, config) + Collecter::new(s3_client, client, raw_events, config) } else { - Collecter::new(S3Client::with_defaults().await, raw_events, config) + Collecter::new(S3Client::with_defaults().await, client, raw_events, config) } } @@ -95,20 +106,29 @@ impl CollecterBuilder { } /// Build a collector by manually calling receive to obtain the raw events. - pub async fn build_receive(mut self, config: &Config) -> Result { + pub async fn build_receive<'a>( + mut self, + config: &'a Config, + database_client: &'a database::Client, + ) -> Result> { let url = self.sqs_url.take(); let url = Config::value_or_else(url.as_deref(), config.sqs_url())?; let client = self.sqs_client.take(); if let Some(sqs_client) = &client { Ok(self - .build(Self::receive(sqs_client, url).await?, config) + .build( + Self::receive(sqs_client, url).await?, + config, + database_client, + ) .await) } else { Ok(self .build( Self::receive(&SQSClient::with_defaults().await, url).await?, config, + database_client, ) .await) } @@ -120,7 +140,8 @@ impl CollecterBuilder { /// records is None before any events have been processed. #[derive(Debug)] pub struct Collecter<'a> { - client: Client, + client: S3Client, + database_client: &'a database::Client, raw_events: FlatS3EventMessages, config: &'a Config, n_records: Option, @@ -128,9 +149,15 @@ pub struct Collecter<'a> { impl<'a> Collecter<'a> { /// Create a new collector. 
- pub(crate) fn new(client: Client, raw_events: FlatS3EventMessages, config: &'a Config) -> Self { + pub(crate) fn new( + client: S3Client, + database_client: &'a database::Client, + raw_events: FlatS3EventMessages, + config: &'a Config, + ) -> Self { Self { client, + database_client, raw_events, config, n_records: None, @@ -138,8 +165,20 @@ impl<'a> Collecter<'a> { } /// Get the inner values. - pub fn into_inner(self) -> (Client, FlatS3EventMessages, &'a Config) { - (self.client, self.raw_events, self.config) + pub fn into_inner( + self, + ) -> ( + S3Client, + &'a database::Client, + FlatS3EventMessages, + &'a Config, + ) { + ( + self.client, + self.database_client, + self.raw_events, + self.config, + ) } /// Converts an AWS datetime to a standard database format. @@ -152,25 +191,146 @@ impl<'a> Collecter<'a> { } /// Gets S3 metadata from HeadObject such as creation/archival timestamps and statuses. - pub async fn head(client: &Client, key: &str, bucket: &str) -> Option { - client - .head_object(key, bucket) - .map_err(|err| { - let err = err.into_service_error(); + pub async fn head(client: &S3Client, event: FlatS3EventMessage) -> Result { + let head = client + .head_object(&event.key, &event.bucket) + .inspect_err(|err| { warn!("Error received from HeadObject: {}", err); - err }) .await + .ok(); + + // Race condition: it's possible that an object gets deleted so quickly that it + // occurs before calling head/tagging. This means that there may be cases where the + // storage class and other fields are not known, or object moves cannot be tracked. + let Some(head) = head else { + return Ok(event); + }; + + trace!(head = ?head, "received HeadObject output"); + + let HeadObjectOutput { + storage_class, + last_modified, + content_length, + e_tag, + checksum_sha256, + delete_marker, + .. + } = head; + + // S3 does not return a storage class for standard, which means this is the + // default. See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax + Ok(event + .update_storage_class(StorageClass::from_aws(storage_class.unwrap_or(Standard))) + .update_last_modified_date(Self::convert_datetime(last_modified)) + .update_size(content_length) + .update_e_tag(e_tag) + .update_sha256(checksum_sha256) + .update_delete_marker(delete_marker)) + } + + /// Gets S3 tags from objects. + pub async fn tagging( + config: &Config, + client: &S3Client, + database_client: &database::Client, + event: FlatS3EventMessage, + ) -> Result { + let tagging = client + .get_object_tagging(&event.key, &event.bucket) + .inspect_err(|err| { + warn!("Error received from GetObjectTagging: {}", err); + }) + .await + .ok(); + + let Some(tagging) = tagging else { + return Ok(event); + }; + + trace!(tagging = ?tagging, "received tagging output"); + + let GetObjectTaggingOutput { mut tag_set, .. } = tagging; + + // Check if the object contains the ingest_id tag. + let tag = tag_set + .clone() + .into_iter() + .find(|tag| tag.key == config.ingester_tag_name()); + + let Some(tag) = tag else { + // If it doesn't, then a new tag needs to be generated. + let ingest_id = UuidGenerator::generate(); + let tag = Tag::builder() + .key(config.ingester_tag_name()) + .value(ingest_id) + .build()?; + tag_set.push(tag); + + // Try to push the tags to S3, only proceed if successful. 
+ let result = client + .put_object_tagging( + &event.key, + &event.bucket, + Tagging::builder().set_tag_set(Some(tag_set)).build()?, + ) + .await + .inspect_err(|err| { + warn!("Error received from PutObjectTagging: {}", err); + }); + + // Only add an ingest_id to the new record if the tagging was successful. + return if result.is_ok() { + Ok(event.with_ingest_id(Some(ingest_id))) + } else { + Ok(event) + }; + }; + + // The object has an ingest_id tag. Grab the existing tag, returning a new record without + // the ingest_id if it is not valid. + let ingest_id = Uuid::from_str(tag.value()).inspect_err(|err| { + warn!("Failed to parse ingest_id from tag: {}", err); + }); + let Ok(ingest_id) = ingest_id else { + return Ok(event); + }; + + // From here, the new record must be a valid, moved object. + let event = event.with_ingest_id(Some(ingest_id)); + + // Get the attributes from the old record to update the new record with. + let filter = S3ObjectsFilter { + ingest_id: Some(ingest_id), + ..Default::default() + }; + let moved_object = ListQueryBuilder::new(database_client.connection_ref()) + .filter_all(filter, true)? + .one() + .await + .ok() + .flatten(); + + // Update the new record with the attributes if possible, or return the new record without + // the attributes if not possible. + if let Some(moved_object) = moved_object { + Ok(event.with_attributes(moved_object.attributes)) + } else { + warn!("Object with ingest_id {} not found in database", ingest_id); + Ok(event) + } } /// Process events and add header and datetime fields. pub async fn update_events( - client: &Client, + config: &Config, + client: &S3Client, + database_client: &database::Client, events: FlatS3EventMessages, ) -> Result<FlatS3EventMessages> { Ok(FlatS3EventMessages( - join_all(events.into_inner().into_iter().map(|mut event| async move { + join_all(events.into_inner().into_iter().map(|event| async move { // No need to run this unnecessarily on removed events. match event.event_type { EventType::Deleted | EventType::Other => return Ok(event), @@ -179,34 +339,8 @@ impl<'a> Collecter<'a> { trace!(key = ?event.key, bucket = ?event.bucket, "updating event"); - // Race condition: it's possible that an object gets deleted so quickly that it - // occurs before calling head. This means that there may be cases where the storage - // class and other fields are not known. - if let Some(head) = Self::head(client, &event.key, &event.bucket).await { - trace!(head = ?head, "received head object output"); - - let HeadObjectOutput { - storage_class, - last_modified, - content_length, - e_tag, - checksum_sha256, - .. - } = head; - - // S3 does not return a storage class for standard, which means this is the - // default. 
See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax - event = event - .update_storage_class(StorageClass::from_aws( - storage_class.unwrap_or(Standard), - )) - .update_last_modified_date(Self::convert_datetime(last_modified)) - .update_size(content_length) - .update_e_tag(e_tag) - .update_sha256(checksum_sha256); - } - - Ok(event) + let event = Self::head(client, event).await?; + Self::tagging(config, client, database_client, event).await })) .await .into_iter() @@ -220,17 +354,23 @@ impl<'a> Collecter<'a> { } } +impl From for Error { + fn from(err: BuildError) -> Self { + S3Error(err.to_string()) + } +} + #[async_trait] impl<'a> Collect for Collecter<'a> { async fn collect(mut self) -> Result { - let (client, events, config) = self.into_inner(); + let (client, database_client, events, config) = self.into_inner(); - let n_records = events.0.len(); let events = events.sort_and_dedup(); - let events = Self::update_events(&client, events).await?; + let events = Self::update_events(config, &client, database_client, events).await?; // Get only the known event types. let events = events.filter_known(); + let n_records = events.0.len(); if config.paired_ingest_mode() { Ok(EventSource::new( @@ -250,8 +390,18 @@ impl<'a> Collect for Collecter<'a> { pub(crate) mod tests { use std::result; + use crate::database::aws::migration::tests::MIGRATOR; + use crate::events::aws::tests::{ + expected_event_record_simple, expected_flat_events_simple, EXPECTED_SHA256, + }; + use crate::events::aws::StorageClass::IntelligentTiering; + use aws_sdk_s3::error::SdkError; + use aws_sdk_s3::operation::get_object_tagging::GetObjectTaggingError; use aws_sdk_s3::operation::head_object::HeadObjectError; + use aws_sdk_s3::operation::put_object_tagging::{ + PutObjectTaggingError, PutObjectTaggingOutput, + }; use aws_sdk_s3::primitives::{DateTimeFormat, SdkBody}; use aws_sdk_s3::types; use aws_sdk_s3::types::error::NotFound; @@ -259,14 +409,16 @@ pub(crate) mod tests { use aws_sdk_sqs::types::builders::MessageBuilder; use aws_smithy_runtime_api::client::orchestrator::HttpResponse; use aws_smithy_runtime_api::client::result::ServiceError; - use mockall::predicate::eq; - - use crate::events::aws::tests::{ - expected_event_record_simple, expected_flat_events_simple, EXPECTED_SHA256, - }; - use crate::events::aws::StorageClass::IntelligentTiering; + use mockall::predicate::{eq, function}; + use sea_orm::prelude::Json; + use serde_json::json; + use sqlx::{PgPool, Row}; use super::*; + use crate::database::{Client, Ingest}; + use crate::events::aws::message::EventType::Created; + use crate::handlers::aws::tests::s3_object_results; + use crate::queries::EntriesBuilder; #[tokio::test] async fn receive() { @@ -289,19 +441,19 @@ pub(crate) mod tests { assert_eq!(events, expected); } - #[tokio::test] - async fn build_receive() { + #[sqlx::test(migrator = "MIGRATOR")] + async fn build_receive(pool: PgPool) { let mut sqs_client = SQSClient::default(); let mut s3_client = S3Client::default(); set_sqs_client_expectations(&mut sqs_client); - set_s3_client_expectations(&mut s3_client, vec![|| Ok(expected_head_object())]); + set_s3_client_expectations(&mut s3_client); let events = CollecterBuilder::default() .with_sqs_client(sqs_client) .with_s3_client(s3_client) .with_sqs_url("url") - .build_receive(&Default::default()) + .build_receive(&Default::default(), &database::Client::from_pool(pool)) .await .unwrap() .collect() @@ -323,41 +475,63 @@ pub(crate) mod tests { assert_eq!(result, 
Some(DateTime::::default())); } - #[tokio::test] - async fn head() { + #[sqlx::test(migrator = "MIGRATOR")] + async fn head(pool: PgPool) { let config = Default::default(); - let mut collecter = test_collecter(&config).await; + let client = Client::from_pool(pool); + let mut collecter = test_collecter(&config, &client).await; - set_s3_client_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); + set_s3_head_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); - let result = Collecter::head(&collecter.client, "key", "bucket").await; - assert_eq!(result, Some(expected_head_object())); + let result = Collecter::head( + &collecter.client, + FlatS3EventMessage::new_with_generated_id() + .with_key("key".to_string()) + .with_bucket("bucket".to_string()), + ) + .await + .unwrap(); + let expected = result + .clone() + .with_sha256(Some(EXPECTED_SHA256.to_string())) + .with_storage_class(Some(IntelligentTiering)) + .with_last_modified_date(Some(Default::default())); + + assert_eq!(result, expected); } - #[tokio::test] - async fn head_not_found() { + #[sqlx::test(migrator = "MIGRATOR")] + async fn head_not_found(pool: PgPool) { let config = Default::default(); - let mut collecter = test_collecter(&config).await; + let client = Client::from_pool(pool); + let mut collecter = test_collecter(&config, &client).await; - set_s3_client_expectations( + set_s3_head_expectations( &mut collecter.client, vec![|| Err(expected_head_object_not_found())], ); - let result = Collecter::head(&collecter.client, "key", "bucket").await; - assert!(result.is_none()); + let result = Collecter::head( + &collecter.client, + FlatS3EventMessage::new_with_generated_id() + .with_key("key".to_string()) + .with_bucket("bucket".to_string()), + ) + .await; + assert!(result.is_ok()); } - #[tokio::test] - async fn update_events() { + #[sqlx::test(migrator = "MIGRATOR")] + async fn update_events(pool: PgPool) { let config = Default::default(); - let mut collecter = test_collecter(&config).await; + let client = Client::from_pool(pool); + let mut collecter = test_collecter(&config, &client).await; let events = expected_flat_events_simple().sort_and_dedup(); - set_s3_client_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); + set_s3_client_expectations(&mut collecter.client); - let mut result = Collecter::update_events(&collecter.client, events) + let mut result = Collecter::update_events(&config, &collecter.client, &client, events) .await .unwrap() .into_inner() @@ -372,12 +546,151 @@ pub(crate) mod tests { assert_eq!(second.last_modified_date, None); } - #[tokio::test] - async fn collect() { + #[sqlx::test(migrator = "MIGRATOR")] + async fn tagging_without_move(pool: PgPool) { let config = Default::default(); - let mut collecter = test_collecter(&config).await; + let client = Client::from_pool(pool.clone()); + let mut collecter = test_collecter(&config, &client).await; + + collecter.raw_events = + FlatS3EventMessages(vec![FlatS3EventMessage::new_with_generated_id() + .with_event_type(Created) + .with_key("key".to_string()) + .with_bucket("bucket".to_string())]); + + set_s3_client_expectations(&mut collecter.client); + + let mut result = collecter.collect().await.unwrap(); + let EventSourceType::S3(events) = &mut result.event_type else { + panic!(); + }; + assert!(events.ingest_ids[0].is_some()); + + client.ingest(result.event_type).await.unwrap(); + + let s3_object_results = s3_object_results(&pool).await; + assert_eq!(s3_object_results.len(), 1); + 
assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_some()); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn tagging_with_move(pool: PgPool) { + let config = Default::default(); + let client = Client::from_pool(pool.clone()); + let mut collecter = test_collecter(&config, &client).await; + + let ingest_id = UuidGenerator::generate(); + EntriesBuilder::default() + .with_ingest_id(ingest_id) + .with_n(1) + .build(&client) + .await; + + collecter.raw_events = + FlatS3EventMessages(vec![FlatS3EventMessage::new_with_generated_id() + .with_event_type(Created) + .with_key("key".to_string()) + .with_bucket("bucket".to_string())]); + + set_s3_head_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); + set_s3_get_tagging_expectations( + &mut collecter.client, + vec![move || { + Ok(GetObjectTaggingOutput::builder() + .set_tag_set(Some(vec![Tag::builder() + .key("ingest_id") + .value(ingest_id.to_string()) + .build() + .unwrap()])) + .build() + .unwrap()) + }], + ); + + let mut result = collecter.collect().await.unwrap(); + let EventSourceType::S3(events) = &mut result.event_type else { + panic!(); + }; + assert!(events.ingest_ids[0].is_some()); + + client.ingest(result.event_type).await.unwrap(); + + let s3_object_results = s3_object_results(&pool).await; + assert_eq!(s3_object_results.len(), 2); + assert_eq!( + s3_object_results[0].get::, _>("ingest_id"), + Some(ingest_id) + ); + assert_eq!( + s3_object_results[1].get::, _>("ingest_id"), + Some(ingest_id) + ); + + let expected_attributes = json!({ + "attributeId": "0", + "nestedId": { + "attributeId": "0" + } + }); + assert_eq!( + s3_object_results[0].get::, _>("attributes"), + Some(expected_attributes.clone()) + ); + assert_eq!( + s3_object_results[1].get::, _>("attributes"), + Some(expected_attributes) + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn tagging_on_fail(pool: PgPool) { + let config = Default::default(); + let client = Client::from_pool(pool.clone()); + let mut collecter = test_collecter(&config, &client).await; + + collecter.raw_events = + FlatS3EventMessages(vec![FlatS3EventMessage::new_with_generated_id() + .with_event_type(Created) + .with_key("key".to_string()) + .with_bucket("bucket".to_string())]); + + set_s3_head_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); + set_s3_get_tagging_expectations( + &mut collecter.client, + vec![move || { + Err(SdkError::ServiceError( + ServiceError::builder() + .source(GetObjectTaggingError::unhandled("unhandled")) + .raw(HttpResponse::new(404.try_into().unwrap(), SdkBody::empty())) + .build(), + )) + }], + ); + + let mut result = collecter.collect().await.unwrap(); + let EventSourceType::S3(events) = &mut result.event_type else { + panic!(); + }; + assert!(events.ingest_ids[0].is_none()); - set_s3_client_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); + client.ingest(result.event_type).await.unwrap(); + + let s3_object_results = s3_object_results(&pool).await; + assert_eq!(s3_object_results.len(), 1); + assert!(s3_object_results[0] + .get::, _>("ingest_id") + .is_none()); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn collect(pool: PgPool) { + let config = Default::default(); + let client = Client::from_pool(pool); + let mut collecter = test_collecter(&config, &client).await; + + set_s3_client_expectations(&mut collecter.client); let result = collecter.collect().await.unwrap().into_inner().0; @@ -397,7 +710,7 @@ pub(crate) mod tests { } } - pub(crate) fn 
set_s3_client_expectations(client: &mut Client, expectations: Vec) + pub(crate) fn set_s3_head_expectations(client: &mut S3Client, expectations: Vec) where F: Fn() -> result::Result> + Send + 'static, { @@ -411,6 +724,52 @@ pub(crate) mod tests { } } + pub(crate) fn set_s3_get_tagging_expectations( + client: &mut S3Client, + get_tagging_expectations: Vec, + ) where + F: Fn() -> result::Result> + + Send + + 'static, + { + let get_tagging = client + .expect_get_object_tagging() + .with(eq("key"), eq("bucket")) + .times(get_tagging_expectations.len()); + + for expectation in get_tagging_expectations { + get_tagging.returning(move |_, _| expectation()); + } + } + + pub(crate) fn set_s3_tagging_expectations( + client: &mut S3Client, + get_tagging_expectations: Vec, + put_tagging_expectations: Vec, + ) where + F: Fn() -> result::Result> + + Send + + 'static, + T: Fn() -> result::Result> + + Send + + 'static, + { + set_s3_get_tagging_expectations(client, get_tagging_expectations); + + let put_tagging = client + .expect_put_object_tagging() + .with( + eq("key"), + eq("bucket"), + function(|t: &Tagging| t.tag_set().first().unwrap().key == "ingest_id"), + ) + .times(put_tagging_expectations.len()); + + for expectation in put_tagging_expectations { + put_tagging.returning(move |_, _, _| expectation()); + } + } + pub(crate) fn set_sqs_client_expectations(sqs_client: &mut SQSClient) { sqs_client .expect_receive_message() @@ -430,6 +789,26 @@ pub(crate) mod tests { .build() } + pub(crate) fn expected_get_object_tagging() -> GetObjectTaggingOutput { + GetObjectTaggingOutput::builder() + .set_tag_set(Some(vec![])) + .build() + .unwrap() + } + + pub(crate) fn expected_put_object_tagging() -> PutObjectTaggingOutput { + PutObjectTaggingOutput::builder().build() + } + + pub(crate) fn set_s3_client_expectations(s3_client: &mut S3Client) { + set_s3_head_expectations(s3_client, vec![|| Ok(expected_head_object())]); + set_s3_tagging_expectations( + s3_client, + vec![|| Ok(expected_get_object_tagging())], + vec![|| Ok(expected_put_object_tagging())], + ); + } + pub(crate) fn expected_head_object_not_found() -> SdkError { SdkError::ServiceError( ServiceError::builder() @@ -439,8 +818,13 @@ pub(crate) mod tests { ) } - async fn test_collecter(config: &Config) -> Collecter<'_> { - Collecter::new(Client::default(), expected_flat_events_simple(), config) + async fn test_collecter<'a>(config: &'a Config, database_client: &'a Client) -> Collecter<'a> { + Collecter::new( + S3Client::default(), + database_client, + expected_flat_events_simple(), + config, + ) } fn expected_receive_message() -> ReceiveMessageOutput { diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs index cab5c468e..8d2aee699 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs @@ -26,7 +26,7 @@ use std::result; #[double] use crate::clients::aws::s3::Client; -use crate::error::Error::S3InventoryError; +use crate::error::Error::S3Error; use crate::error::{Error, Result}; use crate::events::aws::message::{default_version_id, quote_e_tag, EventType::Created}; use crate::events::aws::{FlatS3EventMessage, FlatS3EventMessages, StorageClass}; @@ -69,7 +69,7 @@ impl Inventory { let mut inventory_bytes = vec![]; MultiGzDecoder::new(BufReader::new(body)) .read_to_end(&mut inventory_bytes) - .map_err(|err| 
S3InventoryError(format!("decompressing CSV: {}", err)))?; + .map_err(|err| S3Error(format!("decompressing CSV: {}", err)))?; // AWS seems to return extra newlines at the end of the CSV, so we remove these let inventory_bytes = Self::trim_whitespace(inventory_bytes.as_slice()); @@ -132,9 +132,8 @@ impl Inventory { // https://github.com/Swoorup/arrow-convert // This is should be similar to arrow2_convert::TryIntoArrow in the above performance graph, // as it is a port of arrow2_convert with arrow-rs as the dependency. - from_slice::>(buf.as_slice()).map_err(|err| { - S3InventoryError(format!("failed to deserialize json from arrow: {}", err)) - }) + from_slice::>(buf.as_slice()) + .map_err(|err| S3Error(format!("failed to deserialize json from arrow: {}", err))) } /// Parse a parquet manifest file into records. @@ -162,11 +161,11 @@ impl Inventory { .client .get_object(key.as_ref(), bucket.as_ref()) .await - .map_err(|err| S3InventoryError(err.to_string()))? + .map_err(|err| S3Error(err.to_string()))? .body .collect() .await - .map_err(|err| S3InventoryError(err.to_string()))? + .map_err(|err| S3Error(err.to_string()))? .to_vec()) } @@ -174,10 +173,10 @@ impl Inventory { fn verify_md5>(data: T, verify_with: T) -> Result<()> { if md5::compute(data).0 != hex::decode(&verify_with) - .map_err(|err| S3InventoryError(format!("decoding hex string: {}", err)))? + .map_err(|err| S3Error(format!("decoding hex string: {}", err)))? .as_slice() { - return Err(S3InventoryError( + return Err(S3Error( "mismatched MD5 checksums in inventory manifest".to_string(), )); } @@ -232,9 +231,7 @@ impl Inventory { InventoryFormat::Csv => self.parse_csv(schema, body.as_slice()).await, InventoryFormat::Parquet => self.parse_parquet(body).await, InventoryFormat::Orc => self.parse_orc(body).await, - _ => Err(S3InventoryError( - "unsupported manifest file type".to_string(), - )), + _ => Err(S3Error("unsupported manifest file type".to_string())), } } @@ -246,9 +243,7 @@ impl Inventory { // Proper arn, parse out the bucket. Ok(arn) => { if arn.service != Service::S3.into() { - return Err(S3InventoryError( - "destination bucket ARN is not S3".to_string(), - )); + return Err(S3Error("destination bucket ARN is not S3".to_string())); } arn.resource.to_string() } @@ -286,25 +281,25 @@ impl Inventory { impl From for Error { fn from(error: csv::Error) -> Self { - S3InventoryError(error.to_string()) + S3Error(error.to_string()) } } impl From for Error { fn from(error: ParquetError) -> Self { - S3InventoryError(error.to_string()) + S3Error(error.to_string()) } } impl From for Error { fn from(error: ArrowError) -> Self { - S3InventoryError(error.to_string()) + S3Error(error.to_string()) } } impl From for Error { fn from(error: OrcError) -> Self { - S3InventoryError(error.to_string()) + S3Error(error.to_string()) } } @@ -533,6 +528,8 @@ impl From for FlatS3EventMessage { // Anything in an inventory report is always a created event. 
event_type: Created, is_delete_marker: is_delete_marker.unwrap_or_default(), + ingest_id: None, + attributes: None, number_duplicate_events: 0, number_reordered: 0, } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs index 3fe939148..771dd1bc0 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs @@ -3,14 +3,13 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; use crate::events::aws::{FlatS3EventMessage, FlatS3EventMessages}; use crate::uuid::UuidGenerator; /// The type of S3 event. #[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Clone, Hash, sqlx::Type)] -#[sqlx(type_name = "event_type", no_pg_array)] +#[sqlx(type_name = "event_type")] pub enum EventType { #[default] Created, @@ -18,12 +17,6 @@ pub enum EventType { Other, } -impl PgHasArrayType for EventType { - fn array_type_info() -> PgTypeInfo { - PgTypeInfo::with_name("_event_type") - } -} - /// Data for converting from S3 events to the internal filemanager event type. #[derive(Debug)] pub struct EventTypeData { @@ -198,6 +191,8 @@ impl From for FlatS3EventMessage { sha256: None, event_type, is_delete_marker, + ingest_id: None, + attributes: None, number_duplicate_events: 0, number_reordered: 0, } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs index 82979617f..4a6595cad 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs @@ -5,7 +5,6 @@ use aws_sdk_s3::types::StorageClass as AwsStorageClass; use chrono::{DateTime, Utc}; use itertools::{izip, Itertools}; use serde::{Deserialize, Serialize}; -use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; use sqlx::FromRow; use uuid::Uuid; @@ -14,6 +13,7 @@ use message::EventMessage; use crate::events::aws::message::{default_version_id, EventType}; use crate::events::aws::EventType::{Created, Deleted, Other}; use crate::uuid::UuidGenerator; +use sea_orm::prelude::Json; pub mod collecter; pub mod inventory; @@ -22,7 +22,7 @@ pub mod message; /// A wrapper around AWS storage types with sqlx support. #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Clone, sqlx::Type, Serialize, Deserialize)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] -#[sqlx(type_name = "storage_class", no_pg_array)] +#[sqlx(type_name = "storage_class")] pub enum StorageClass { DeepArchive, Glacier, @@ -36,12 +36,6 @@ pub enum StorageClass { StandardIa, } -impl PgHasArrayType for StorageClass { - fn array_type_info() -> PgTypeInfo { - PgTypeInfo::with_name("_storage_class") - } -} - impl StorageClass { /// Convert from the AWS storage class type to the filemanager storage class. 
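/// Returns `None` for storage classes that the filemanager does not track.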
pub fn from_aws(storage_class: AwsStorageClass) -> Option<Self> { @@ -79,6 +73,8 @@ pub struct TransposedS3EventMessages { pub last_modified_dates: Vec<Option<DateTime<Utc>>>, pub event_types: Vec<EventType>, pub is_delete_markers: Vec<bool>, + pub ingest_ids: Vec<Option<Uuid>>, + pub attributes: Vec<Option<Json>>, } impl TransposedS3EventMessages { @@ -99,6 +95,8 @@ impl TransposedS3EventMessages { last_modified_dates: Vec::with_capacity(capacity), event_types: Vec::with_capacity(capacity), is_delete_markers: Vec::with_capacity(capacity), + ingest_ids: Vec::with_capacity(capacity), + attributes: Vec::with_capacity(capacity), } } @@ -118,6 +116,8 @@ impl TransposedS3EventMessages { last_modified_date, event_type, is_delete_marker, + ingest_id, + attributes, .. } = message; @@ -134,6 +134,8 @@ impl TransposedS3EventMessages { self.last_modified_dates.push(last_modified_date); self.event_types.push(event_type); self.is_delete_markers.push(is_delete_marker); + self.ingest_ids.push(ingest_id); + self.attributes.push(attributes); } } @@ -171,6 +173,8 @@ impl From<TransposedS3EventMessages> for FlatS3EventMessages { messages.last_modified_dates, messages.event_types, messages.is_delete_markers, + messages.ingest_ids, + messages.attributes, ) .map( |( @@ -187,6 +191,8 @@ impl From<TransposedS3EventMessages> for FlatS3EventMessages { last_modified_date, event_type, is_delete_marker, + ingest_id, + attributes, )| { FlatS3EventMessage { s3_object_id, @@ -202,6 +208,8 @@ impl From<TransposedS3EventMessages> for FlatS3EventMessages { event_time, event_type, is_delete_marker, + ingest_id, + attributes, number_duplicate_events: 0, number_reordered: 0, } @@ -344,7 +352,6 @@ impl FlatS3EventMessages { pub fn sort(self) -> Self { let mut messages = self.into_inner(); - messages.sort(); messages.sort_by(|a, b| { if let (Some(a_sequencer), Some(b_sequencer)) = (a.sequencer.as_ref(), b.sequencer.as_ref()) @@ -420,7 +427,7 @@ impl FlatS3EventMessages { } /// A flattened AWS S3 record -#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Default, FromRow)] +#[derive(Debug, Eq, PartialEq, Clone, Default, FromRow)] pub struct FlatS3EventMessage { pub s3_object_id: Uuid, pub sequencer: Option<String>, @@ -435,6 +442,8 @@ pub struct FlatS3EventMessage { pub event_time: Option<DateTime<Utc>>, pub event_type: EventType, pub is_delete_marker: bool, + pub ingest_id: Option<Uuid>, + pub attributes: Option<Json>, pub number_duplicate_events: i64, pub number_reordered: i64, } @@ -487,6 +496,14 @@ impl FlatS3EventMessage { self } + /// Update the delete marker if not None. + pub fn update_delete_marker(mut self, delete_marker: Option<bool>) -> Self { + delete_marker + .into_iter() + .for_each(|is_delete_marker| self.is_delete_marker = is_delete_marker); + self + } + /// Set the s3 object id. pub fn with_s3_object_id(mut self, s3_object_id: Uuid) -> Self { self.s3_object_id = s3_object_id; self } @@ -559,6 +576,18 @@ impl FlatS3EventMessage { self } + /// Set the ingest id. + pub fn with_ingest_id(mut self, ingest_id: Option<Uuid>) -> Self { + self.ingest_id = ingest_id; + self + } + + /// Set the attributes. + pub fn with_attributes(mut self, attributes: Option<Json>) -> Self { + self.attributes = attributes; + self + } + /// Set the event type.
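/// Overwrites any event type that was previously set on the message.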
pub fn with_event_type(mut self, event_type: EventType) -> Self { self.event_type = event_type; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/mod.rs index 0f65f9942..0922deb75 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/mod.rs @@ -16,7 +16,7 @@ pub trait Collect { } /// The event source with a type and the number of (potentially duplicate) records contained. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EventSource { event_type: EventSourceType, n_records: usize, @@ -39,7 +39,7 @@ impl EventSource { /// The type of event. #[allow(clippy::large_enum_variant)] -#[derive(Debug)] +#[derive(Debug, Clone)] #[non_exhaustive] pub enum EventSourceType { S3(TransposedS3EventMessages), diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs index 9ffe59068..1b4277ea1 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs @@ -39,7 +39,7 @@ pub async fn receive_and_ingest<'a>( .with_s3_client(s3_client) .with_sqs_client(sqs_client) .set_sqs_url(sqs_url) - .build_receive(env_config) + .build_receive(env_config, database_client) .await? .collect() .await? @@ -74,7 +74,7 @@ pub async fn ingest_event( let events = CollecterBuilder::default() .with_s3_client(s3_client) - .build(events, env_config) + .build(events, env_config, &database_client) .await .collect() .await? @@ -211,13 +211,14 @@ pub(crate) mod tests { use sqlx::postgres::PgRow; use sqlx::PgPool; + use super::*; use crate::database::aws::ingester::tests::{ assert_row, expected_message, fetch_results, remove_version_ids, replace_sequencers, test_events, test_ingester, }; use crate::database::aws::migration::tests::MIGRATOR; use crate::events::aws::collecter::tests::{ - expected_head_object, set_s3_client_expectations, set_sqs_client_expectations, + set_s3_client_expectations, set_sqs_client_expectations, }; use crate::events::aws::inventory::tests::{ csv_manifest_from_key_expectations, EXPECTED_LAST_MODIFIED_ONE, @@ -234,8 +235,6 @@ pub(crate) mod tests { use crate::events::aws::FlatS3EventMessage; use crate::events::EventSourceType::S3; - use super::*; #[sqlx::test(migrator = "MIGRATOR")] async fn test_receive_and_ingest(pool: PgPool) { let client = Client::from_pool(pool); @@ -252,7 +251,7 @@ pub(crate) mod tests { async fn test_ingest_event(pool: PgPool) { let mut s3_client = S3Client::default(); - set_s3_client_expectations(&mut s3_client, vec![|| Ok(expected_head_object())]); + set_s3_client_expectations(&mut s3_client); let event = SqsEvent { records: vec![SqsMessage { @@ -376,7 +375,7 @@ pub(crate) mod tests { let mut s3_client = S3Client::default(); set_sqs_client_expectations(&mut sqs_client); - set_s3_client_expectations(&mut s3_client, vec![|| Ok(expected_head_object())]); + set_s3_client_expectations(&mut s3_client); f(sqs_client, s3_client).await; @@ -528,7 +527,7 @@ pub(crate) mod tests { assert_row(row, message, Some("".to_string()), None); } - async fn s3_object_results(pool: &PgPool) -> Vec<PgRow> { + pub(crate) async fn s3_object_results(pool: &PgPool) -> Vec<PgRow> { sqlx::query("select * from s3_object order by sequencer, key") .fetch_all(pool) .await diff --git
a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs index b7cee82a8..4e05f03a7 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs @@ -149,7 +149,8 @@ where filter .is_delete_marker .map(|v| s3_object::Column::IsDeleteMarker.eq(v)), - ); + ) + .add_option(filter.ingest_id.map(|v| s3_object::Column::IngestId.eq(v))); if let Some(attributes) = filter.attributes { let json_conditions = Self::json_conditions( diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs index d0834633c..a5749a739 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs @@ -3,6 +3,11 @@ use std::ops::Add; +use crate::database::entities::s3_object::ActiveModel as ActiveS3Object; +use crate::database::entities::s3_object::Model as S3Object; +use crate::database::entities::sea_orm_active_enums::{EventType, StorageClass}; +use crate::database::Client; +use crate::uuid::UuidGenerator; use chrono::{DateTime, Days}; use rand::seq::SliceRandom; use rand::thread_rng; @@ -10,12 +15,7 @@ use sea_orm::Set; use sea_orm::{ActiveModelTrait, TryIntoModel}; use serde_json::json; use strum::EnumCount; - -use crate::database::entities::s3_object::ActiveModel as ActiveS3Object; -use crate::database::entities::s3_object::Model as S3Object; -use crate::database::entities::sea_orm_active_enums::{EventType, StorageClass}; -use crate::database::Client; -use crate::uuid::UuidGenerator; +use uuid::Uuid; pub mod get; pub mod list; @@ -41,11 +41,12 @@ impl Entries { shuffle: bool, bucket_divisor: usize, key_divisor: usize, + ingest_id: Option<Uuid>, ) -> Vec<S3Object> { let mut output = vec![]; let mut entries: Vec<_> = (0..n) - .map(|index| Self::generate_entry(index, bucket_divisor, key_divisor)) + .map(|index| Self::generate_entry(index, bucket_divisor, key_divisor, ingest_id)) .collect(); if shuffle { @@ -70,6 +71,7 @@ impl Entries { index: usize, bucket_divisor: usize, key_divisor: usize, + ingest_id: Option<Uuid>, ) -> ActiveS3Object { let event = Self::event_type(index); let date = || Set(Some(DateTime::default().add(Days::new(index as u64)))); @@ -82,6 +84,7 @@ ActiveS3Object { s3_object_id: Set(UuidGenerator::generate()), + ingest_id: Set(Some(ingest_id.unwrap_or_else(UuidGenerator::generate))), event_type: Set(event.clone()), bucket: Set((index / bucket_divisor).to_string()), key: Set((index / key_divisor).to_string()), @@ -126,6 +129,7 @@ pub struct EntriesBuilder { bucket_divisor: usize, key_divisor: usize, shuffle: bool, + ingest_id: Option<Uuid>, } impl EntriesBuilder { @@ -153,6 +157,12 @@ impl EntriesBuilder { self } + /// Set the ingest id. + pub fn with_ingest_id(mut self, ingest_id: Uuid) -> Self { + self.ingest_id = Some(ingest_id); + self + } + /// Build the entries and initialize the database.
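/// Consumes the builder and seeds the database through the provided client.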
pub async fn build(self, client: &Client) -> Entries { let mut entries = Entries::initialize_database( @@ -161,6 +171,7 @@ impl EntriesBuilder { self.shuffle, self.bucket_divisor, self.key_divisor, + self.ingest_id, ) .await; @@ -178,6 +189,7 @@ impl Default for EntriesBuilder { bucket_divisor: 2, key_divisor: 1, shuffle: false, + ingest_id: None, } } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs index 18d536647..af96c854b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs @@ -1,13 +1,13 @@ //! Routing logic for query filtering. //! +use crate::database::entities::sea_orm_active_enums::{EventType, StorageClass}; +use crate::routes::filter::wildcard::{Wildcard, WildcardEither}; use sea_orm::prelude::{DateTimeWithTimeZone, Json}; use serde::{Deserialize, Serialize}; use serde_json::Map; use utoipa::IntoParams; - -use crate::database::entities::sea_orm_active_enums::{EventType, StorageClass}; -use crate::routes::filter::wildcard::{Wildcard, WildcardEither}; +use uuid::Uuid; pub mod wildcard; @@ -68,6 +69,9 @@ pub struct S3ObjectsFilter { /// Query by the object delete marker. pub(crate) is_delete_marker: Option<bool>, #[param(required = false)] + /// Query by the ingest id that objects get tagged with. + pub(crate) ingest_id: Option<Uuid>, + #[param(required = false)] /// Query by JSON attributes. Supports nested syntax to access inner /// fields, e.g. `attributes[attribute_id]=...`. This only deserializes /// into string fields, and does not support other JSON types. E.g.
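Reviewer note (illustrative sketch, not part of the diff): the ingest_id plumbing added above can be exercised end to end roughly as follows. The test-harness `client` and a `Default` impl on `S3ObjectsFilter` are assumed here rather than shown in this diff.

// Hypothetical test sketch: seed rows under one known ingest id, then filter on it.
let ingest_id = UuidGenerator::generate();
let _entries = EntriesBuilder::default()
    .with_ingest_id(ingest_id) // builder method added in queries/mod.rs above
    .build(&client) // assumed test-harness database Client
    .await;

// list.rs now maps this field to `s3_object::Column::IngestId.eq(...)`,
// so listing with this filter should return only the seeded rows.
let filter = S3ObjectsFilter {
    ingest_id: Some(ingest_id),
    ..Default::default() // assumes S3ObjectsFilter: Default
};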