diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 625ac034ef47..a4ee5040590e 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -152,7 +152,7 @@ where
     Ok(records_read)
 }
 
-/// Uses `record_reader` to skip up to `batch_size` records from`pages`
+/// Uses `record_reader` to skip up to `batch_size` records from `pages`
 ///
 /// Returns the number of records skipped, which can be less than `batch_size` if
 /// pages is exhausted
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 4b3eebf2e67e..875fff4dac57 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -878,12 +878,17 @@ mod tests {
     use crate::file::properties::WriterProperties;
     use arrow::compute::kernels::cmp::eq;
     use arrow::error::Result as ArrowResult;
+    use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
-    use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray};
-    use futures::TryStreamExt;
+    use arrow_array::{
+        Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array,
+    };
+    use arrow_schema::{DataType, Field, Schema};
+    use futures::{StreamExt, TryStreamExt};
     use rand::{thread_rng, Rng};
     use std::sync::Mutex;
+    use tempfile::tempfile;
 
     #[derive(Clone)]
     struct TestReader {
@@ -1677,4 +1682,91 @@ mod tests {
         assert!(sbbf.check(&"Hello"));
         assert!(!sbbf.check(&"Hello_Not_Exists"));
     }
+
+    #[tokio::test]
+    async fn test_nested_skip() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col_1", DataType::UInt64, false),
+            Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
+        ]));
+
+        // Default writer properties
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(256)
+            .set_write_batch_size(256)
+            .set_max_row_group_size(1024);
+
+        // Write data
+        let mut file = tempfile().unwrap();
+        let mut writer =
+            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
+
+        let mut builder = ListBuilder::new(StringBuilder::new());
+        for id in 0..1024 {
+            match id % 3 {
+                0 => builder
+                    .append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
+                1 => builder.append_value([Some(format!("id_{id}"))]),
+                _ => builder.append_null(),
+            }
+        }
+        let refs = vec![
+            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
+            Arc::new(builder.finish()) as ArrayRef,
+        ];
+
+        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let selections = [
+            RowSelection::from(vec![
+                RowSelector::skip(313),
+                RowSelector::select(1),
+                RowSelector::skip(709),
+                RowSelector::select(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::skip(255),
+                RowSelector::select(1),
+                RowSelector::skip(767),
+                RowSelector::select(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::select(255),
+                RowSelector::skip(1),
+                RowSelector::select(767),
+                RowSelector::skip(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::skip(254),
+                RowSelector::select(1),
+                RowSelector::select(1),
+                RowSelector::skip(767),
+                RowSelector::select(1),
+            ]),
+        ];
+
+        for selection in selections {
+            let expected = selection.row_count();
+            // Read data
+            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
+                tokio::fs::File::from_std(file.try_clone().unwrap()),
+                ArrowReaderOptions::new().with_page_index(true),
+            )
+            .await
+            .unwrap();
+
+            reader = reader.with_row_selection(selection);
+
+            let mut stream = reader.build().unwrap();
+
+            let mut total_rows = 0;
+            while let Some(rb) = stream.next().await {
+                let rb = rb.unwrap();
+                total_rows += rb.num_rows();
+            }
+            assert_eq!(total_rows, expected);
+        }
+    }
 }
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index ec9af2aa271a..933e42386272 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -320,6 +320,20 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
     /// Skips reading the next page, returns an error if no
     /// column index information
     fn skip_next_page(&mut self) -> Result<()>;
+
+    /// Returns `true` if the next page can be assumed to contain the start of a new record
+    ///
+    /// Prior to parquet V2 the specification was ambiguous as to whether a single record
+    /// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
+    /// this in certain situations. However, correctly interpreting the offset index relies on
+    /// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
+    /// to signal this to the calling context
+    ///
+    /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
+    /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
+    fn at_record_boundary(&mut self) -> Result<bool> {
+        Ok(self.peek_next_page()?.is_none())
+    }
 }
 
 /// API for writing pages in a column chunk.
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 3ce00622e953..52ad4d644c95 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -269,7 +269,7 @@ where
             // Reached end of page, which implies records_read < remaining_records
             // as otherwise would have stopped reading before reaching the end
             assert!(records_read < remaining_records); // Sanity check
-            records_read += 1;
+            records_read += reader.flush_partial() as usize;
         }
         (records_read, levels_read)
     }
@@ -380,7 +380,7 @@ where
             // Reached end of page, which implies records_read < remaining_records
             // as otherwise would have stopped reading before reaching the end
            assert!(records_read < remaining_records); // Sanity check
-            records_read += 1;
+            records_read += decoder.flush_partial() as usize;
         }
 
         (records_read, levels_read)
@@ -491,7 +491,7 @@ where
                     offset += bytes_read;
 
                     self.has_record_delimiter =
-                        self.page_reader.peek_next_page()?.is_none();
+                        self.page_reader.at_record_boundary()?;
 
                     self.rep_level_decoder
                         .as_mut()
@@ -548,7 +548,7 @@ where
                     // across multiple pages, however, the parquet writer
                     // used to do this so we preserve backwards compatibility
                     self.has_record_delimiter =
-                        self.page_reader.peek_next_page()?.is_none();
+                        self.page_reader.at_record_boundary()?;
 
                     self.rep_level_decoder.as_mut().unwrap().set_data(
                         Encoding::RLE,
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 369b335dc98f..27ffb7637e18 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -102,6 +102,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
         num_records: usize,
         num_levels: usize,
     ) -> Result<(usize, usize)>;
+
+    /// Flush any partially read or skipped record
+    fn flush_partial(&mut self) -> bool;
 }
 
 pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
@@ -519,6 +522,10 @@ impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
         }
         Ok((total_records_read, total_levels_read))
     }
+
+    fn flush_partial(&mut self) -> bool {
+        std::mem::take(&mut self.has_partial)
+    }
 }
 
 #[cfg(test)]
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 4bc484144a81..b60d30ffea23 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -770,6 +770,15 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
             }
         }
     }
+
+    fn at_record_boundary(&mut self) -> Result<bool> {
+        match &mut self.state {
+            SerializedPageReaderState::Values { .. } => {
+                Ok(self.peek_next_page()?.is_none())
+            }
+            SerializedPageReaderState::Pages { .. } => Ok(true),
+        }
+    }
 }
 
 #[cfg(test)]