Skip to content

Commit

Permalink
Add BufWriter::with_tags
Browse files Browse the repository at this point in the history
Signed-off-by: netthier <[email protected]>
  • Loading branch information
netthier committed Apr 25, 2024
1 parent 0fd1427 commit 6cdb175
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
16 changes: 11 additions & 5 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,18 +437,24 @@ mod tests {

// Object tagging is not supported by S3 Express One Zone
if config.session_provider.is_none() {
tagging(&integration, !config.disable_tagging, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_object_tagging(&p).await }
})
tagging(
Arc::new(AmazonS3 {
client: Arc::clone(&integration.client),
}),
!config.disable_tagging,
|p| {
let client = Arc::clone(&integration.client);
async move { client.get_object_tagging(&p).await }
},
)
.await;
}

if test_not_exists {
copy_if_not_exists(&integration).await;
}
if test_conditional_put {
put_opts(&integration, true).await;
put_opts(&integration as _, true).await;
}

// run integration test with unsigned payload enabled
Expand Down
14 changes: 10 additions & 4 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,16 @@ mod tests {
signing(&integration).await;

let validate = !integration.client.config().disable_tagging;
tagging(&integration, validate, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
})
tagging(
Arc::new(MicrosoftAzure {
client: Arc::clone(&integration.client),
}),
validate,
|p| {
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
},
)
.await;

// Azurite doesn't support attributes properly
Expand Down
16 changes: 14 additions & 2 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::path::Path;
use crate::{
Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut,
Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet,
WriteMultipart,
};
use bytes::Bytes;
Expand Down Expand Up @@ -221,6 +221,7 @@ pub struct BufWriter {
capacity: usize,
max_concurrency: usize,
attributes: Option<Attributes>,
tags: Option<TagSet>,
state: BufWriterState,
store: Arc<dyn ObjectStore>,
}
Expand Down Expand Up @@ -257,6 +258,7 @@ impl BufWriter {
store,
max_concurrency: 8,
attributes: None,
tags: None,
state: BufWriterState::Buffer(path, PutPayloadMut::new()),
}
}
Expand All @@ -279,6 +281,14 @@ impl BufWriter {
}
}

/// Set the tags of the uploaded object
pub fn with_tags(self, tags: TagSet) -> Self {
Self {
tags: Some(tags),
..self
}
}

/// Abort this writer, cleaning up any partially uploaded state
///
/// # Panic
Expand Down Expand Up @@ -321,7 +331,7 @@ impl AsyncWrite for BufWriter {
let path = std::mem::take(path);
let opts = PutMultipartOpts {
attributes: self.attributes.take().unwrap_or_default(),
..Default::default()
tags: self.tags.take().unwrap_or_default(),
};
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async move {
Expand Down Expand Up @@ -365,6 +375,7 @@ impl AsyncWrite for BufWriter {
let path = std::mem::take(p);
let opts = PutOptions {
attributes: self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
..Default::default()
};
let store = Arc::clone(&self.store);
Expand Down Expand Up @@ -486,6 +497,7 @@ mod tests {
}
}

// Note: `BufWriter::with_tags` functionality is tested in `crate::tests::tagging`
#[tokio::test]
async fn test_buf_writer() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
Expand Down
13 changes: 10 additions & 3 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1314,12 +1314,14 @@ mod test_util {
#[cfg(test)]
mod tests {
use super::*;
use crate::buffered::BufWriter;
use crate::multipart::MultipartStore;
use crate::test_util::flatten_list_stream;
use chrono::TimeZone;
use futures::stream::FuturesUnordered;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use tokio::io::AsyncWriteExt;

pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
delete_fixtures(storage).await;
Expand Down Expand Up @@ -2359,7 +2361,7 @@ mod tests {
}

#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: bool, get_tags: F)
pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
where
F: Fn(Path) -> Fut + Send + Sync,
Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
Expand Down Expand Up @@ -2409,19 +2411,24 @@ mod tests {

let multi_path = Path::from("tag_test_multi");
let mut write = storage
.put_multipart_opts(&multi_path, tag_set.into())
.put_multipart_opts(&multi_path, tag_set.clone().into())
.await
.unwrap();

write.put_part("foo".into()).await.unwrap();
write.complete().await.unwrap();

let buf_path = Path::from("tag_test_buf");
let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
buf.write_all(b"foo").await.unwrap();
buf.shutdown().await.unwrap();

// Write should always succeed, but certain configurations may simply ignore tags
if !validate {
return;
}

for path in [path, multi_path] {
for path in [path, multi_path, buf_path] {
let resp = get_tags(path.clone()).await.unwrap();
let body = resp.bytes().await.unwrap();

Expand Down

0 comments on commit 6cdb175

Please sign in to comment.