Skip to content

Commit

Permalink
Make ArrowRowGroupWriter Public and SerializedRowGroupWriter Send (#4850)
Browse files Browse the repository at this point in the history

* changes in support of async parallel parquet writer

* rename ChainReader

* cargo fmt
  • Loading branch information
devinjdangelo authored Sep 25, 2023
1 parent 7e7ac15 commit 74e2c5c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
20 changes: 11 additions & 9 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {

/// A list of [`Bytes`] comprising a single column chunk
#[derive(Default)]
struct ArrowColumnChunk {
pub struct ArrowColumnChunk {
length: usize,
data: Vec<Bytes>,
}
Expand All @@ -260,11 +260,13 @@ impl Length for ArrowColumnChunk {
}

impl ChunkReader for ArrowColumnChunk {
type T = ChainReader;
type T = ArrowColumnChunkReader;

fn get_read(&self, start: u64) -> Result<Self::T> {
assert_eq!(start, 0); // Assume append_column writes all data in one-shot
Ok(ChainReader(self.data.clone().into_iter().peekable()))
Ok(ArrowColumnChunkReader(
self.data.clone().into_iter().peekable(),
))
}

fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
Expand All @@ -273,9 +275,9 @@ impl ChunkReader for ArrowColumnChunk {
}

/// A [`Read`] for an iterator of [`Bytes`]
struct ChainReader(Peekable<IntoIter<Bytes>>);
pub struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ChainReader {
impl Read for ArrowColumnChunkReader {
fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
let buffer = loop {
match self.0.peek_mut() {
Expand Down Expand Up @@ -362,14 +364,14 @@ impl ArrowColumnWriter {
}

/// Encodes [`RecordBatch`] to a parquet row group
struct ArrowRowGroupWriter {
pub struct ArrowRowGroupWriter {
writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
schema: SchemaRef,
buffered_rows: usize,
}

impl ArrowRowGroupWriter {
fn new(
pub fn new(
parquet: &SchemaDescriptor,
props: &WriterPropertiesPtr,
arrow: &SchemaRef,
Expand All @@ -386,7 +388,7 @@ impl ArrowRowGroupWriter {
})
}

fn write(&mut self, batch: &RecordBatch) -> Result<()> {
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.buffered_rows += batch.num_rows();
let mut writers = self.writers.iter_mut().map(|(_, x)| x);
for (array, field) in batch.columns().iter().zip(&self.schema.fields) {
Expand All @@ -396,7 +398,7 @@ impl ArrowRowGroupWriter {
Ok(())
}

fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
pub fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
self.writers
.into_iter()
.map(|(chunk, writer)| {
Expand Down
3 changes: 2 additions & 1 deletion parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ pub type OnCloseRowGroup<'a> = Box<
Vec<Option<ColumnIndex>>,
Vec<Option<OffsetIndex>>,
) -> Result<()>
+ 'a,
+ 'a
+ Send,
>;

// ----------------------------------------------------------------------
Expand Down

0 comments on commit 74e2c5c

Please sign in to comment.