Skip to content

Commit

Permalink
Add BufWriter::with_attributes
Browse files Browse the repository at this point in the history
Signed-off-by: netthier <[email protected]>
  • Loading branch information
netthier committed Apr 25, 2024
1 parent 11450ae commit 0fd1427
Showing 1 changed file with 57 additions and 8 deletions.
65 changes: 57 additions & 8 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
//! Utilities for performing tokio-style buffered IO
use crate::path::Path;
use crate::{ObjectMeta, ObjectStore, PutPayloadMut, WriteMultipart};
use crate::{
Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut,
WriteMultipart,
};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
Expand Down Expand Up @@ -217,6 +220,7 @@ impl AsyncBufRead for BufReader {
pub struct BufWriter {
capacity: usize,
max_concurrency: usize,
attributes: Option<Attributes>,
state: BufWriterState,
store: Arc<dyn ObjectStore>,
}
Expand Down Expand Up @@ -252,6 +256,7 @@ impl BufWriter {
capacity,
store,
max_concurrency: 8,
attributes: None,
state: BufWriterState::Buffer(path, PutPayloadMut::new()),
}
}
Expand All @@ -266,6 +271,14 @@ impl BufWriter {
}
}

/// Set the attributes of the uploaded object
pub fn with_attributes(self, attributes: Attributes) -> Self {
Self {
attributes: Some(attributes),
..self
}
}

/// Abort this writer, cleaning up any partially uploaded state
///
/// # Panic
Expand Down Expand Up @@ -306,9 +319,13 @@ impl AsyncWrite for BufWriter {
if b.content_length().saturating_add(buf.len()) >= cap {
let buffer = std::mem::take(b);
let path = std::mem::take(path);
let opts = PutMultipartOpts {
attributes: self.attributes.take().unwrap_or_default(),
..Default::default()
};
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async move {
let upload = store.put_multipart(&path).await?;
let upload = store.put_multipart_opts(&path, opts).await?;
let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap);
for chunk in buffer.freeze() {
chunked.put(chunk);
Expand Down Expand Up @@ -346,9 +363,13 @@ impl AsyncWrite for BufWriter {
BufWriterState::Buffer(p, b) => {
let buf = std::mem::take(b);
let path = std::mem::take(p);
let opts = PutOptions {
attributes: self.attributes.take().unwrap_or_default(),
..Default::default()
};
let store = Arc::clone(&self.store);
self.state = BufWriterState::Flush(Box::pin(async move {
store.put(&path, buf.into()).await?;
store.put_opts(&path, buf.into(), opts).await?;
Ok(())
}));
}
Expand Down Expand Up @@ -383,6 +404,7 @@ mod tests {
use super::*;
use crate::memory::InMemory;
use crate::path::Path;
use crate::{Attribute, GetOptions};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

#[tokio::test]
Expand Down Expand Up @@ -468,22 +490,49 @@ mod tests {
async fn test_buf_writer() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let path = Path::from("file.txt");
let attributes = Attributes::from_iter([
(Attribute::ContentType, "text/html"),
(Attribute::CacheControl, "max-age=604800"),
]);

// Test put
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
.with_attributes(attributes.clone());
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 5]).await.unwrap();
writer.shutdown().await.unwrap();
assert_eq!(store.head(&path).await.unwrap().size, 25);
let response = store
.get_opts(
&path,
GetOptions {
head: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(response.meta.size, 25);
assert_eq!(response.attributes, attributes);

// Test multipart
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
.with_attributes(attributes.clone());
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 20]).await.unwrap();
writer.shutdown().await.unwrap();

assert_eq!(store.head(&path).await.unwrap().size, 40);
let response = store
.get_opts(
&path,
GetOptions {
head: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(response.meta.size, 40);
assert_eq!(response.attributes, attributes);
}
}

0 comments on commit 0fd1427

Please sign in to comment.