From 0fd14277760ac1eeabd4a3bf3d9d61fb410c9c61 Mon Sep 17 00:00:00 2001 From: netthier Date: Thu, 25 Apr 2024 16:41:00 +0200 Subject: [PATCH 1/2] Add `BufWriter::with_attributes` Signed-off-by: netthier --- object_store/src/buffered.rs | 65 +++++++++++++++++++++++++++++++----- 1 file changed, 57 insertions(+), 8 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index d41224177a30..567d6ebc492f 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -18,7 +18,10 @@ //! Utilities for performing tokio-style buffered IO use crate::path::Path; -use crate::{ObjectMeta, ObjectStore, PutPayloadMut, WriteMultipart}; +use crate::{ + Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, + WriteMultipart, +}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -217,6 +220,7 @@ impl AsyncBufRead for BufReader { pub struct BufWriter { capacity: usize, max_concurrency: usize, + attributes: Option, state: BufWriterState, store: Arc, } @@ -252,6 +256,7 @@ impl BufWriter { capacity, store, max_concurrency: 8, + attributes: None, state: BufWriterState::Buffer(path, PutPayloadMut::new()), } } @@ -266,6 +271,14 @@ impl BufWriter { } } + /// Set the attributes of the uploaded object + pub fn with_attributes(self, attributes: Attributes) -> Self { + Self { + attributes: Some(attributes), + ..self + } + } + /// Abort this writer, cleaning up any partially uploaded state /// /// # Panic @@ -306,9 +319,13 @@ impl AsyncWrite for BufWriter { if b.content_length().saturating_add(buf.len()) >= cap { let buffer = std::mem::take(b); let path = std::mem::take(path); + let opts = PutMultipartOpts { + attributes: self.attributes.take().unwrap_or_default(), + ..Default::default() + }; let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { - let upload = store.put_multipart(&path).await?; + let upload = store.put_multipart_opts(&path, opts).await?; let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap); for chunk in buffer.freeze() { chunked.put(chunk); @@ -346,9 +363,13 @@ impl AsyncWrite for BufWriter { BufWriterState::Buffer(p, b) => { let buf = std::mem::take(b); let path = std::mem::take(p); + let opts = PutOptions { + attributes: self.attributes.take().unwrap_or_default(), + ..Default::default() + }; let store = Arc::clone(&self.store); self.state = BufWriterState::Flush(Box::pin(async move { - store.put(&path, buf.into()).await?; + store.put_opts(&path, buf.into(), opts).await?; Ok(()) })); } @@ -383,6 +404,7 @@ mod tests { use super::*; use crate::memory::InMemory; use crate::path::Path; + use crate::{Attribute, GetOptions}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; #[tokio::test] @@ -468,22 +490,49 @@ mod tests { async fn test_buf_writer() { let store = Arc::new(InMemory::new()) as Arc; let path = Path::from("file.txt"); + let attributes = Attributes::from_iter([ + (Attribute::ContentType, "text/html"), + (Attribute::CacheControl, "max-age=604800"), + ]); // Test put - let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30); + let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30) + .with_attributes(attributes.clone()); writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 5]).await.unwrap(); writer.shutdown().await.unwrap(); - assert_eq!(store.head(&path).await.unwrap().size, 25); + let response = store + .get_opts( + &path, + GetOptions { + head: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(response.meta.size, 25); + assert_eq!(response.attributes, attributes); // Test multipart - let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30); + let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30) + .with_attributes(attributes.clone()); writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 20]).await.unwrap(); writer.shutdown().await.unwrap(); - - assert_eq!(store.head(&path).await.unwrap().size, 40); + let response = store + .get_opts( + &path, + GetOptions { + head: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(response.meta.size, 40); + assert_eq!(response.attributes, attributes); } } From 0c030d7dc339535ca9200eb24ff0db0e77abdb07 Mon Sep 17 00:00:00 2001 From: netthier Date: Thu, 25 Apr 2024 18:58:00 +0200 Subject: [PATCH 2/2] Add `BufWriter::with_tags` Signed-off-by: netthier --- object_store/src/aws/mod.rs | 14 ++++++++++---- object_store/src/azure/mod.rs | 14 ++++++++++---- object_store/src/buffered.rs | 16 ++++++++++++++-- object_store/src/lib.rs | 13 ++++++++++--- 4 files changed, 44 insertions(+), 13 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 7f1edf12faf8..5bc6d56e7c7a 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -437,10 +437,16 @@ mod tests { // Object tagging is not supported by S3 Express One Zone if config.session_provider.is_none() { - tagging(&integration, !config.disable_tagging, |p| { - let client = Arc::clone(&integration.client); - async move { client.get_object_tagging(&p).await } - }) + tagging( + Arc::new(AmazonS3 { + client: Arc::clone(&integration.client), + }), + !config.disable_tagging, + |p| { + let client = Arc::clone(&integration.client); + async move { client.get_object_tagging(&p).await } + }, + ) .await; } diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 25ae6dda68a8..755f3b1265fb 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -296,10 +296,16 @@ mod tests { signing(&integration).await; let validate = !integration.client.config().disable_tagging; - tagging(&integration, validate, |p| { - let client = Arc::clone(&integration.client); - async move { client.get_blob_tagging(&p).await } - }) + tagging( + Arc::new(MicrosoftAzure { + client: Arc::clone(&integration.client), + }), + validate, + |p| { + let client = Arc::clone(&integration.client); + async move { client.get_blob_tagging(&p).await } + }, + ) .await; // Azurite doesn't support attributes properly diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 567d6ebc492f..feb84d4d0bc3 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -19,7 +19,7 @@ use crate::path::Path; use crate::{ - Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, + Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet, WriteMultipart, }; use bytes::Bytes; @@ -221,6 +221,7 @@ pub struct BufWriter { capacity: usize, max_concurrency: usize, attributes: Option, + tags: Option, state: BufWriterState, store: Arc, } @@ -257,6 +258,7 @@ impl BufWriter { store, max_concurrency: 8, attributes: None, + tags: None, state: BufWriterState::Buffer(path, PutPayloadMut::new()), } } @@ -279,6 +281,14 @@ impl BufWriter { } } + /// Set the tags of the uploaded object + pub fn with_tags(self, tags: TagSet) -> Self { + Self { + tags: Some(tags), + ..self + } + } + /// Abort this writer, cleaning up any partially uploaded state /// /// # Panic @@ -321,7 +331,7 @@ impl AsyncWrite for BufWriter { let path = std::mem::take(path); let opts = PutMultipartOpts { attributes: self.attributes.take().unwrap_or_default(), - ..Default::default() + tags: self.tags.take().unwrap_or_default(), }; let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { @@ -365,6 +375,7 @@ impl AsyncWrite for BufWriter { let path = std::mem::take(p); let opts = PutOptions { attributes: self.attributes.take().unwrap_or_default(), + tags: self.tags.take().unwrap_or_default(), ..Default::default() }; let store = Arc::clone(&self.store); @@ -486,6 +497,7 @@ mod tests { } } + // Note: `BufWriter::with_tags` functionality is tested in `crate::tests::tagging` #[tokio::test] async fn test_buf_writer() { let store = Arc::new(InMemory::new()) as Arc; diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index ad72bd29ef76..9ec72162edf3 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1314,12 +1314,14 @@ mod test_util { #[cfg(test)] mod tests { use super::*; + use crate::buffered::BufWriter; use crate::multipart::MultipartStore; use crate::test_util::flatten_list_stream; use chrono::TimeZone; use futures::stream::FuturesUnordered; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; + use tokio::io::AsyncWriteExt; pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) { delete_fixtures(storage).await; @@ -2359,7 +2361,7 @@ mod tests { } #[cfg(any(feature = "aws", feature = "azure"))] - pub(crate) async fn tagging(storage: &dyn ObjectStore, validate: bool, get_tags: F) + pub(crate) async fn tagging(storage: Arc, validate: bool, get_tags: F) where F: Fn(Path) -> Fut + Send + Sync, Fut: std::future::Future> + Send, @@ -2409,19 +2411,24 @@ mod tests { let multi_path = Path::from("tag_test_multi"); let mut write = storage - .put_multipart_opts(&multi_path, tag_set.into()) + .put_multipart_opts(&multi_path, tag_set.clone().into()) .await .unwrap(); write.put_part("foo".into()).await.unwrap(); write.complete().await.unwrap(); + let buf_path = Path::from("tag_test_buf"); + let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set); + buf.write_all(b"foo").await.unwrap(); + buf.shutdown().await.unwrap(); + // Write should always succeed, but certain configurations may simply ignore tags if !validate { return; } - for path in [path, multi_path] { + for path in [path, multi_path, buf_path] { let resp = get_tags(path.clone()).await.unwrap(); let body = resp.bytes().await.unwrap();