diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index d95c2007c378..60120f110000 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -59,13 +59,59 @@ use crate::{
 };
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 
-/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncWrite`]
+/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files
+pub trait AsyncFileWriter: Send {
+    /// Write the provided bytes to the underlying writer
+    ///
+    /// The underlying writer CAN decide to buffer the data or write it immediately.
+    /// This design allows the writer implementer to control the buffering and I/O scheduling.
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;
+
+    /// Flush any buffered data to the underlying writer and finish writing process.
+    ///
+    /// After `complete` returns `Ok(())`, caller SHOULD not call write again.
+    fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
+}
+
+impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
+        self.as_mut().write(bs)
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
+        self.as_mut().complete()
+    }
+}
+
+impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
+        async move {
+            self.write_all(&bs).await?;
+            Ok(())
+        }
+        .boxed()
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
+        async move {
+            self.flush().await?;
+            self.shutdown().await?;
+            Ok(())
+        }
+        .boxed()
+    }
+}
+
+/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncFileWriter`]
 ///
 /// ## Memory Usage
 ///
-/// This writer eagerly writes data as soon as possible to the underlying [`AsyncWrite`],
+/// This writer eagerly writes data as soon as possible to the underlying [`AsyncFileWriter`],
 /// permitting fine-grained control over buffering and I/O scheduling. However, the columnar
 /// nature of parquet forces data for an entire row group to be buffered in memory, before
 /// it can be flushed. Depending on the data and the configured row group size, this buffering
@@ -97,7 +143,7 @@ pub struct AsyncArrowWriter<W> {
     async_writer: W,
 }
 
-impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
+impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
     /// Try to create a new Async Arrow Writer
    pub fn try_new(
        writer: W,
@@ -178,7 +224,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
 
         // Force to flush the remaining data.
         self.do_write().await?;
-        self.async_writer.shutdown().await?;
+        self.async_writer.complete().await?;
 
         Ok(metadata)
     }
@@ -188,12 +234,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         let buffer = self.sync_writer.inner_mut();
 
         self.async_writer
-            .write_all(buffer.as_slice())
-            .await
-            .map_err(|e| ParquetError::External(Box::new(e)))?;
-
-        self.async_writer
-            .flush()
+            .write(Bytes::copy_from_slice(buffer))
             .await
             .map_err(|e| ParquetError::External(Box::new(e)))?;