diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 7f1edf12faf8..36b08f3f5c9b 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -437,10 +437,16 @@ mod tests { // Object tagging is not supported by S3 Express One Zone if config.session_provider.is_none() { - tagging(&integration, !config.disable_tagging, |p| { - let client = Arc::clone(&integration.client); - async move { client.get_object_tagging(&p).await } - }) + tagging( + Arc::new(AmazonS3 { + client: Arc::clone(&integration.client), + }), + !config.disable_tagging, + |p| { + let client = Arc::clone(&integration.client); + async move { client.get_object_tagging(&p).await } + }, + ) .await; } @@ -448,7 +454,7 @@ mod tests { copy_if_not_exists(&integration).await; } if test_conditional_put { - put_opts(&integration, true).await; + put_opts(&integration as _, true).await; } // run integration test with unsigned payload enabled diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 25ae6dda68a8..755f3b1265fb 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -296,10 +296,16 @@ mod tests { signing(&integration).await; let validate = !integration.client.config().disable_tagging; - tagging(&integration, validate, |p| { - let client = Arc::clone(&integration.client); - async move { client.get_blob_tagging(&p).await } - }) + tagging( + Arc::new(MicrosoftAzure { + client: Arc::clone(&integration.client), + }), + validate, + |p| { + let client = Arc::clone(&integration.client); + async move { client.get_blob_tagging(&p).await } + }, + ) .await; // Azurite doesn't support attributes properly diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 567d6ebc492f..feb84d4d0bc3 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -19,7 +19,7 @@ use crate::path::Path; use crate::{ - Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, + Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet, WriteMultipart, }; use bytes::Bytes; @@ -221,6 +221,7 @@ pub struct BufWriter { capacity: usize, max_concurrency: usize, attributes: Option, + tags: Option, state: BufWriterState, store: Arc, } @@ -257,6 +258,7 @@ impl BufWriter { store, max_concurrency: 8, attributes: None, + tags: None, state: BufWriterState::Buffer(path, PutPayloadMut::new()), } } @@ -279,6 +281,14 @@ impl BufWriter { } } + /// Set the tags of the uploaded object + pub fn with_tags(self, tags: TagSet) -> Self { + Self { + tags: Some(tags), + ..self + } + } + /// Abort this writer, cleaning up any partially uploaded state /// /// # Panic @@ -321,7 +331,7 @@ impl AsyncWrite for BufWriter { let path = std::mem::take(path); let opts = PutMultipartOpts { attributes: self.attributes.take().unwrap_or_default(), - ..Default::default() + tags: self.tags.take().unwrap_or_default(), }; let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { @@ -365,6 +375,7 @@ impl AsyncWrite for BufWriter { let path = std::mem::take(p); let opts = PutOptions { attributes: self.attributes.take().unwrap_or_default(), + tags: self.tags.take().unwrap_or_default(), ..Default::default() }; let store = Arc::clone(&self.store); @@ -486,6 +497,7 @@ mod tests { } } + // Note: `BufWriter::with_tags` functionality is tested in `crate::tests::tagging` #[tokio::test] async fn test_buf_writer() { let store = Arc::new(InMemory::new()) as Arc; diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index ad72bd29ef76..9ec72162edf3 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1314,12 +1314,14 @@ mod test_util { #[cfg(test)] mod tests { use super::*; + use crate::buffered::BufWriter; use crate::multipart::MultipartStore; use crate::test_util::flatten_list_stream; use chrono::TimeZone; use futures::stream::FuturesUnordered; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; + use tokio::io::AsyncWriteExt; pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) { delete_fixtures(storage).await; @@ -2359,7 +2361,7 @@ mod tests { } #[cfg(any(feature = "aws", feature = "azure"))] - pub(crate) async fn tagging(storage: &dyn ObjectStore, validate: bool, get_tags: F) + pub(crate) async fn tagging(storage: Arc, validate: bool, get_tags: F) where F: Fn(Path) -> Fut + Send + Sync, Fut: std::future::Future> + Send, @@ -2409,19 +2411,24 @@ mod tests { let multi_path = Path::from("tag_test_multi"); let mut write = storage - .put_multipart_opts(&multi_path, tag_set.into()) + .put_multipart_opts(&multi_path, tag_set.clone().into()) .await .unwrap(); write.put_part("foo".into()).await.unwrap(); write.complete().await.unwrap(); + let buf_path = Path::from("tag_test_buf"); + let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set); + buf.write_all(b"foo").await.unwrap(); + buf.shutdown().await.unwrap(); + // Write should always succeed, but certain configurations may simply ignore tags if !validate { return; } - for path in [path, multi_path] { + for path in [path, multi_path, buf_path] { let resp = get_tags(path.clone()).await.unwrap(); let body = resp.bytes().await.unwrap();