From 74e2c5cd23070d6803ce1e0dbfb78693d463d1c2 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Mon, 25 Sep 2023 07:31:00 -0400 Subject: [PATCH] Make ArrowRowGroupWriter Public and SerializedRowGroupWriter Send (#4850) * changes in support of async parallel parquet writer * rename ChainReader * cargo fmt --- parquet/src/arrow/arrow_writer/mod.rs | 20 +++++++++++--------- parquet/src/file/writer.rs | 3 ++- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 5417ebe894a3..2e170738f1a8 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -248,7 +248,7 @@ impl RecordBatchWriter for ArrowWriter { /// A list of [`Bytes`] comprising a single column chunk #[derive(Default)] -struct ArrowColumnChunk { +pub struct ArrowColumnChunk { length: usize, data: Vec, } @@ -260,11 +260,13 @@ impl Length for ArrowColumnChunk { } impl ChunkReader for ArrowColumnChunk { - type T = ChainReader; + type T = ArrowColumnChunkReader; fn get_read(&self, start: u64) -> Result { assert_eq!(start, 0); // Assume append_column writes all data in one-shot - Ok(ChainReader(self.data.clone().into_iter().peekable())) + Ok(ArrowColumnChunkReader( + self.data.clone().into_iter().peekable(), + )) } fn get_bytes(&self, _start: u64, _length: usize) -> Result { @@ -273,9 +275,9 @@ impl ChunkReader for ArrowColumnChunk { } /// A [`Read`] for an iterator of [`Bytes`] -struct ChainReader(Peekable>); +pub struct ArrowColumnChunkReader(Peekable>); -impl Read for ChainReader { +impl Read for ArrowColumnChunkReader { fn read(&mut self, out: &mut [u8]) -> std::io::Result { let buffer = loop { match self.0.peek_mut() { @@ -362,14 +364,14 @@ impl ArrowColumnWriter { } /// Encodes [`RecordBatch`] to a parquet row group -struct ArrowRowGroupWriter { +pub struct ArrowRowGroupWriter { writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>, schema: SchemaRef, buffered_rows: usize, } 
impl ArrowRowGroupWriter { - fn new( + pub fn new( parquet: &SchemaDescriptor, props: &WriterPropertiesPtr, arrow: &SchemaRef, @@ -386,7 +388,7 @@ impl ArrowRowGroupWriter { }) } - fn write(&mut self, batch: &RecordBatch) -> Result<()> { + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { self.buffered_rows += batch.num_rows(); let mut writers = self.writers.iter_mut().map(|(_, x)| x); for (array, field) in batch.columns().iter().zip(&self.schema.fields) { @@ -396,7 +398,7 @@ impl ArrowRowGroupWriter { Ok(()) } - fn close(self) -> Result> { + pub fn close(self) -> Result> { self.writers .into_iter() .map(|(chunk, writer)| { diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index cafb1761352d..859a0aa1f902 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -115,7 +115,8 @@ pub type OnCloseRowGroup<'a> = Box< Vec>, Vec>, ) -> Result<()> - + 'a, + + 'a + + Send, >; // ----------------------------------------------------------------------