Assume records not split across pages (#4921)
tustvold committed Oct 16, 2023
1 parent 90bc5ec commit 8bada26
Showing 4 changed files with 92 additions and 18 deletions.
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/mod.rs
@@ -152,7 +152,7 @@ where
     Ok(records_read)
 }
 
-/// Uses `record_reader` to skip up to `batch_size` records from`pages`
+/// Uses `record_reader` to skip up to `batch_size` records from `pages`
 ///
 /// Returns the number of records skipped, which can be less than `batch_size` if
 /// pages is exhausted
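The doc comment fixed above also pins down a useful contract: a skip call may satisfy less than the request when the page source runs dry, so callers loop. A minimal, self-contained sketch of driving such a primitive to completion — `skip_all` and its `skip` closure are hypothetical stand-ins for illustration, not the crate's API:

    /// Drives a skip primitive that may return fewer records than requested,
    /// mirroring the "can be less than `batch_size` if `pages` is exhausted"
    /// contract documented above. `skip` stands in for the real reader.
    fn skip_all(mut remaining: usize, mut skip: impl FnMut(usize) -> usize) -> usize {
        let mut skipped = 0;
        while remaining > 0 {
            let n = skip(remaining);
            if n == 0 {
                break; // source exhausted: nothing left to skip
            }
            skipped += n;
            remaining -= n;
        }
        skipped
    }

    fn main() {
        // A source holding 10 records that skips at most 4 per call.
        let mut available = 10usize;
        let total = skip_all(25, |req| {
            let n = req.min(4).min(available);
            available -= n;
            n
        });
        assert_eq!(total, 10); // fewer than the 25 requested
    }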
90 changes: 88 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
@@ -878,12 +878,17 @@ mod tests {
     use crate::file::properties::WriterProperties;
     use arrow::compute::kernels::cmp::eq;
     use arrow::error::Result as ArrowResult;
+    use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
-    use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray};
-    use futures::TryStreamExt;
+    use arrow_array::{
+        Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array,
+    };
+    use arrow_schema::{DataType, Field, Schema};
+    use futures::{StreamExt, TryStreamExt};
     use rand::{thread_rng, Rng};
     use std::sync::Mutex;
+    use tempfile::tempfile;
 
     #[derive(Clone)]
     struct TestReader {
@@ -1677,4 +1682,85 @@ mod tests {
         assert!(sbbf.check(&"Hello"));
         assert!(!sbbf.check(&"Hello_Not_Exists"));
     }
+
+    #[tokio::test]
+    async fn test_nested_skip() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col_1", DataType::UInt64, false),
+            Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
+        ]));
+
+        // Writer properties chosen to force small pages and row groups
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(256)
+            .set_write_batch_size(256)
+            .set_max_row_group_size(1024);
+
+        // Write data
+        let mut file = tempfile().unwrap();
+        let mut writer =
+            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
+
+        let mut builder = ListBuilder::new(StringBuilder::new());
+        for id in 0..1024 {
+            match id % 3 {
+                0 => builder
+                    .append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
+                1 => builder.append_value([Some(format!("id_{id}"))]),
+                _ => builder.append_null(),
+            }
+        }
+        let refs = vec![
+            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
+            Arc::new(builder.finish()) as ArrayRef,
+        ];
+
+        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let selections = [
+            RowSelection::from(vec![
+                RowSelector::skip(313),
+                RowSelector::select(1),
+                RowSelector::skip(709),
+                RowSelector::select(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::skip(255),
+                RowSelector::select(1),
+                RowSelector::skip(767),
+                RowSelector::select(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::skip(254),
+                RowSelector::select(1),
+                RowSelector::select(1),
+                RowSelector::skip(767),
+                RowSelector::select(1),
+            ]),
+        ];
+
+        for selection in selections {
+            let expected = selection.row_count();
+            // Read data
+            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
+                tokio::fs::File::from_std(file.try_clone().unwrap()),
+                ArrowReaderOptions::new().with_page_index(true),
+            )
+            .await
+            .unwrap();
+
+            reader = reader.with_row_selection(selection);
+
+            let mut stream = reader.build().unwrap();
+
+            let mut total_rows = 0;
+            while let Some(rb) = stream.next().await {
+                let rb = rb.unwrap();
+                total_rows += rb.num_rows();
+            }
+            assert_eq!(total_rows, expected);
+        }
+    }
 }
17 changes: 2 additions & 15 deletions parquet/src/column/reader.rs
@@ -124,9 +124,6 @@ pub struct GenericColumnReader<R, D, V> {
     /// so far.
     num_decoded_values: usize,
 
-    /// True if the end of the current data page denotes the end of a record
-    has_record_delimiter: bool,
-
     /// The decoder for the definition levels if any
     def_level_decoder: Option<D>,
 
@@ -182,7 +179,6 @@
             num_buffered_values: 0,
             num_decoded_values: 0,
             values_decoder,
-            has_record_delimiter: false,
         }
     }
 
@@ -265,7 +261,7 @@ where
                 remaining_records,
             )?;
 
-            if levels_read == remaining_levels && self.has_record_delimiter {
+            if levels_read == remaining_levels {
                 // Reached end of page, which implies records_read < remaining_records
                 // as otherwise would have stopped reading before reaching the end
                 assert!(records_read < remaining_records); // Sanity check
@@ -376,7 +372,7 @@ where
            let (mut records_read, levels_read) =
                decoder.skip_rep_levels(remaining_records, remaining_levels)?;
 
-            if levels_read == remaining_levels && self.has_record_delimiter {
+            if levels_read == remaining_levels {
                 // Reached end of page, which implies records_read < remaining_records
                 // as otherwise would have stopped reading before reaching the end
                 assert!(records_read < remaining_records); // Sanity check
@@ -490,9 +486,6 @@
                 )?;
                 offset += bytes_read;
 
-                self.has_record_delimiter =
-                    self.page_reader.peek_next_page()?.is_none();
-
                 self.rep_level_decoder
                     .as_mut()
                     .unwrap()
@@ -544,12 +537,6 @@
                 // DataPage v2 only supports RLE encoding for repetition
                 // levels
                 if self.descr.max_rep_level() > 0 {
-                    // Technically a DataPage v2 should not write a record
-                    // across multiple pages, however, the parquet writer
-                    // used to do this so we preserve backwards compatibility
-                    self.has_record_delimiter =
-                        self.page_reader.peek_next_page()?.is_none();
-
                     self.rep_level_decoder.as_mut().unwrap().set_data(
                         Encoding::RLE,
                         buf.range(0, rep_levels_byte_len as usize),
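The simplified `levels_read == remaining_levels` checks above rest on the commit's core assumption: a repetition level of 0 opens a new top-level record, and a data page now always ends on a record boundary, so consuming every level in a page means the trailing record is complete. A self-contained sketch of record counting under that assumption — `count_records` is illustrative, not the crate's decoder:

    /// Counts complete records in one page's repetition levels, assuming the
    /// page ends on a record boundary (the assumption this commit adopts).
    /// A repetition level of 0 marks the start of a new top-level record.
    fn count_records(rep_levels: &[i16]) -> usize {
        rep_levels.iter().filter(|&&lvl| lvl == 0).count()
    }

    fn main() {
        // [[1, 2], [3]] encodes as repetition levels [0, 1, 0]: two records.
        assert_eq!(count_records(&[0, 1, 0]), 2);
        // A page ending [0, 1, 1] holds one record; under the new assumption
        // the next page begins a fresh record rather than continuing this one.
        assert_eq!(count_records(&[0, 1, 1]), 1);
    }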
1 change: 1 addition & 0 deletions parquet/src/column/reader/decoder.rs
@@ -451,6 +451,7 @@ impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
         self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
         self.buffer_len = 0;
         self.buffer_offset = 0;
+        self.has_partial = false;
     }
 }
 
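The one-line addition above resets partial-record state whenever new page data is installed. A toy sketch of why that matters — only the `has_partial` and `set_data` names mirror the diff; `RecordCounter` and everything else is invented for illustration. Without the reset, a record left unfinished at the end of one page would leak into the next page's count:

    /// Toy record counter over repetition levels (hypothetical, not the
    /// crate's decoder).
    struct RecordCounter {
        has_partial: bool, // a record was mid-flight when the buffer drained
    }

    impl RecordCounter {
        /// Installs a new page of level data. As in the diff, any partial
        /// record from the previous page is stale and must be cleared.
        fn set_data(&mut self) {
            self.has_partial = false;
        }

        /// Counts records that are provably complete: a record finishes when
        /// the next 0 level begins a new one.
        fn count(&mut self, levels: &[i16]) -> usize {
            let mut records = 0;
            for &lvl in levels {
                if lvl == 0 {
                    if self.has_partial {
                        records += 1; // the previous record just completed
                    }
                    self.has_partial = true;
                }
            }
            records
        }
    }

    fn main() {
        let mut c = RecordCounter { has_partial: false };
        assert_eq!(c.count(&[0, 1, 0]), 1); // second record still in flight
        c.set_data(); // new page: without this reset, the stale partial
                      // record would be counted against the next page
        assert_eq!(c.count(&[0, 1, 1]), 0);
    }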
