Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BufWriter::with_attributes and ::with_tags in object_store #5693

Merged
merged 2 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,16 @@ mod tests {

// Object tagging is not supported by S3 Express One Zone
if config.session_provider.is_none() {
tagging(&integration, !config.disable_tagging, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_object_tagging(&p).await }
})
tagging(
netthier marked this conversation as resolved.
Show resolved Hide resolved
Arc::new(AmazonS3 {
client: Arc::clone(&integration.client),
}),
!config.disable_tagging,
|p| {
let client = Arc::clone(&integration.client);
async move { client.get_object_tagging(&p).await }
},
)
.await;
}

Expand Down
14 changes: 10 additions & 4 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,16 @@ mod tests {
signing(&integration).await;

let validate = !integration.client.config().disable_tagging;
tagging(&integration, validate, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
})
tagging(
Arc::new(MicrosoftAzure {
client: Arc::clone(&integration.client),
}),
validate,
|p| {
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
},
)
.await;

// Azurite doesn't support attributes properly
Expand Down
77 changes: 69 additions & 8 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
//! Utilities for performing tokio-style buffered IO

use crate::path::Path;
use crate::{ObjectMeta, ObjectStore, PutPayloadMut, WriteMultipart};
use crate::{
Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet,
WriteMultipart,
};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
Expand Down Expand Up @@ -217,6 +220,8 @@ impl AsyncBufRead for BufReader {
pub struct BufWriter {
/// Buffer capacity supplied at construction.
/// NOTE(review): presumably the byte threshold at which buffered data is handed off to a
/// multipart upload — confirm against `poll_write`.
capacity: usize,
/// Initialised to 8 by the constructor.
/// NOTE(review): presumably bounds the number of in-flight part uploads — its use is not
/// visible in this excerpt, verify.
max_concurrency: usize,
/// Attributes to apply to the uploaded object, if any were set via `with_attributes`.
/// Taken (leaving `None`) when the upload request is built.
attributes: Option<Attributes>,
/// Tags to apply to the uploaded object, if any were set via `with_tags`.
/// Taken (leaving `None`) when the upload request is built.
tags: Option<TagSet>,
/// Current stage of the write; starts as `BufWriterState::Buffer` holding the destination
/// path and an empty payload.
state: BufWriterState,
/// Object store that receives the data (via `put_opts` / `put_multipart_opts`).
store: Arc<dyn ObjectStore>,
}
Expand Down Expand Up @@ -252,6 +257,8 @@ impl BufWriter {
capacity,
store,
max_concurrency: 8,
attributes: None,
tags: None,
state: BufWriterState::Buffer(path, PutPayloadMut::new()),
}
}
Expand All @@ -266,6 +273,22 @@ impl BufWriter {
}
}

/// Attach `attributes` to the object this writer uploads.
///
/// Consumes the writer and hands it back with the attributes recorded, so the call can be
/// chained builder-style. Any attributes configured earlier are replaced.
pub fn with_attributes(mut self, attributes: Attributes) -> Self {
    self.attributes = Some(attributes);
    self
}

/// Attach `tags` to the object this writer uploads.
///
/// Consumes the writer and hands it back with the tag set recorded, so the call can be
/// chained builder-style. Any tags configured earlier are replaced.
pub fn with_tags(mut self, tags: TagSet) -> Self {
    self.tags = Some(tags);
    self
}

/// Abort this writer, cleaning up any partially uploaded state
///
/// # Panic
Expand Down Expand Up @@ -306,9 +329,13 @@ impl AsyncWrite for BufWriter {
if b.content_length().saturating_add(buf.len()) >= cap {
let buffer = std::mem::take(b);
let path = std::mem::take(path);
let opts = PutMultipartOpts {
attributes: self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
};
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async move {
let upload = store.put_multipart(&path).await?;
let upload = store.put_multipart_opts(&path, opts).await?;
let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap);
for chunk in buffer.freeze() {
chunked.put(chunk);
Expand Down Expand Up @@ -346,9 +373,14 @@ impl AsyncWrite for BufWriter {
BufWriterState::Buffer(p, b) => {
let buf = std::mem::take(b);
let path = std::mem::take(p);
let opts = PutOptions {
attributes: self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
..Default::default()
};
let store = Arc::clone(&self.store);
self.state = BufWriterState::Flush(Box::pin(async move {
store.put(&path, buf.into()).await?;
store.put_opts(&path, buf.into(), opts).await?;
Ok(())
}));
}
Expand Down Expand Up @@ -383,6 +415,7 @@ mod tests {
use super::*;
use crate::memory::InMemory;
use crate::path::Path;
use crate::{Attribute, GetOptions};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

#[tokio::test]
Expand Down Expand Up @@ -464,26 +497,54 @@ mod tests {
}
}

// Note: `BufWriter::with_tags` functionality is tested in `crate::tests::tagging`
//
// Exercises both upload paths of `BufWriter` against an in-memory store and checks that the
// attributes configured through `with_attributes` are stored with the object in each case.
#[tokio::test]
async fn test_buf_writer() {
    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
    let path = Path::from("file.txt");
    let attributes = Attributes::from_iter([
        (Attribute::ContentType, "text/html"),
        (Attribute::CacheControl, "max-age=604800"),
    ]);

    // Test put: 25 bytes total stays below the 30 byte capacity, so the data is
    // written with a single put when the writer is shut down.
    let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
        .with_attributes(attributes.clone());
    writer.write_all(&[0; 20]).await.unwrap();
    writer.flush().await.unwrap();
    writer.write_all(&[0; 5]).await.unwrap();
    writer.shutdown().await.unwrap();

    // A head-only get returns both the object metadata and its attributes.
    let response = store
        .get_opts(
            &path,
            GetOptions {
                head: true,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    assert_eq!(response.meta.size, 25);
    assert_eq!(response.attributes, attributes);

    // Test multipart: 40 bytes total reaches the 30 byte capacity, so the writer
    // switches to a multipart upload.
    let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
        .with_attributes(attributes.clone());
    writer.write_all(&[0; 20]).await.unwrap();
    writer.flush().await.unwrap();
    writer.write_all(&[0; 20]).await.unwrap();
    writer.shutdown().await.unwrap();

    let response = store
        .get_opts(
            &path,
            GetOptions {
                head: true,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    assert_eq!(response.meta.size, 40);
    assert_eq!(response.attributes, attributes);
}
}
13 changes: 10 additions & 3 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1314,12 +1314,14 @@ mod test_util {
#[cfg(test)]
mod tests {
use super::*;
use crate::buffered::BufWriter;
use crate::multipart::MultipartStore;
use crate::test_util::flatten_list_stream;
use chrono::TimeZone;
use futures::stream::FuturesUnordered;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use tokio::io::AsyncWriteExt;

pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
delete_fixtures(storage).await;
Expand Down Expand Up @@ -2359,7 +2361,7 @@ mod tests {
}

#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: bool, get_tags: F)
pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
where
F: Fn(Path) -> Fut + Send + Sync,
Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
Expand Down Expand Up @@ -2409,19 +2411,24 @@ mod tests {

let multi_path = Path::from("tag_test_multi");
let mut write = storage
.put_multipart_opts(&multi_path, tag_set.into())
.put_multipart_opts(&multi_path, tag_set.clone().into())
.await
.unwrap();

write.put_part("foo".into()).await.unwrap();
write.complete().await.unwrap();

let buf_path = Path::from("tag_test_buf");
let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
buf.write_all(b"foo").await.unwrap();
buf.shutdown().await.unwrap();

// Write should always succeed, but certain configurations may simply ignore tags
if !validate {
return;
}

for path in [path, multi_path] {
for path in [path, multi_path, buf_path] {
let resp = get_tags(path.clone()).await.unwrap();
let body = resp.bytes().await.unwrap();

Expand Down
Loading