From 08af4710fcd2b56a7624db9b9d97e6715a952cb0 Mon Sep 17 00:00:00 2001 From: nett_hier <66856670+netthier@users.noreply.github.com> Date: Fri, 26 Apr 2024 13:06:52 +0200 Subject: [PATCH] Add `BufWriter::with_attributes` and `::with_tags` in `object_store` (#5693) * Add `BufWriter::with_attributes` Signed-off-by: netthier * Add `BufWriter::with_tags` Signed-off-by: netthier --------- Signed-off-by: netthier --- object_store/src/aws/mod.rs | 14 +++++-- object_store/src/azure/mod.rs | 14 +++++-- object_store/src/buffered.rs | 77 +++++++++++++++++++++++++++++++---- object_store/src/lib.rs | 13 ++++-- 4 files changed, 99 insertions(+), 19 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 7f1edf12faf8..5bc6d56e7c7a 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -437,10 +437,16 @@ mod tests { // Object tagging is not supported by S3 Express One Zone if config.session_provider.is_none() { - tagging(&integration, !config.disable_tagging, |p| { - let client = Arc::clone(&integration.client); - async move { client.get_object_tagging(&p).await } - }) + tagging( + Arc::new(AmazonS3 { + client: Arc::clone(&integration.client), + }), + !config.disable_tagging, + |p| { + let client = Arc::clone(&integration.client); + async move { client.get_object_tagging(&p).await } + }, + ) .await; } diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 25ae6dda68a8..755f3b1265fb 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -296,10 +296,16 @@ mod tests { signing(&integration).await; let validate = !integration.client.config().disable_tagging; - tagging(&integration, validate, |p| { - let client = Arc::clone(&integration.client); - async move { client.get_blob_tagging(&p).await } - }) + tagging( + Arc::new(MicrosoftAzure { + client: Arc::clone(&integration.client), + }), + validate, + |p| { + let client = Arc::clone(&integration.client); + async move { client.get_blob_tagging(&p).await } + }, + ) .await; // Azurite doesn't support attributes properly diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index d41224177a30..feb84d4d0bc3 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -18,7 +18,10 @@ //! Utilities for performing tokio-style buffered IO use crate::path::Path; -use crate::{ObjectMeta, ObjectStore, PutPayloadMut, WriteMultipart}; +use crate::{ + Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet, + WriteMultipart, +}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -217,6 +220,8 @@ impl AsyncBufRead for BufReader { pub struct BufWriter { capacity: usize, max_concurrency: usize, + attributes: Option, + tags: Option, state: BufWriterState, store: Arc, } @@ -252,6 +257,8 @@ impl BufWriter { capacity, store, max_concurrency: 8, + attributes: None, + tags: None, state: BufWriterState::Buffer(path, PutPayloadMut::new()), } } @@ -266,6 +273,22 @@ impl BufWriter { } } + /// Set the attributes of the uploaded object + pub fn with_attributes(self, attributes: Attributes) -> Self { + Self { + attributes: Some(attributes), + ..self + } + } + + /// Set the tags of the uploaded object + pub fn with_tags(self, tags: TagSet) -> Self { + Self { + tags: Some(tags), + ..self + } + } + /// Abort this writer, cleaning up any partially uploaded state /// /// # Panic @@ -306,9 +329,13 @@ impl AsyncWrite for BufWriter { if b.content_length().saturating_add(buf.len()) >= cap { let buffer = std::mem::take(b); let path = std::mem::take(path); + let opts = PutMultipartOpts { + attributes: self.attributes.take().unwrap_or_default(), + tags: self.tags.take().unwrap_or_default(), + }; let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { - let upload = store.put_multipart(&path).await?; + let upload = store.put_multipart_opts(&path, opts).await?; let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap); for chunk in buffer.freeze() { chunked.put(chunk); @@ -346,9 +373,14 @@ impl AsyncWrite for BufWriter { BufWriterState::Buffer(p, b) => { let buf = std::mem::take(b); let path = std::mem::take(p); + let opts = PutOptions { + attributes: self.attributes.take().unwrap_or_default(), + tags: self.tags.take().unwrap_or_default(), + ..Default::default() + }; let store = Arc::clone(&self.store); self.state = BufWriterState::Flush(Box::pin(async move { - store.put(&path, buf.into()).await?; + store.put_opts(&path, buf.into(), opts).await?; Ok(()) })); } @@ -383,6 +415,7 @@ mod tests { use super::*; use crate::memory::InMemory; use crate::path::Path; + use crate::{Attribute, GetOptions}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; #[tokio::test] @@ -464,26 +497,54 @@ mod tests { } } + // Note: `BufWriter::with_tags` functionality is tested in `crate::tests::tagging` #[tokio::test] async fn test_buf_writer() { let store = Arc::new(InMemory::new()) as Arc; let path = Path::from("file.txt"); + let attributes = Attributes::from_iter([ + (Attribute::ContentType, "text/html"), + (Attribute::CacheControl, "max-age=604800"), + ]); // Test put - let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30); + let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30) + .with_attributes(attributes.clone()); writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 5]).await.unwrap(); writer.shutdown().await.unwrap(); - assert_eq!(store.head(&path).await.unwrap().size, 25); + let response = store + .get_opts( + &path, + GetOptions { + head: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(response.meta.size, 25); + assert_eq!(response.attributes, attributes); // Test multipart - let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30); + let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30) + .with_attributes(attributes.clone()); writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 20]).await.unwrap(); writer.shutdown().await.unwrap(); - - assert_eq!(store.head(&path).await.unwrap().size, 40); + let response = store + .get_opts( + &path, + GetOptions { + head: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(response.meta.size, 40); + assert_eq!(response.attributes, attributes); } } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index c99e15a49333..9a8f77b4d824 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1314,12 +1314,14 @@ mod test_util { #[cfg(test)] mod tests { use super::*; + use crate::buffered::BufWriter; use crate::multipart::MultipartStore; use crate::test_util::flatten_list_stream; use chrono::TimeZone; use futures::stream::FuturesUnordered; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; + use tokio::io::AsyncWriteExt; pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) { delete_fixtures(storage).await; @@ -2365,7 +2367,7 @@ mod tests { } #[cfg(any(feature = "aws", feature = "azure"))] - pub(crate) async fn tagging(storage: &dyn ObjectStore, validate: bool, get_tags: F) + pub(crate) async fn tagging(storage: Arc, validate: bool, get_tags: F) where F: Fn(Path) -> Fut + Send + Sync, Fut: std::future::Future> + Send, @@ -2415,19 +2417,24 @@ mod tests { let multi_path = Path::from("tag_test_multi"); let mut write = storage - .put_multipart_opts(&multi_path, tag_set.into()) + .put_multipart_opts(&multi_path, tag_set.clone().into()) .await .unwrap(); write.put_part("foo".into()).await.unwrap(); write.complete().await.unwrap(); + let buf_path = Path::from("tag_test_buf"); + let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set); + buf.write_all(b"foo").await.unwrap(); + buf.shutdown().await.unwrap(); + // Write should always succeed, but certain configurations may simply ignore tags if !validate { return; } - for path in [path, multi_path] { + for path in [path, multi_path, buf_path] { let resp = get_tags(path.clone()).await.unwrap(); let body = resp.bytes().await.unwrap();