Skip to content

Commit a680933

Browse files
committed
feat: Make AsyncArrowWriter accepts AsyncFileWriter
Signed-off-by: Xuanwo <[email protected]>
1 parent 68ecc16 commit a680933

File tree

1 file changed

+49
-10
lines changed
  • parquet/src/arrow/async_writer

1 file changed

+49
-10
lines changed

parquet/src/arrow/async_writer/mod.rs

+49-10
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,57 @@ use crate::{
5959
};
6060
use arrow_array::RecordBatch;
6161
use arrow_schema::SchemaRef;
62+
use bytes::Bytes;
63+
use futures::future::BoxFuture;
64+
use futures::FutureExt;
6265
use tokio::io::{AsyncWrite, AsyncWriteExt};
6366

64-
/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncWrite`]
67+
/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files
68+
pub trait AsyncFileWriter: Send {
69+
/// Write the provided bytes to the underlying writer
70+
///
71+
/// The underlying writer CAN decide to buffer the data or write it immediately.
72+
/// This design allows the writer implementer to control the buffering and I/O scheduling.
73+
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;
74+
75+
/// Flush any buffered data to the underlying writer and finish writing process.
76+
///
77+
/// After `complete` returns `Ok(())`, caller SHOULD not call write again.
78+
fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
79+
}
80+
81+
impl AsyncFileWriter for Box<dyn AsyncFileWriter> {
82+
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
83+
self.as_mut().write(bs)
84+
}
85+
86+
fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
87+
self.as_mut().complete()
88+
}
89+
}
90+
91+
impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
92+
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
93+
async move {
94+
self.write_all(&bs).await?;
95+
}
96+
.boxed()
97+
}
98+
99+
fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
100+
async move {
101+
self.flush().await?;
102+
self.shutdown().await?;
103+
}
104+
.boxed()
105+
}
106+
}
107+
108+
/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncFileWriter`]
65109
///
66110
/// ## Memory Usage
67111
///
68-
/// This writer eagerly writes data as soon as possible to the underlying [`AsyncWrite`],
112+
/// This writer eagerly writes data as soon as possible to the underlying [`AsyncFileWriter`],
69113
/// permitting fine-grained control over buffering and I/O scheduling. However, the columnar
70114
/// nature of parquet forces data for an entire row group to be buffered in memory, before
71115
/// it can be flushed. Depending on the data and the configured row group size, this buffering
@@ -97,7 +141,7 @@ pub struct AsyncArrowWriter<W> {
97141
async_writer: W,
98142
}
99143

100-
impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
144+
impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
101145
/// Try to create a new Async Arrow Writer
102146
pub fn try_new(
103147
writer: W,
@@ -178,7 +222,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
178222

179223
// Force to flush the remaining data.
180224
self.do_write().await?;
181-
self.async_writer.shutdown().await?;
225+
self.async_writer.complete().await?;
182226

183227
Ok(metadata)
184228
}
@@ -188,12 +232,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
188232
let buffer = self.sync_writer.inner_mut();
189233

190234
self.async_writer
191-
.write_all(buffer.as_slice())
192-
.await
193-
.map_err(|e| ParquetError::External(Box::new(e)))?;
194-
195-
self.async_writer
196-
.flush()
235+
.write(Bytes::from(buffer))
197236
.await
198237
.map_err(|e| ParquetError::External(Box::new(e)))?;
199238

0 commit comments

Comments
 (0)