Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assume Pages Delimit Records When Offset Index Loaded (#4921) #4943

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/mod.rs
Original file line number Diff line number Diff line change
@@ -152,7 +152,7 @@ where
Ok(records_read)
}

/// Uses `record_reader` to skip up to `batch_size` records from`pages`
/// Uses `record_reader` to skip up to `batch_size` records from `pages`
///
/// Returns the number of records skipped, which can be less than `batch_size` if
/// pages is exhausted
96 changes: 94 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
@@ -878,12 +878,17 @@ mod tests {
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
use arrow::error::Result as ArrowResult;
use arrow_array::builder::{ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray};
use futures::TryStreamExt;
use arrow_array::{
Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use futures::{StreamExt, TryStreamExt};
use rand::{thread_rng, Rng};
use std::sync::Mutex;
use tempfile::tempfile;

#[derive(Clone)]
struct TestReader {
@@ -1677,4 +1682,91 @@ mod tests {
assert!(sbbf.check(&"Hello"));
assert!(!sbbf.check(&"Hello_Not_Exists"));
}

#[tokio::test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified this test fails as follows without the code changes in this PR

called `Result::unwrap()` on an `Err` value: ArrowError("Parquet argument error: Parquet error: StructArrayReader out of sync in read_records, expected 1 read, got 0")
thread 'arrow::async_reader::tests::test_nested_skip' panicked at parquet/src/arrow/async_reader/mod.rs:1760:29:
called `Result::unwrap()` on an `Err` value: ArrowError("Parquet argument error: Parquet error: StructArrayReader out of sync in read_records, expected 1 read, got 0")
stack backtrace:
   0: rust_begin_unwind
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:595:5
   1: core::panicking::panic_fmt
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/panicking.rs:67:14
   2: core::result::unwrap_failed
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/result.rs:1652:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/result.rs:1077:23
   4: parquet::arrow::async_reader::tests::test_nested_skip::{{closure}}
             at ./src/arrow/async_reader/mod.rs:1760:26
   5: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/future/future.rs:125:9
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/future/future.rs:125:9
   7: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:665:57
   8: tokio::runtime::coop::with_budget
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:107:5
   9: tokio::runtime::coop::budget
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:73:5
  10: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:665:25
  11: tokio::runtime::scheduler::current_thread::Context::enter
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:410:19
  12: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:664:36
  13: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:743:68
  14: tokio::runtime::context::scoped::Scoped<T>::set
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/scoped.rs:40:9
  15: tokio::runtime::context::set_scheduler::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context.rs:176:26
  16: std::thread::local::LocalKey<T>::try_with
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/thread/local.rs:270:16
  17: std::thread::local::LocalKey<T>::with
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/thread/local.rs:246:9
  18: tokio::runtime::context::set_scheduler
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context.rs:176:9
  19: tokio::runtime::scheduler::current_thread::CoreGuard::enter
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:743:27
  20: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:652:19
  21: tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:175:28
  22: tokio::runtime::context::runtime::enter_runtime
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/runtime.rs:65:16
  23: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/current_thread/mod.rs:167:9
  24: tokio::runtime::runtime::Runtime::block_on
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/runtime.rs:347:47
  25: parquet::arrow::async_reader::tests::test_nested_skip
             at ./src/arrow/async_reader/mod.rs:1744:9
  26: parquet::arrow::async_reader::tests::test_nested_skip::{{closure}}
             at ./src/arrow/async_reader/mod.rs:1687:33
  27: core::ops::function::FnOnce::call_once
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/ops/function.rs:250:5
  28: core::ops::function::FnOnce::call_once
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.


async fn test_nested_skip() {
    // A required u64 column next to a nullable list-of-strings column, so that
    // repetition levels (nested records) are exercised by the skip logic.
    let schema = Arc::new(Schema::new(vec![
        Field::new("col_1", DataType::UInt64, false),
        Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
    ]));

    // Small pages (256 rows) inside a single 1024-row row group so the
    // selections below start and stop at / around page boundaries.
    let props = WriterProperties::builder()
        .set_data_page_row_count_limit(256)
        .set_write_batch_size(256)
        .set_max_row_group_size(1024);

    // Write the test data to a temporary parquet file
    let mut file = tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

    // List column cycles through: two-element list, one-element list, null
    let mut list_builder = ListBuilder::new(StringBuilder::new());
    for id in 0..1024 {
        match id % 3 {
            0 => list_builder
                .append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
            1 => list_builder.append_value([Some(format!("id_{id}"))]),
            _ => list_builder.append_null(),
        }
    }
    let columns = vec![
        Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
        Arc::new(list_builder.finish()) as ArrayRef,
    ];

    let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Each selection probes a different alignment of skip/select runs against
    // the 256-row page boundaries
    let selections = [
        RowSelection::from(vec![
            RowSelector::skip(313),
            RowSelector::select(1),
            RowSelector::skip(709),
            RowSelector::select(1),
        ]),
        RowSelection::from(vec![
            RowSelector::skip(255),
            RowSelector::select(1),
            RowSelector::skip(767),
            RowSelector::select(1),
        ]),
        RowSelection::from(vec![
            RowSelector::select(255),
            RowSelector::skip(1),
            RowSelector::select(767),
            RowSelector::skip(1),
        ]),
        RowSelection::from(vec![
            RowSelector::skip(254),
            RowSelector::select(1),
            RowSelector::select(1),
            RowSelector::skip(767),
            RowSelector::select(1),
        ]),
    ];

    for selection in selections {
        let expected_rows = selection.row_count();

        // Read back with the page index enabled so the offset-index-driven
        // page skipping path is taken
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(
            tokio::fs::File::from_std(file.try_clone().unwrap()),
            ArrowReaderOptions::new().with_page_index(true),
        )
        .await
        .unwrap()
        .with_row_selection(selection);

        let mut stream = builder.build().unwrap();

        // The stream must yield exactly the selected rows, no more, no fewer
        let mut rows_read = 0;
        while let Some(batch) = stream.next().await {
            rows_read += batch.unwrap().num_rows();
        }
        assert_eq!(rows_read, expected_rows);
    }
}
}
14 changes: 14 additions & 0 deletions parquet/src/column/page.rs
Original file line number Diff line number Diff line change
@@ -320,6 +320,20 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
/// Skips reading the next page, returns an error if no
/// column index information
fn skip_next_page(&mut self) -> Result<()>;

/// Returns `true` if the next page can be assumed to contain the start of a new record
///
/// The parquet specification prior to V2 was ambiguous about whether a single
/// record may span multiple pages, and before [(#4327)] the Rust writer could
/// produce such files. Correctly interpreting the offset index, however,
/// depends on every page starting a new record [(#4943)]. This hook lets a
/// [`PageReader`] tell the calling context when that assumption holds.
///
/// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
/// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
fn at_record_boundary(&mut self) -> Result<bool> {
    // Conservative default: only the end of the column chunk is a guaranteed
    // record boundary
    let next = self.peek_next_page()?;
    Ok(next.is_none())
}
}

/// API for writing pages in a column chunk.
8 changes: 4 additions & 4 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
@@ -269,7 +269,7 @@ where
// Reached end of page, which implies records_read < remaining_records
// as otherwise would have stopped reading before reaching the end
assert!(records_read < remaining_records); // Sanity check
records_read += 1;
records_read += reader.flush_partial() as usize;
}
(records_read, levels_read)
}
@@ -380,7 +380,7 @@ where
// Reached end of page, which implies records_read < remaining_records
// as otherwise would have stopped reading before reaching the end
assert!(records_read < remaining_records); // Sanity check
records_read += 1;
records_read += decoder.flush_partial() as usize;
}

(records_read, levels_read)
@@ -491,7 +491,7 @@ where
offset += bytes_read;

self.has_record_delimiter =
self.page_reader.peek_next_page()?.is_none();
self.page_reader.at_record_boundary()?;

self.rep_level_decoder
.as_mut()
@@ -548,7 +548,7 @@ where
// across multiple pages, however, the parquet writer
// used to do this so we preserve backwards compatibility
self.has_record_delimiter =
self.page_reader.peek_next_page()?.is_none();
self.page_reader.at_record_boundary()?;

self.rep_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
7 changes: 7 additions & 0 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
@@ -102,6 +102,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
num_records: usize,
num_levels: usize,
) -> Result<(usize, usize)>;

/// Flush any partially read or skipped record, returning `true` if one was pending
///
/// When level decoding stops at a page boundary part-way through a record, the
/// partially consumed record must still be counted exactly once by the caller;
/// this clears that pending state and reports whether it existed.
fn flush_partial(&mut self) -> bool;
}

pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
@@ -519,6 +522,10 @@ impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
}
Ok((total_records_read, total_levels_read))
}

fn flush_partial(&mut self) -> bool {
    // Report whether a record was in progress and clear the flag so the
    // partial record is only ever accounted for once
    let had_partial = self.has_partial;
    self.has_partial = false;
    had_partial
}
}

#[cfg(test)]
9 changes: 9 additions & 0 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
@@ -770,6 +770,15 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}
}
}

fn at_record_boundary(&mut self) -> Result<bool> {
    // When driven by the offset index (`Pages`) every page is assumed to begin
    // a new record; otherwise fall back to the conservative default of only
    // trusting the end of the column chunk
    if matches!(self.state, SerializedPageReaderState::Pages { .. }) {
        Ok(true)
    } else {
        Ok(self.peek_next_page()?.is_none())
    }
}
}

#[cfg(test)]