Add BufWriter for Adaptive Put / Multipart Upload #5431

Merged
2 commits merged on Feb 27, 2024
Changes from 1 commit
163 changes: 161 additions & 2 deletions object_store/src/buffered.rs
@@ -18,7 +18,7 @@
//! Utilities for performing tokio-style buffered IO

use crate::path::Path;
-use crate::{ObjectMeta, ObjectStore};
+use crate::{MultipartId, ObjectMeta, ObjectStore};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
@@ -27,7 +27,7 @@ use std::io::{Error, ErrorKind, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf};
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf};

/// The default buffer size used by [`BufReader`]
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
@@ -205,6 +205,138 @@ impl AsyncBufRead for BufReader {
}
}

/// An async buffered writer compatible with the tokio IO traits
///
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
/// streamed using [`ObjectStore::put_multipart`]
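///
/// A minimal usage sketch (editorial illustration, not part of this PR; assumes
/// the crate's `InMemory` store and a tokio runtime):
///
/// ```
/// # use std::sync::Arc;
/// # use object_store::memory::InMemory;
/// # use object_store::path::Path;
/// # use object_store::buffered::BufWriter;
/// # use object_store::ObjectStore;
/// # use tokio::io::AsyncWriteExt;
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let mut writer = BufWriter::new(store, Path::from("data/file.bin"));
/// writer.write_all(b"hello").await?; // stays in the in-memory buffer (below capacity)
/// writer.shutdown().await?;          // flushed with a single ObjectStore::put
/// # Ok(())
/// # }
/// ```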
pub struct BufWriter {
capacity: usize,
state: BufWriterState,
multipart_id: Option<MultipartId>,
store: Arc<dyn ObjectStore>,
}

impl std::fmt::Debug for BufWriter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufWriter")
.field("capacity", &self.capacity)
.field("multipart_id", &self.multipart_id)
.finish()
}
}

type MultipartResult = (MultipartId, Box<dyn AsyncWrite + Send + Unpin>);

enum BufWriterState {
/// Buffer up to capacity bytes
Buffer(Path, Vec<u8>),
/// [`ObjectStore::put_multipart`]
Prepare(BoxFuture<'static, std::io::Result<MultipartResult>>),
/// Write to a multipart upload
Write(Box<dyn AsyncWrite + Send + Unpin>),
/// [`ObjectStore::put`]
Put(BoxFuture<'static, std::io::Result<()>>),
}

impl BufWriter {
/// Create a new [`BufWriter`] from the provided [`ObjectStore`] and [`Path`]
pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
Member

An off-topic question: I remember we planned to support content_type and custom metadata. How can we accommodate this use case in our current API design?

Contributor Author

Good spot, filed #5435
Self::with_capacity(store, path, 10 * 1024 * 1024)
}

/// Create a new [`BufWriter`] from the provided [`ObjectStore`], [`Path`] and `capacity`
pub fn with_capacity(store: Arc<dyn ObjectStore>, path: Path, capacity: usize) -> Self {
Self {
capacity,
store,
state: BufWriterState::Buffer(path, Vec::with_capacity(1024)),
Member

Is it by design to keep a 1024B buffer?

Contributor Author

This was a habit, as typically you want to avoid small bump allocations when pushing individual records. In this case poll_write is called with a slice of values, so this is not necessary. Will change.
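A small, self-contained illustration (editorial, not part of this diff) of the point above: Vec::new() does not allocate until data arrives, and extend_from_slice grows the buffer to fit the incoming slice in one step, so slice-sized writes as received by poll_write avoid repeated small allocations even without a pre-allocated capacity.

fn main() {
    let mut buf: Vec<u8> = Vec::new();
    assert_eq!(buf.capacity(), 0);       // no allocation yet
    buf.extend_from_slice(&[0u8; 4096]); // grows to hold the whole slice at once
    assert!(buf.capacity() >= 4096);
}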

multipart_id: None,
}
}

/// Returns the [`MultipartId`] if a multipart upload was started
pub fn multipart_id(&self) -> Option<&MultipartId> {
self.multipart_id.as_ref()
}
}

impl AsyncWrite for BufWriter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
let cap = self.capacity;
loop {
return match &mut self.state {
BufWriterState::Write(write) => Pin::new(write).poll_write(cx, buf),
BufWriterState::Put(_) => panic!("Already shut down"),
BufWriterState::Prepare(f) => {
let (id, w) = ready!(f.poll_unpin(cx)?);
self.state = BufWriterState::Write(w);
self.multipart_id = Some(id);
continue;
}
BufWriterState::Buffer(path, b) => {
if b.len().saturating_add(buf.len()) >= cap {
let buffer = std::mem::take(b);
let path = std::mem::take(path);
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async move {
let (id, mut writer) = store.put_multipart(&path).await?;
writer.write_all(&buffer).await?;
Ok((id, writer))
}));
continue;
}
b.extend_from_slice(buf);
Poll::Ready(Ok(buf.len()))
}
};
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
loop {
return match &mut self.state {
BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
BufWriterState::Write(write) => Pin::new(write).poll_flush(cx),
BufWriterState::Put(_) => panic!("Already shut down"),
BufWriterState::Prepare(f) => {
let (id, w) = ready!(f.poll_unpin(cx)?);
self.state = BufWriterState::Write(w);
self.multipart_id = Some(id);
continue;
}
};
}
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
loop {
match &mut self.state {
BufWriterState::Prepare(f) => {
let (id, w) = ready!(f.poll_unpin(cx)?);
self.state = BufWriterState::Write(w);
self.multipart_id = Some(id);
}
BufWriterState::Buffer(p, b) => {
let buf = std::mem::take(b);
Member

In severe cases, poll_shutdown might return an error. Should we ensure that retrying poll_shutdown is safe?

Contributor Author

What do you mean by safe? What is the issue if poll_shutdown returns an error?

Member

Apologies for the confusion caused by my choice of the word "safe". I'm talking about this case:

  • write some data into the buffer
  • call poll_shutdown, which returns an error such as 500 Internal Server
  • call poll_shutdown again to retry

But the buf has been taken in the first call, so the final content could be wrong.

Contributor Author

Filed #5437 to track this, as I think there is a broader issue here
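A minimal, self-contained sketch of the hazard described above (editorial illustration, not part of this diff): because the buffer is moved out with std::mem::take before the fallible upload begins, a retry after a failed shutdown no longer sees the original data. The Writer type and its injected failure are hypothetical stand-ins for BufWriter and a transient store error.

struct Writer {
    buffer: Vec<u8>,
    fail_first: bool,
}

impl Writer {
    // Synchronous analogue of poll_shutdown: take the buffer, then attempt the "put".
    fn shutdown(&mut self) -> Result<usize, &'static str> {
        let buf = std::mem::take(&mut self.buffer);
        if std::mem::take(&mut self.fail_first) {
            return Err("500 Internal Server Error"); // buf is dropped here
        }
        Ok(buf.len()) // number of bytes "uploaded"
    }
}

fn main() {
    let mut w = Writer { buffer: vec![0; 25], fail_first: true };
    assert!(w.shutdown().is_err());       // first attempt fails after taking the buffer
    assert_eq!(w.shutdown().unwrap(), 0); // retry "succeeds" but uploads 0 bytes
}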

let path = std::mem::take(p);
let store = Arc::clone(&self.store);
self.state = BufWriterState::Put(Box::pin(async move {
store.put(&path, buf.into()).await?;
Ok(())
}));
}
BufWriterState::Put(f) => return f.poll_unpin(cx),
BufWriterState::Write(w) => return Pin::new(w).poll_shutdown(cx),
}
}
}
}

/// Port of a standardised function, as the std version requires Rust 1.66
///
/// <https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533>
@@ -300,4 +432,31 @@ mod tests {
assert!(buffer.is_empty());
}
}

#[tokio::test]
async fn test_buf_writer() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let path = Path::from("file.txt");

// Test put
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 5]).await.unwrap();
assert!(writer.multipart_id().is_none());
writer.shutdown().await.unwrap();
assert!(writer.multipart_id().is_none());
assert_eq!(store.head(&path).await.unwrap().size, 25);

// Test multipart
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 20]).await.unwrap();
assert!(writer.multipart_id().is_some());
writer.shutdown().await.unwrap();
assert!(writer.multipart_id().is_some());

assert_eq!(store.head(&path).await.unwrap().size, 40);
}
}