Implement a lazy multipart writer
Signed-off-by: 🐼 Samrose Ahmed 🐼 <[email protected]>
Samrose-Ahmed committed Dec 12, 2023
1 parent 8aa55dd commit 3363bb2
Showing 4 changed files with 292 additions and 0 deletions.
5 changes: 5 additions & 0 deletions object_store/src/aws/mod.rs
@@ -307,6 +307,11 @@ impl MultiPartStore for AmazonS3 {
self.client.put_part(path, id, part_idx, data).await
}

async fn put_object(&self, path: &Path, data: Bytes) -> Result<()> {
self.client.put_request(path, data).send().await?;
Ok(())
}

async fn complete_multipart(
&self,
path: &Path,
7 changes: 7 additions & 0 deletions object_store/src/azure/mod.rs
@@ -167,6 +167,13 @@ impl MultiPartStore for MicrosoftAzure {
self.client.put_block(path, part_idx, data).await
}

async fn put_object(&self, path: &Path, data: Bytes) -> Result<()> {
self.client
.put_blob(path, data, crate::PutMode::Overwrite.into())
.await?;
Ok(())
}

async fn complete_multipart(
&self,
path: &Path,
7 changes: 7 additions & 0 deletions object_store/src/gcp/mod.rs
@@ -184,6 +184,13 @@ impl MultiPartStore for GoogleCloudStorage {
self.client.put_part(path, id, part_idx, data).await
}

async fn put_object(&self, path: &Path, data: Bytes) -> Result<()> {
self.client
.put(path, data, crate::PutMode::Overwrite.into())
.await?;
Ok(())
}

async fn complete_multipart(
&self,
path: &Path,
273 changes: 273 additions & 0 deletions object_store/src/multipart.rs
@@ -300,6 +300,9 @@ pub trait MultiPartStore: Send + Sync + 'static {
data: Bytes,
) -> Result<PartId>;

/// Uploads a complete object in a single request, without using a multipart upload.
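/// Implementations should write `data` to `path` with a single PUT request.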
async fn put_object(&self, path: &Path, data: Bytes) -> Result<()>;

/// Completes a multipart upload
///
/// The `i`'th value of `parts` must be a [`PartId`] returned by a call to [`Self::put_part`]
@@ -316,3 +319,273 @@ pub trait MultiPartStore: Send + Sync + 'static {
/// Aborts a multipart upload
async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()>;
}

/// Wrapper around a [`MultiPartStore`] that implements [`AsyncWrite`]
///
/// Data will be uploaded in fixed-size chunks of 10 MiB, in parallel,
/// up to the configured maximum concurrency.
/// The multipart upload is only created once a full 10 MiB part has been buffered;
/// otherwise the data is uploaded directly with a single PUT request.
pub struct LazyWriteMultiPart {
path: Path,
inner: Arc<dyn MultiPartStore>,
/// A list of completed parts, in sequential order.
completed_parts: Vec<Option<PartId>>,
/// Part upload tasks currently running
tasks: FuturesUnordered<BoxedTryFuture<(usize, PartId)>>,
/// Maximum number of upload tasks to run concurrently
max_concurrency: usize,
/// Buffer that will be sent in the next upload.
current_buffer: Vec<u8>,
/// Size of each part.
///
/// While S3 and Minio support variable part sizes, R2 requires they all be
/// exactly the same size.
part_size: usize,
/// Index of current part
current_part_idx: usize,
/// The completion task
completion_task: Option<BoxedTryFuture<()>>,
/// The task that creates the multipart upload
create_task: Option<BoxedTryFuture<MultipartId>>,
upload_id: Option<MultipartId>,
}

impl std::fmt::Debug for LazyWriteMultiPart {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LazyWriteMultiPart")
.field("path", &self.path)
.field("completed_parts", &self.completed_parts)
.field("tasks", &self.tasks)
.field("max_concurrency", &self.max_concurrency)
.field("current_buffer", &self.current_buffer)
.field("part_size", &self.part_size)
.field("current_part_idx", &self.current_part_idx)
.field("upload_id", &self.upload_id)
.finish()
}
}

/// Convenience function to create a default lazy multipart writer from a [`MultiPartStore`].
pub fn put_multipart_lazy(
store: Arc<dyn MultiPartStore>,
path: Path,
) -> Box<dyn AsyncWrite + Send + Unpin> {
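// Default to at most 10 concurrent part uploads (the `max_concurrency` argument).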
Box::new(LazyWriteMultiPart::new(store, path, 10))
}

impl LazyWriteMultiPart {
/// Create a new lazy multipart writer from the given [`MultiPartStore`] implementation, with the given maximum concurrency
pub fn new(inner: Arc<dyn MultiPartStore>, path: Path, max_concurrency: usize) -> Self {
Self {
inner,
path,
completed_parts: Vec::new(),
tasks: FuturesUnordered::new(),
max_concurrency,
current_buffer: Vec::new(),
// TODO: Should this vary by provider?
// TODO: Should we automatically increase this when the part index gets large?

// Minimum size of 5 MiB
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
// https://cloud.google.com/storage/quotas#requests
part_size: 10 * 1024 * 1024,
current_part_idx: 0,
completion_task: None,
create_task: None,
upload_id: None,
}
}

// Add data to the current buffer, returning the number of bytes added
fn add_to_buffer(mut self: Pin<&mut Self>, buf: &[u8], offset: usize) -> usize {
let remaining_capacity = self.part_size - self.current_buffer.len();
let to_copy = std::cmp::min(remaining_capacity, buf.len() - offset);
self.current_buffer
.extend_from_slice(&buf[offset..offset + to_copy]);
to_copy
}

/// Poll current tasks
fn poll_tasks(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Result<(), io::Error> {
if self.tasks.is_empty() {
return Ok(());
}
while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) {
let (part_idx, part) = res?;
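// Parts may complete in any order, so grow `completed_parts` to hold this part's index.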
let total_parts = self.completed_parts.len();
self.completed_parts
.resize(std::cmp::max(part_idx + 1, total_parts), None);
self.completed_parts[part_idx] = Some(part);
}
Ok(())
}

// The `poll_flush` function will only flush the in-progress tasks.
// The `final_flush` method called during `poll_shutdown` will flush
// the `current_buffer` along with in-progress tasks.
// Please see https://github.com/apache/arrow-rs/issues/3390 for more details.
fn final_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
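// No multipart upload was created, so all written data is still in `current_buffer`;
// `poll_shutdown` will upload it with a single `put_object` call.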
if self.upload_id.is_none() {
return Poll::Ready(Ok(()));
}
// Poll current tasks
self.as_mut().poll_tasks(cx)?;

// If current_buffer is not empty, see if it can be submitted
if !self.current_buffer.is_empty() && self.tasks.len() < self.max_concurrency {
let out_buffer: Vec<u8> = std::mem::take(&mut self.current_buffer);
let inner = Arc::clone(&self.inner);
let part_idx = self.current_part_idx;
let path = self.path.clone();
let upload_id = self.upload_id.clone().unwrap();
self.tasks.push(Box::pin(async move {
let upload_part = inner
.put_part(&path, &upload_id, part_idx, out_buffer.into())
.await?;
Ok((part_idx, upload_part))
}));
}

self.as_mut().poll_tasks(cx)?;

// If tasks and current_buffer are empty, return Ready
if self.tasks.is_empty() && self.current_buffer.is_empty() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
}

impl AsyncWrite for LazyWriteMultiPart {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
// Poll current tasks
self.as_mut().poll_tasks(cx)?;

let mut offset = 0;

loop {
// Fill up current buffer
offset += self.as_mut().add_to_buffer(buf, offset);

// If we don't have a full buffer or we have too many tasks, break
if self.current_buffer.len() < self.part_size
|| self.tasks.len() >= self.max_concurrency
{
break;
}

if self.upload_id.is_none() {
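// Lazily start the multipart upload: it is only created once a full part's worth of data has been buffered.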
let inner = Arc::clone(&self.inner);
let path = self.path.clone();
let create_task = self.create_task.get_or_insert_with(|| {
Box::pin(async move {
let upload_id = inner.create_multipart(&path).await?;
Ok(upload_id)
})
});
match Pin::new(create_task).poll(cx)? {
Poll::Ready(id) => {
self.upload_id = Some(id);
}
Poll::Pending => return Poll::Pending,
}
};

let new_buffer = Vec::with_capacity(self.part_size);
let out_buffer = std::mem::replace(&mut self.current_buffer, new_buffer);
let inner = Arc::clone(&self.inner);
let part_idx = self.current_part_idx;
let upload_id = self.upload_id.as_ref().unwrap().clone();
let path = self.path.clone();
self.tasks.push(Box::pin(async move {
let upload_part = inner
.put_part(&path, &upload_id, part_idx, out_buffer.into())
.await?;
Ok((part_idx, upload_part))
}));
self.current_part_idx += 1;

// We need to poll immediately after adding a task to set up the waker
self.as_mut().poll_tasks(cx)?;
}

// If offset is zero, then we didn't write anything because we didn't
// have capacity for more tasks and our buffer is full.
if offset == 0 && !buf.is_empty() {
Poll::Pending
} else {
Poll::Ready(Ok(offset))
}
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
// Poll current tasks
self.as_mut().poll_tasks(cx)?;

// If tasks is empty, return Ready
if self.tasks.is_empty() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
// First, poll flush
match self.as_mut().final_flush(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(res) => res?,
};

// If shutdown task is not set, set it
let parts = std::mem::take(&mut self.completed_parts);
let parts = parts
.into_iter()
.enumerate()
.map(|(idx, part)| {
part.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
format!("Missing information for upload part {idx}"),
)
})
})
.collect::<Result<_, _>>()?;

let inner = Arc::clone(&self.inner);
let upload_id = self.upload_id.clone();
let path = self.path.clone();
let buffer = std::mem::take(&mut self.current_buffer);
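// If a multipart upload was created, complete it with the collected parts;
// otherwise the data never filled a single part, so upload the buffered bytes with one `put_object` request.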
let completion_task = self.completion_task.get_or_insert_with(|| {
Box::pin(async move {
if let Some(upload_id) = upload_id {
inner.complete_multipart(&path, &upload_id, parts).await?;
} else {
inner.put_object(&path, buffer.into()).await?;
}
Ok(())
})
});

Pin::new(completion_task).poll(cx)
}
}
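
For reference, a minimal usage sketch of the new lazy writer (module paths are assumed from the file layout in this diff, and `store` stands in for any configured `MultiPartStore` implementation such as `AmazonS3`; the function and object path names are illustrative only):

use std::sync::Arc;

use object_store::multipart::{put_multipart_lazy, MultiPartStore};
use object_store::path::Path;
use tokio::io::AsyncWriteExt;

// Write a small payload through the lazy writer. Because the total size stays
// below the 10 MiB part size, no multipart upload is created and the data is
// sent as a single PUT when the writer is shut down.
async fn upload_small(store: Arc<dyn MultiPartStore>) -> std::io::Result<()> {
    let mut writer = put_multipart_lazy(store, Path::from("example/output.bin"));
    writer.write_all(b"hello world").await?;
    writer.shutdown().await?;
    Ok(())
}

Larger writes behave the same from the caller's side: once a full part's worth of data has been written, the writer transparently switches to a multipart upload and uploads parts in parallel.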
