Skip to content

Commit

Permalink
Parquet: derive boundary order when writing (#5110)
Browse files Browse the repository at this point in the history
* Parquet: derive boundary order when writing

* Fix

* Refactor boundary check location

* Fix

* Refactor according to review
  • Loading branch information
Jefffrey authored Nov 28, 2023
1 parent 8a0b5cb commit 34a816d
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 26 deletions.
239 changes: 216 additions & 23 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
use std::str;

Expand Down Expand Up @@ -228,6 +228,13 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,

// Below fields used to incrementally check boundary order across data pages.
// We assume they are ascending/descending until proven wrong.
data_page_boundary_ascending: bool,
data_page_boundary_descending: bool,
/// (min, max)
last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}

impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Expand Down Expand Up @@ -279,6 +286,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
data_page_boundary_ascending: true,
data_page_boundary_descending: true,
last_non_null_data_page_min_max: None,
}
}

Expand Down Expand Up @@ -467,6 +477,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let metadata = self.write_column_metadata()?;
self.page_writer.close()?;

let boundary_order = match (
self.data_page_boundary_ascending,
self.data_page_boundary_descending,
) {
// If the lists are composed of equal elements then will be marked as ascending
// (Also the case if all pages are null pages)
(true, _) => BoundaryOrder::ASCENDING,
(false, true) => BoundaryOrder::DESCENDING,
(false, false) => BoundaryOrder::UNORDERED,
};
self.column_index_builder.set_boundary_order(boundary_order);

let column_index = self
.column_index_builder
.valid()
Expand Down Expand Up @@ -610,7 +632,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}

/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: Option<&Statistics>) {
fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) {
// update the column index
let null_page =
(self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
Expand All @@ -631,6 +653,30 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.column_index_builder.to_invalid();
}
Some(stat) => {
// Check if min/max are still ascending/descending across pages
let new_min = stat.min();
let new_max = stat.max();
if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
if self.data_page_boundary_ascending {
// If last min/max are greater than new min/max then not ascending anymore
let not_ascending = compare_greater(&self.descr, last_min, new_min)
|| compare_greater(&self.descr, last_max, new_max);
if not_ascending {
self.data_page_boundary_ascending = false;
}
}

if self.data_page_boundary_descending {
// If new min/max are greater than last min/max then not descending anymore
let not_descending = compare_greater(&self.descr, new_min, last_min)
|| compare_greater(&self.descr, new_max, last_max);
if not_descending {
self.data_page_boundary_descending = false;
}
}
}
self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

// We only truncate if the data is represented as binary
match self.descr.physical_type() {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
Expand Down Expand Up @@ -703,7 +749,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
(Some(min), Some(max)) => {
update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
Some(Statistics::new(
Some(ValueStatistics::new(
Some(min),
Some(max),
None,
Expand All @@ -716,6 +762,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

// update column and offset index
self.update_column_offset_index(page_statistics.as_ref());
let page_statistics = page_statistics.map(Statistics::from);

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
Expand Down Expand Up @@ -2569,7 +2616,7 @@ mod tests {
// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

Expand Down Expand Up @@ -2636,7 +2683,7 @@ mod tests {
// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

Expand Down Expand Up @@ -2891,6 +2938,158 @@ mod tests {
assert!(incremented.is_none())
}

#[test]
fn test_boundary_order() -> Result<()> {
let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
// min max both ascending
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[
&[Some(-10), Some(10)],
&[Some(-5), Some(11)],
&[None],
&[Some(-5), Some(11)],
],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// min max both descending
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[
&[Some(10), Some(11)],
&[Some(5), Some(11)],
&[None],
&[Some(-5), Some(0)],
],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::DESCENDING);

// min max both equal
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// only nulls
let column_close_result =
write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// one page
let column_close_result =
write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// one non-null page
let column_close_result =
write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// min max both unordered
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[
&[Some(10), Some(11)],
&[Some(11), Some(16)],
&[None],
&[Some(-5), Some(0)],
],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

// min max both ordered in different orders
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[
&[Some(1), Some(9)],
&[Some(2), Some(8)],
&[None],
&[Some(3), Some(7)],
],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

Ok(())
}

#[test]
fn test_boundary_order_logical_type() -> Result<()> {
// ensure that logical types account for different sort order than underlying
// physical type representation
let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
let fba_descr = {
let tpe = SchemaType::primitive_type_builder(
"col",
FixedLenByteArrayType::get_physical_type(),
)
.with_length(2)
.build()?;
Arc::new(ColumnDescriptor::new(
Arc::new(tpe),
1,
0,
ColumnPath::from("col"),
))
};

let values: &[&[Option<FixedLenByteArray>]] = &[
&[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
&[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
&[Some(FixedLenByteArray::from(ByteArray::from(
f16::NEG_ZERO,
)))],
&[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
];

// f16 descending
let column_close_result =
write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::DESCENDING);

// same bytes, but fba unordered
let column_close_result =
write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

Ok(())
}

fn write_multiple_pages<T: DataType>(
column_descr: &Arc<ColumnDescriptor>,
pages: &[&[Option<T::T>]],
) -> Result<ColumnCloseResult> {
let column_writer = get_column_writer(
column_descr.clone(),
Default::default(),
get_test_page_writer(),
);
let mut writer = get_typed_column_writer::<T>(column_writer);

for &page in pages {
let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
let def_levels = page
.iter()
.map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
.collect::<Vec<_>>();
writer.write_batch(&values, Some(&def_levels), None)?;
writer.flush_data_pages()?;
}

writer.close()
}

/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
Expand Down Expand Up @@ -3197,8 +3396,7 @@ mod tests {
) -> ValueStatistics<FixedLenByteArray> {
let page_writer = get_test_page_writer();
let props = Default::default();
let mut writer =
get_test_float16_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
let mut writer = get_test_float16_column_writer(page_writer, 0, 0, props);
writer.write_batch(values, None, None).unwrap();

let metadata = writer.close().unwrap().metadata;
Expand All @@ -3209,30 +3407,25 @@ mod tests {
}
}

fn get_test_float16_column_writer<T: DataType>(
fn get_test_float16_column_writer(
page_writer: Box<dyn PageWriter>,
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
) -> ColumnWriterImpl<'static, T> {
let descr = Arc::new(get_test_float16_column_descr::<T>(
max_def_level,
max_rep_level,
));
) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
let descr = Arc::new(get_test_float16_column_descr(max_def_level, max_rep_level));
let column_writer = get_column_writer(descr, props, page_writer);
get_typed_column_writer::<T>(column_writer)
get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
}

fn get_test_float16_column_descr<T: DataType>(
max_def_level: i16,
max_rep_level: i16,
) -> ColumnDescriptor {
fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
let path = ColumnPath::from("col");
let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
.with_length(2)
.with_logical_type(Some(LogicalType::Float16))
.build()
.unwrap();
let tpe =
SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
.with_length(2)
.with_logical_type(Some(LogicalType::Float16))
.build()
.unwrap();
ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
}

Expand Down
9 changes: 6 additions & 3 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,9 +885,8 @@ pub struct ColumnIndexBuilder {
null_pages: Vec<bool>,
min_values: Vec<Vec<u8>>,
max_values: Vec<Vec<u8>>,
// TODO: calc the order for all pages in this column
boundary_order: BoundaryOrder,
null_counts: Vec<i64>,
boundary_order: BoundaryOrder,
// If one page can't get build index, need to ignore all index in this column
valid: bool,
}
Expand All @@ -904,8 +903,8 @@ impl ColumnIndexBuilder {
null_pages: Vec::new(),
min_values: Vec::new(),
max_values: Vec::new(),
boundary_order: BoundaryOrder::UNORDERED,
null_counts: Vec::new(),
boundary_order: BoundaryOrder::UNORDERED,
valid: true,
}
}
Expand All @@ -923,6 +922,10 @@ impl ColumnIndexBuilder {
self.null_counts.push(null_count);
}

pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
self.boundary_order = boundary_order;
}

pub fn to_invalid(&mut self) {
self.valid = false;
}
Expand Down

0 comments on commit 34a816d

Please sign in to comment.