Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assume Pages Delimit Records When Offset Index Loaded (#4921) #4943

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ where
Ok(records_read)
}

/// Uses `record_reader` to skip up to `batch_size` records from`pages`
/// Uses `record_reader` to skip up to `batch_size` records from `pages`
///
/// Returns the number of records skipped, which can be less than `batch_size` if
/// pages is exhausted
Expand Down
90 changes: 88 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,12 +878,17 @@ mod tests {
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
use arrow::error::Result as ArrowResult;
use arrow_array::builder::{ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray};
use futures::TryStreamExt;
use arrow_array::{
Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use futures::{StreamExt, TryStreamExt};
use rand::{thread_rng, Rng};
use std::sync::Mutex;
use tempfile::tempfile;

#[derive(Clone)]
struct TestReader {
Expand Down Expand Up @@ -1677,4 +1682,85 @@ mod tests {
assert!(sbbf.check(&"Hello"));
assert!(!sbbf.check(&"Hello_Not_Exists"));
}

#[tokio::test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified this test fails as follows without the code changes in this PR

called `Result::unwrap()` on an `Err` value: ArrowError("Parquet argument error: Parquet error: StructArrayReader out of sync in read_records, expected 1 read, got 0")
thread 'arrow::async_reader::tests::test_nested_skip' panicked at parquet/src/arrow/async_reader/mod.rs:1760:29:
called `Result::unwrap()` on an `Err` value: ArrowError("Parquet argument error: Parquet error: StructArrayReader out of sync in read_records, expected 1 read, got 0")
stack backtrace:
   0: rust_begin_unwind
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:595:5
   1: core::panicking::panic_fmt
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/panicking.rs:67:14
   2: core::result::unwrap_failed
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/result.rs:1652:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/result.rs:1077:23
   4: parquet::arrow::async_reader::tests::test_nested_skip::{{closure}}
             at ./src/arrow/async_reader/mod.rs:1760:26
   5: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/future/future.rs:125:9
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/future/future.rs:125:9
   7: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:665:57
   8: tokio::runtime::coop::with_budget
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:107:5
   9: tokio::runtime::coop::budget
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:73:5
  10: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:665:25
  11: tokio::runtime::scheduler::current_thread::Context::enter
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:410:19
  12: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:664:36
  13: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:743:68
  14: tokio::runtime::context::scoped::Scoped<T>::set
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/scoped.rs:40:9
  15: tokio::runtime::context::set_scheduler::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context.rs:176:26
  16: std::thread::local::LocalKey<T>::try_with
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/thread/local.rs:270:16
  17: std::thread::local::LocalKey<T>::with
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/thread/local.rs:246:9
  18: tokio::runtime::context::set_scheduler
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context.rs:176:9
  19: tokio::runtime::scheduler::current_thread::CoreGuard::enter
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:743:27
  20: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:652:19
  21: tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:175:28
  22: tokio::runtime::context::runtime::enter_runtime
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/runtime.rs:65:16
  23: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:167:9
  24: tokio::runtime::runtime::Runtime::block_on
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/runtime.rs:347:47
  25: parquet::arrow::async_reader::tests::test_nested_skip
             at ./src/arrow/async_reader/mod.rs:1744:9
  26: parquet::arrow::async_reader::tests::test_nested_skip::{{closure}}
             at ./src/arrow/async_reader/mod.rs:1687:33
  27: core::ops::function::FnOnce::call_once
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/ops/function.rs:250:5
  28: core::ops::function::FnOnce::call_once
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.


async fn test_nested_skip() {
let schema = Arc::new(Schema::new(vec![
Field::new("col_1", DataType::UInt64, false),
Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
]));

// Default writer properties
let props = WriterProperties::builder()
.set_data_page_row_count_limit(256)
.set_write_batch_size(256)
.set_max_row_group_size(1024);

// Write data
let mut file = tempfile().unwrap();
let mut writer =
ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

let mut builder = ListBuilder::new(StringBuilder::new());
for id in 0..1024 {
match id % 3 {
0 => builder
.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1 => builder.append_value([Some(format!("id_{id}"))]),
_ => builder.append_null(),
}
}
let refs = vec![
Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
Arc::new(builder.finish()) as ArrayRef,
];

let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

let selections = [
RowSelection::from(vec![
RowSelector::skip(313),
RowSelector::select(1),
RowSelector::skip(709),
RowSelector::select(1),
]),
RowSelection::from(vec![
RowSelector::skip(255),
RowSelector::select(1),
RowSelector::skip(767),
RowSelector::select(1),
]),
RowSelection::from(vec![
RowSelector::skip(254),
RowSelector::select(1),
RowSelector::select(1),
RowSelector::skip(767),
RowSelector::select(1),
]),
];

for selection in selections {
let expected = selection.row_count();
// Read data
let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
tokio::fs::File::from_std(file.try_clone().unwrap()),
ArrowReaderOptions::new().with_page_index(true),
)
.await
.unwrap();

reader = reader.with_row_selection(selection);

let mut stream = reader.build().unwrap();

let mut total_rows = 0;
while let Some(rb) = stream.next().await {
let rb = rb.unwrap();
total_rows += rb.num_rows();
}
assert_eq!(total_rows, expected);
}
}
}
17 changes: 2 additions & 15 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ pub struct GenericColumnReader<R, D, V> {
/// so far.
num_decoded_values: usize,

/// True if the end of the current data page denotes the end of a record
has_record_delimiter: bool,

/// The decoder for the definition levels if any
def_level_decoder: Option<D>,

Expand Down Expand Up @@ -182,7 +179,6 @@ where
num_buffered_values: 0,
num_decoded_values: 0,
values_decoder,
has_record_delimiter: false,
}
}

Expand Down Expand Up @@ -265,7 +261,7 @@ where
remaining_records,
)?;

if levels_read == remaining_levels && self.has_record_delimiter {
if levels_read == remaining_levels {
// Reached end of page, which implies records_read < remaining_records
// as otherwise would have stopped reading before reaching the end
assert!(records_read < remaining_records); // Sanity check
Expand Down Expand Up @@ -376,7 +372,7 @@ where
let (mut records_read, levels_read) =
decoder.skip_rep_levels(remaining_records, remaining_levels)?;

if levels_read == remaining_levels && self.has_record_delimiter {
if levels_read == remaining_levels {
// Reached end of page, which implies records_read < remaining_records
// as otherwise would have stopped reading before reaching the end
assert!(records_read < remaining_records); // Sanity check
Expand Down Expand Up @@ -490,9 +486,6 @@ where
)?;
offset += bytes_read;

self.has_record_delimiter =
self.page_reader.peek_next_page()?.is_none();

self.rep_level_decoder
.as_mut()
.unwrap()
Expand Down Expand Up @@ -544,12 +537,6 @@ where
// DataPage v2 only supports RLE encoding for repetition
// levels
if self.descr.max_rep_level() > 0 {
// Technically a DataPage v2 should not write a record
// across multiple pages, however, the parquet writer
// used to do this so we preserve backwards compatibility
self.has_record_delimiter =
self.page_reader.peek_next_page()?.is_none();

self.rep_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
buf.range(0, rep_levels_byte_len as usize),
Expand Down
1 change: 1 addition & 0 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
self.buffer_len = 0;
self.buffer_offset = 0;
self.has_partial = false;
}
}

Expand Down
Loading