diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts index 58ee7f088..60623ce82 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/api.ts @@ -25,6 +25,6 @@ export class ApiFunction extends fn.Function { ...props, }); - this.addPoliciesForBuckets(props.buckets); + this.addPoliciesForBuckets(props.buckets, fn.Function.getObjectActions()); } } diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts index a186dd8b5..ea74f79e5 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/function.ts @@ -165,17 +165,36 @@ export class Function extends Construct { /** * Add policies for 's3:List*' and 's3:Get*' on the buckets to this function's role. */ - addPoliciesForBuckets(buckets: string[], additionalActions?: string[]) { + addPoliciesForBuckets(buckets: string[], actions: string[]) { buckets.map((bucket) => { this.addToPolicy( new PolicyStatement({ - actions: [...['s3:ListBucket', 's3:GetObject'], ...(additionalActions ?? [])], + actions, resources: [`arn:aws:s3:::${bucket}`, `arn:aws:s3:::${bucket}/*`], }) ); }); } + /** + * Get policy actions for fetching objects. + */ + static getObjectActions(): string[] { + return ['s3:ListBucket', 's3:GetObject', 's3:GetObjectVersion']; + } + + /** + * Get policy actions for using object tags. + */ + static objectTaggingActions(): string[] { + return [ + 's3:GetObjectTagging', + 's3:GetObjectVersionTagging', + 's3:PutObjectTagging', + 's3:PutObjectVersionTagging', + ]; + } + /** * Get the function name. */ diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts index 14c1f6f9b..0f69b89e2 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/ingest.ts @@ -52,10 +52,8 @@ export class IngestFunction extends fn.Function { this.function.addEventSource(eventSource); }); this.addPoliciesForBuckets(props.buckets, [ - 's3:GetObjectTagging', - 's3:GetObjectVersionTagging', - 's3:PutObjectTagging', - 's3:PutObjectVersionTagging', + ...fn.Function.getObjectActions(), + ...fn.Function.objectTaggingActions(), ]); } } diff --git a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/inventory.ts b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/inventory.ts index 8555800da..7e62cca30 100644 --- a/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/inventory.ts +++ b/lib/workload/stateless/stacks/filemanager/deploy/constructs/functions/inventory.ts @@ -34,6 +34,9 @@ export class InventoryFunction extends fn.Function { functionName: INVENTORY_FUNCTION_NAME, }); - this.addPoliciesForBuckets(props.buckets); + this.addPoliciesForBuckets(props.buckets, [ + ...fn.Function.getObjectActions(), + ...fn.Function.objectTaggingActions(), + ]); } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/clients/aws/s3.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/clients/aws/s3.rs index 6e1ff8f05..2586fe665 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/clients/aws/s3.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/clients/aws/s3.rs @@ -26,6 +26,44 @@ pub struct Client { inner: s3::Client, } +/// Override settings related to response headers. +#[derive(Debug)] +pub struct ResponseHeaders { + content_disposition: String, + content_type: Option, + content_encoding: Option, +} + +impl ResponseHeaders { + /// Create a new `ResponseHeaders` config. + pub fn new( + content_disposition: String, + content_type: Option, + content_encoding: Option, + ) -> Self { + Self { + content_disposition, + content_type, + content_encoding, + } + } + + /// Get the content disposition. + pub fn content_disposition(&self) -> &str { + &self.content_disposition + } + + /// Get the content type. + pub fn content_type(&self) -> Option<&str> { + self.content_type.as_deref() + } + + /// Get the content encoding. + pub fn content_encoding(&self) -> Option<&str> { + self.content_encoding.as_deref() + } +} + #[automock] impl Client { /// Create a new S3 client. @@ -48,12 +86,14 @@ impl Client { &self, key: &str, bucket: &str, + version_id: &str, ) -> Result { self.inner .head_object() .checksum_mode(Enabled) .key(key) .bucket(bucket) + .version_id(version_id) .send() .await } @@ -63,12 +103,14 @@ impl Client { &self, key: &str, bucket: &str, + version_id: &str, ) -> Result { self.inner .get_object() .checksum_mode(Enabled) .key(key) .bucket(bucket) + .version_id(version_id) .send() .await } @@ -78,11 +120,13 @@ impl Client { &self, key: &str, bucket: &str, + version_id: &str, ) -> Result { self.inner .get_object_tagging() .key(key) .bucket(bucket) + .version_id(version_id) .send() .await } @@ -92,12 +136,14 @@ impl Client { &self, key: &str, bucket: &str, + version_id: &str, tagging: Tagging, ) -> Result { self.inner .put_object_tagging() .key(key) .bucket(bucket) + .version_id(version_id) .tagging(tagging) .send() .await @@ -108,18 +154,18 @@ impl Client { &self, key: &str, bucket: &str, - response_content_disposition: &str, - response_content_type: Option, - response_content_encoding: Option, + version_id: &str, + response_headers: ResponseHeaders, expires_in: Duration, ) -> Result { self.inner .get_object() - .response_content_disposition(response_content_disposition) - .set_response_content_type(response_content_type) - .set_response_content_encoding(response_content_encoding) + .response_content_disposition(response_headers.content_disposition) + .set_response_content_type(response_headers.content_type) + .set_response_content_encoding(response_headers.content_encoding) .key(key) .bucket(bucket) + .version_id(version_id) .presigned( PresigningConfig::expires_in( expires_in diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs index e1abc169f..09a734824 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs @@ -193,7 +193,7 @@ impl<'a> Collecter<'a> { /// Gets S3 metadata from HeadObject such as creation/archival timestamps and statuses. pub async fn head(client: &S3Client, event: FlatS3EventMessage) -> Result { let head = client - .head_object(&event.key, &event.bucket) + .head_object(&event.key, &event.bucket, &event.version_id) .inspect_err(|err| { warn!("Error received from HeadObject: {}", err); }) @@ -238,7 +238,7 @@ impl<'a> Collecter<'a> { event: FlatS3EventMessage, ) -> Result { let tagging = client - .get_object_tagging(&event.key, &event.bucket) + .get_object_tagging(&event.key, &event.bucket, &event.version_id) .inspect_err(|err| { warn!("Error received from GetObjectTagging: {}", err); }) @@ -273,6 +273,7 @@ impl<'a> Collecter<'a> { .put_object_tagging( &event.key, &event.bucket, + &event.version_id, Tagging::builder().set_tag_set(Some(tag_set)).build()?, ) .await @@ -393,6 +394,7 @@ pub(crate) mod tests { use crate::database::aws::migration::tests::MIGRATOR; use crate::events::aws::tests::{ expected_event_record_simple, expected_flat_events_simple, EXPECTED_SHA256, + EXPECTED_VERSION_ID, }; use crate::events::aws::StorageClass::IntelligentTiering; @@ -416,6 +418,7 @@ pub(crate) mod tests { use super::*; use crate::database::{Client, Ingest}; + use crate::events::aws::message::default_version_id; use crate::events::aws::message::EventType::Created; use crate::handlers::aws::tests::s3_object_results; use crate::queries::EntriesBuilder; @@ -453,7 +456,7 @@ pub(crate) mod tests { .with_sqs_client(sqs_client) .with_s3_client(s3_client) .with_sqs_url("url") - .build_receive(&Default::default(), &database::Client::from_pool(pool)) + .build_receive(&Default::default(), &Client::from_pool(pool)) .await .unwrap() .collect() @@ -481,13 +484,15 @@ pub(crate) mod tests { let client = Client::from_pool(pool); let mut collecter = test_collecter(&config, &client).await; - set_s3_head_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); + set_s3_head_expectations( + &mut collecter.client, + vec![|| Ok(expected_head_object())], + default_version_id(), + ); let result = Collecter::head( &collecter.client, - FlatS3EventMessage::new_with_generated_id() - .with_key("key".to_string()) - .with_bucket("bucket".to_string()), + expected_s3_event_message().with_version_id(default_version_id()), ) .await .unwrap(); @@ -509,13 +514,12 @@ pub(crate) mod tests { set_s3_head_expectations( &mut collecter.client, vec![|| Err(expected_head_object_not_found())], + default_version_id(), ); let result = Collecter::head( &collecter.client, - FlatS3EventMessage::new_with_generated_id() - .with_key("key".to_string()) - .with_bucket("bucket".to_string()), + expected_s3_event_message().with_version_id(default_version_id()), ) .await; assert!(result.is_ok()); @@ -552,11 +556,9 @@ pub(crate) mod tests { let client = Client::from_pool(pool.clone()); let mut collecter = test_collecter(&config, &client).await; - collecter.raw_events = - FlatS3EventMessages(vec![FlatS3EventMessage::new_with_generated_id() - .with_event_type(Created) - .with_key("key".to_string()) - .with_bucket("bucket".to_string())]); + collecter.raw_events = FlatS3EventMessages(vec![ + expected_s3_event_message().with_version_id(EXPECTED_VERSION_ID.to_string()) + ]); set_s3_client_expectations(&mut collecter.client); @@ -588,13 +590,15 @@ pub(crate) mod tests { .build(&client) .await; - collecter.raw_events = - FlatS3EventMessages(vec![FlatS3EventMessage::new_with_generated_id() - .with_event_type(Created) - .with_key("key".to_string()) - .with_bucket("bucket".to_string())]); + collecter.raw_events = FlatS3EventMessages(vec![ + expected_s3_event_message().with_version_id(default_version_id()) + ]); - set_s3_head_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); + set_s3_head_expectations( + &mut collecter.client, + vec![|| Ok(expected_head_object())], + default_version_id(), + ); set_s3_get_tagging_expectations( &mut collecter.client, vec![move || { @@ -607,6 +611,7 @@ pub(crate) mod tests { .build() .unwrap()) }], + default_version_id(), ); let mut result = collecter.collect().await.unwrap(); @@ -650,13 +655,15 @@ pub(crate) mod tests { let client = Client::from_pool(pool.clone()); let mut collecter = test_collecter(&config, &client).await; - collecter.raw_events = - FlatS3EventMessages(vec![FlatS3EventMessage::new_with_generated_id() - .with_event_type(Created) - .with_key("key".to_string()) - .with_bucket("bucket".to_string())]); + collecter.raw_events = FlatS3EventMessages(vec![ + expected_s3_event_message().with_version_id(default_version_id()) + ]); - set_s3_head_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); + set_s3_head_expectations( + &mut collecter.client, + vec![|| Ok(expected_head_object())], + default_version_id(), + ); set_s3_get_tagging_expectations( &mut collecter.client, vec![move || { @@ -667,6 +674,7 @@ pub(crate) mod tests { .build(), )) }], + default_version_id(), ); let mut result = collecter.collect().await.unwrap(); @@ -697,6 +705,13 @@ pub(crate) mod tests { assert_collected_events(result); } + fn expected_s3_event_message() -> FlatS3EventMessage { + FlatS3EventMessage::new_with_generated_id() + .with_event_type(Created) + .with_key("key".to_string()) + .with_bucket("bucket".to_string()) + } + pub(crate) fn assert_collected_events(result: EventSourceType) { match result { EventSourceType::S3(events) => { @@ -710,23 +725,27 @@ pub(crate) mod tests { } } - pub(crate) fn set_s3_head_expectations(client: &mut S3Client, expectations: Vec) - where + pub(crate) fn set_s3_head_expectations( + client: &mut S3Client, + expectations: Vec, + version_id: String, + ) where F: Fn() -> result::Result> + Send + 'static, { let client = client .expect_head_object() - .with(eq("key"), eq("bucket")) + .with(eq("key"), eq("bucket"), eq(version_id)) .times(expectations.len()); for expectation in expectations { - client.returning(move |_, _| expectation()); + client.returning(move |_, _, _| expectation()); } } pub(crate) fn set_s3_get_tagging_expectations( client: &mut S3Client, get_tagging_expectations: Vec, + version_id: String, ) where F: Fn() -> result::Result> + Send @@ -734,11 +753,11 @@ pub(crate) mod tests { { let get_tagging = client .expect_get_object_tagging() - .with(eq("key"), eq("bucket")) + .with(eq("key"), eq("bucket"), eq(version_id)) .times(get_tagging_expectations.len()); for expectation in get_tagging_expectations { - get_tagging.returning(move |_, _| expectation()); + get_tagging.returning(move |_, _, _| expectation()); } } @@ -746,6 +765,7 @@ pub(crate) mod tests { client: &mut S3Client, get_tagging_expectations: Vec, put_tagging_expectations: Vec, + version_id: String, ) where F: Fn() -> result::Result> + Send @@ -754,19 +774,20 @@ pub(crate) mod tests { + Send + 'static, { - set_s3_get_tagging_expectations(client, get_tagging_expectations); + set_s3_get_tagging_expectations(client, get_tagging_expectations, version_id.to_string()); let put_tagging = client .expect_put_object_tagging() .with( eq("key"), eq("bucket"), + eq(version_id), function(|t: &Tagging| t.tag_set().first().unwrap().key == "ingest_id"), ) .times(put_tagging_expectations.len()); for expectation in put_tagging_expectations { - put_tagging.returning(move |_, _, _| expectation()); + put_tagging.returning(move |_, _, _, _| expectation()); } } @@ -801,11 +822,16 @@ pub(crate) mod tests { } pub(crate) fn set_s3_client_expectations(s3_client: &mut S3Client) { - set_s3_head_expectations(s3_client, vec![|| Ok(expected_head_object())]); + set_s3_head_expectations( + s3_client, + vec![|| Ok(expected_head_object())], + EXPECTED_VERSION_ID.to_string(), + ); set_s3_tagging_expectations( s3_client, vec![|| Ok(expected_get_object_tagging())], vec![|| Ok(expected_put_object_tagging())], + EXPECTED_VERSION_ID.to_string(), ); } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs index 8d2aee699..2370c6c7a 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs @@ -159,7 +159,7 @@ impl Inventory { async fn get_object_bytes>(&self, key: K, bucket: K) -> Result> { Ok(self .client - .get_object(key.as_ref(), bucket.as_ref()) + .get_object(key.as_ref(), bucket.as_ref(), default_version_id().as_ref()) .await .map_err(|err| S3Error(err.to_string()))? .body @@ -668,9 +668,13 @@ pub(crate) mod tests { client .expect_get_object() - .with(eq("manifest.checksum"), eq(MANIFEST_BUCKET)) + .with( + eq("manifest.checksum"), + eq(MANIFEST_BUCKET), + eq(default_version_id()), + ) .once() - .returning(move |_, _| { + .returning(move |_, _, _| { Ok(GetObjectOutput::builder() .body(ByteStream::from(checksum.clone())) .build()) @@ -863,9 +867,13 @@ pub(crate) mod tests { fn set_client_manifest_expectations(s3_client: &mut Client, data: Vec) { s3_client .expect_get_object() - .with(eq("manifest.json"), eq(MANIFEST_BUCKET)) + .with( + eq("manifest.json"), + eq(MANIFEST_BUCKET), + eq(default_version_id()), + ) .once() - .returning(move |_, _| { + .returning(move |_, _, _| { Ok(GetObjectOutput::builder() .body(ByteStream::from(data.clone())) .build()) @@ -924,9 +932,10 @@ pub(crate) mod tests { .with( eq(format!("{}{}", MANIFEST_KEY, ending)), eq(MANIFEST_BUCKET), + eq(default_version_id()), ) .once() - .returning(move |_, _| { + .returning(move |_, _, _| { Ok(GetObjectOutput::builder() .body(ByteStream::from(data.clone())) .build()) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/presign.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/presign.rs index 942e0eccb..5f9ae781a 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/presign.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/presign.rs @@ -7,6 +7,7 @@ use url::Url; use utoipa::{IntoParams, ToSchema}; use crate::clients::aws::s3; +use crate::clients::aws::s3::ResponseHeaders; use crate::database::entities::s3_object; use crate::env::Config; use crate::error::Error::PresignedUrlError; @@ -86,6 +87,7 @@ impl<'a> PresignedUrlBuilder<'a> { &self, key: &str, bucket: &str, + version_id: &str, response_content_disposition: ContentDisposition, response_content_type: Option, response_content_encoding: Option, @@ -111,9 +113,12 @@ impl<'a> PresignedUrlBuilder<'a> { .presign_url( key, bucket, - content_disposition, - response_content_type, - response_content_encoding, + version_id, + ResponseHeaders::new( + content_disposition.to_string(), + response_content_type, + response_content_encoding, + ), self.config .api_presign_expiry() .unwrap_or(DEFAULT_PRESIGN_EXPIRY), @@ -141,6 +146,7 @@ impl<'a> PresignedUrlBuilder<'a> { .presign_url( &model.key, &model.bucket, + &model.version_id, response_content_disposition, response_content_type, response_content_encoding, @@ -160,6 +166,7 @@ pub(crate) mod tests { use crate::clients::aws::s3; use crate::env::Config; + use crate::events::aws::message::default_version_id; use crate::routes::list::tests::mock_get_object; use super::*; @@ -175,7 +182,14 @@ pub(crate) mod tests { let builder = PresignedUrlBuilder::new(&client, &config).set_object_size(None); let url = builder - .presign_url("0", "1", ContentDisposition::Inline, None, None) + .presign_url( + "0", + "1", + &default_version_id(), + ContentDisposition::Inline, + None, + None, + ) .await .unwrap() .unwrap(); @@ -187,7 +201,14 @@ pub(crate) mod tests { let builder = PresignedUrlBuilder::new(&client, &config).set_object_size(Some(2)); let url = builder - .presign_url("0", "1", ContentDisposition::Inline, None, None) + .presign_url( + "0", + "1", + &default_version_id(), + ContentDisposition::Inline, + None, + None, + ) .await .unwrap() .unwrap(); @@ -212,6 +233,7 @@ pub(crate) mod tests { .presign_url( "0", "1", + &default_version_id(), ContentDisposition::Inline, Some("application/json".to_string()), Some("gzip".to_string()), @@ -236,7 +258,14 @@ pub(crate) mod tests { let builder = PresignedUrlBuilder::new(&client, &config).set_object_size(None); let url = builder - .presign_url("0", "1", ContentDisposition::Attachment, None, None) + .presign_url( + "0", + "1", + &default_version_id(), + ContentDisposition::Attachment, + None, + None, + ) .await .unwrap() .unwrap(); @@ -261,7 +290,14 @@ pub(crate) mod tests { let builder = PresignedUrlBuilder::new(&client, &config).set_object_size(Some(2)); let url = builder - .presign_url("0", "1", ContentDisposition::Inline, None, None) + .presign_url( + "0", + "1", + &default_version_id(), + ContentDisposition::Inline, + None, + None, + ) .await .unwrap(); @@ -282,7 +318,14 @@ pub(crate) mod tests { let builder = PresignedUrlBuilder::new(&client, &config).set_object_size(Some(2)); let url = builder - .presign_url("0", "1", ContentDisposition::Inline, None, None) + .presign_url( + "0", + "1", + &default_version_id(), + ContentDisposition::Inline, + None, + None, + ) .await .unwrap() .unwrap();