From 8bada265de51248c05b7282cbbc9635cfb1dc662 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Mon, 16 Oct 2023 18:20:32 +0100
Subject: [PATCH] Assume records not split across pages (#4921)

Previously the column reader peeked at the next page to decide whether the
end of the current data page also delimited a record (has_record_delimiter).
This change assumes writers do not split records across data pages, so
reaching the end of a page's repetition levels is always treated as a record
boundary. RepetitionLevelDecoderImpl::set_data now also resets has_partial,
and test_nested_skip verifies skipping within nested (list) columns across
page boundaries.
---
 parquet/src/arrow/array_reader/mod.rs |  2 +-
 parquet/src/arrow/async_reader/mod.rs | 90 ++++++++++++++++++++++++++-
 parquet/src/column/reader.rs          | 17 +----
 parquet/src/column/reader/decoder.rs  |  1 +
 4 files changed, 92 insertions(+), 18 deletions(-)

diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 625ac034ef47..a4ee5040590e 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -152,7 +152,7 @@ where
     Ok(records_read)
 }
 
-/// Uses `record_reader` to skip up to `batch_size` records from`pages`
+/// Uses `record_reader` to skip up to `batch_size` records from `pages`
 ///
 /// Returns the number of records skipped, which can be less than `batch_size` if
 /// pages is exhausted
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 4b3eebf2e67e..df464400af87 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -878,12 +878,17 @@ mod tests {
     use crate::file::properties::WriterProperties;
     use arrow::compute::kernels::cmp::eq;
     use arrow::error::Result as ArrowResult;
+    use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
-    use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray};
-    use futures::TryStreamExt;
+    use arrow_array::{
+        Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array,
+    };
+    use arrow_schema::{DataType, Field, Schema};
+    use futures::{StreamExt, TryStreamExt};
     use rand::{thread_rng, Rng};
     use std::sync::Mutex;
+    use tempfile::tempfile;
 
     #[derive(Clone)]
     struct TestReader {
@@ -1677,4 +1682,85 @@ mod tests {
         assert!(sbbf.check(&"Hello"));
         assert!(!sbbf.check(&"Hello_Not_Exists"));
     }
+
+    #[tokio::test]
+    async fn test_nested_skip() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col_1", DataType::UInt64, false),
+            Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
+        ]));
+
+        // Writer properties chosen to force multiple pages per row group
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(256)
+            .set_write_batch_size(256)
+            .set_max_row_group_size(1024);
+
+        // Write data
+        let mut file = tempfile().unwrap();
+        let mut writer =
+            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
+
+        let mut builder = ListBuilder::new(StringBuilder::new());
+        for id in 0..1024 {
+            match id % 3 {
+                0 => builder
+                    .append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
+                1 => builder.append_value([Some(format!("id_{id}"))]),
+                _ => builder.append_null(),
+            }
+        }
+        let refs = vec![
+            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
+            Arc::new(builder.finish()) as ArrayRef,
+        ];
+
+        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let selections = [
+            RowSelection::from(vec![
+                RowSelector::skip(313),
+                RowSelector::select(1),
+                RowSelector::skip(709),
+                RowSelector::select(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::skip(255),
+                RowSelector::select(1),
+                RowSelector::skip(767),
+                RowSelector::select(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::skip(254),
+                RowSelector::select(1),
+                RowSelector::select(1),
+                RowSelector::skip(767),
+                RowSelector::select(1),
+            ]),
+        ];
+
+        for selection in selections {
+            let expected = selection.row_count();
+            // Read data
+            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
+                tokio::fs::File::from_std(file.try_clone().unwrap()),
+                ArrowReaderOptions::new().with_page_index(true),
+            )
+            .await
+            .unwrap();
+
+            reader = reader.with_row_selection(selection);
+
+            let mut stream = reader.build().unwrap();
+
+            let mut total_rows = 0;
+            while let Some(rb) = stream.next().await {
+                let rb = rb.unwrap();
+                total_rows += rb.num_rows();
+            }
+            assert_eq!(total_rows, expected);
+        }
+    }
 }
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 3ce00622e953..173b342d9ff2 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -124,9 +124,6 @@ pub struct GenericColumnReader<R, D, V> {
     /// so far.
     num_decoded_values: usize,
 
-    /// True if the end of the current data page denotes the end of a record
-    has_record_delimiter: bool,
-
     /// The decoder for the definition levels if any
     def_level_decoder: Option<D>,
 
@@ -182,7 +179,6 @@ where
             num_buffered_values: 0,
             num_decoded_values: 0,
             values_decoder,
-            has_record_delimiter: false,
         }
     }
 
@@ -265,7 +261,7 @@ where
                 remaining_records,
             )?;
 
-            if levels_read == remaining_levels && self.has_record_delimiter {
+            if levels_read == remaining_levels {
                 // Reached end of page, which implies records_read < remaining_records
                 // as otherwise would have stopped reading before reaching the end
                 assert!(records_read < remaining_records); // Sanity check
@@ -376,7 +372,7 @@ where
             let (mut records_read, levels_read) =
                 decoder.skip_rep_levels(remaining_records, remaining_levels)?;
 
-            if levels_read == remaining_levels && self.has_record_delimiter {
+            if levels_read == remaining_levels {
                 // Reached end of page, which implies records_read < remaining_records
                 // as otherwise would have stopped reading before reaching the end
                 assert!(records_read < remaining_records); // Sanity check
@@ -490,9 +486,6 @@ where
                     )?;
                     offset += bytes_read;
 
-                    self.has_record_delimiter =
-                        self.page_reader.peek_next_page()?.is_none();
-
                     self.rep_level_decoder
                         .as_mut()
                         .unwrap()
@@ -544,12 +537,6 @@ where
                     // DataPage v2 only supports RLE encoding for repetition
                     // levels
                     if self.descr.max_rep_level() > 0 {
-                        // Technically a DataPage v2 should not write a record
-                        // across multiple pages, however, the parquet writer
-                        // used to do this so we preserve backwards compatibility
-                        self.has_record_delimiter =
-                            self.page_reader.peek_next_page()?.is_none();
-
                         self.rep_level_decoder.as_mut().unwrap().set_data(
                             Encoding::RLE,
                             buf.range(0, rep_levels_byte_len as usize),
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 369b335dc98f..0385f9ea2063 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -451,6 +451,7 @@ impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
         self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
         self.buffer_len = 0;
         self.buffer_offset = 0;
+        self.has_partial = false;
     }
 }
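
Usage note (not part of the patch): a minimal sketch of how a caller applies a
RowSelection like the ones exercised by test_nested_skip above. The file name
"data.parquet" and the selector values are illustrative assumptions, not taken
from the patch.

    use futures::TryStreamExt;
    use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector};
    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Hypothetical input; any parquet file written with a page index works
        let file = tokio::fs::File::open("data.parquet").await?;

        // Loading the page index lets the selection prune whole pages
        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder =
            ParquetRecordBatchStreamBuilder::new_with_options(file, options).await?;

        // Skip the first 100 rows, then read the next 10
        let selection =
            RowSelection::from(vec![RowSelector::skip(100), RowSelector::select(10)]);

        let stream = builder.with_row_selection(selection).build()?;
        let batches: Vec<_> = stream.try_collect().await?;
        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(rows, 10); // assumes the file holds at least 110 rows
        Ok(())
    }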