Skip to content

Commit

Permalink
Refactor boundary check location
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Nov 26, 2023
1 parent 79fa0b9 commit f635e0a
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 253 deletions.
118 changes: 81 additions & 37 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
use std::str;

Expand Down Expand Up @@ -228,6 +228,12 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
// Below fields used to incrementally check boundary order across data pages
// We assume they are ascending/descending until proven wrong
data_page_boundary_ascending: bool,
data_page_boundary_descending: bool,
/// (min, max)
latest_non_null_data_page_min_max: Option<(E::T, E::T)>,
}

impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Expand Down Expand Up @@ -279,6 +285,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
data_page_boundary_ascending: true,
data_page_boundary_descending: true,
latest_non_null_data_page_min_max: None,
}
}

Expand Down Expand Up @@ -467,6 +476,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let metadata = self.write_column_metadata()?;
self.page_writer.close()?;

let boundary_order = match (
self.data_page_boundary_ascending,
self.data_page_boundary_descending,
) {
// If the lists are composed of equal elements then will be marked as ascending
// (Also the case if all pages are null pages)
(true, true) => BoundaryOrder::ASCENDING,
(true, false) => BoundaryOrder::ASCENDING,
(false, true) => BoundaryOrder::DESCENDING,
(false, false) => BoundaryOrder::UNORDERED,
};
self.column_index_builder.set_boundary_order(boundary_order);

let column_index = self
.column_index_builder
.valid()
Expand Down Expand Up @@ -610,21 +632,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}

/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: Option<&Statistics>) -> Result<()> {
fn update_column_offset_index(&mut self, page_statistics: Option<&Statistics>) {
// update the column index
let null_page =
(self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
// a page contains only null values,
// and writers have to set the corresponding entries in min_values and max_values to byte[0]
if null_page && self.column_index_builder.valid() {
self.column_index_builder
.append_with_boundary_check::<E::T>(
&self.descr,
null_page,
vec![0; 1],
vec![0; 1],
self.page_metrics.num_page_nulls as i64,
)?;
self.column_index_builder.append(
null_page,
vec![0; 1],
vec![0; 1],
self.page_metrics.num_page_nulls as i64,
);
} else if self.column_index_builder.valid() {
// from page statistics
// If can't get the page statistics, ignore this column/offset index for this column chunk
Expand All @@ -636,32 +656,28 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// We only truncate if the data is represented as binary
match self.descr.physical_type() {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
self.column_index_builder
.append_with_boundary_check::<E::T>(
&self.descr,
null_page,
self.truncate_min_value(
self.props.column_index_truncate_length(),
stat.min_bytes(),
)
.0,
self.truncate_max_value(
self.props.column_index_truncate_length(),
stat.max_bytes(),
)
.0,
self.page_metrics.num_page_nulls as i64,
)?;
self.column_index_builder.append(
null_page,
self.truncate_min_value(
self.props.column_index_truncate_length(),
stat.min_bytes(),
)
.0,
self.truncate_max_value(
self.props.column_index_truncate_length(),
stat.max_bytes(),
)
.0,
self.page_metrics.num_page_nulls as i64,
);
}
_ => {
self.column_index_builder
.append_with_boundary_check::<E::T>(
&self.descr,
null_page,
stat.min_bytes().to_vec(),
stat.max_bytes().to_vec(),
self.page_metrics.num_page_nulls as i64,
)?;
self.column_index_builder.append(
null_page,
stat.min_bytes().to_vec(),
stat.max_bytes().to_vec(),
self.page_metrics.num_page_nulls as i64,
);
}
}
}
Expand All @@ -670,7 +686,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// update the offset index
self.offset_index_builder
.append_row_count(self.page_metrics.num_buffered_rows as i64);
Ok(())
}

fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
Expand Down Expand Up @@ -710,6 +725,35 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
(Some(min), Some(max)) => {
update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

// Check if min/max are still ascending/descending across pages
// Null pages aren't considered in this sort order
let null_page = (self.page_metrics.num_buffered_rows as u64)
== self.page_metrics.num_page_nulls;
if !null_page {
if let Some((latest_min, latest_max)) = &self.latest_non_null_data_page_min_max
{
if self.data_page_boundary_ascending {
// If latest min/max are greater than new min/max then not ascending anymore
let not_ascending = compare_greater(&self.descr, latest_min, &min)
|| compare_greater(&self.descr, latest_max, &max);
if not_ascending {
self.data_page_boundary_ascending = false;
}
}

if self.data_page_boundary_descending {
// If new min/max are greater than latest min/max then not descending anymore
let not_descending = compare_greater(&self.descr, &min, latest_min)
|| compare_greater(&self.descr, &max, latest_max);
if not_descending {
self.data_page_boundary_descending = false;
}
}
}
self.latest_non_null_data_page_min_max = Some((min.clone(), max.clone()));
}

Some(Statistics::new(
Some(min),
Some(max),
Expand All @@ -722,7 +766,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
};

// update column and offset index
self.update_column_offset_index(page_statistics.as_ref())?;
self.update_column_offset_index(page_statistics.as_ref());

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
Expand Down Expand Up @@ -1072,7 +1116,7 @@ fn update_stat<T: ParquetValueType, F>(
}

/// Evaluate `a > b` according to underlying logical type.
pub(crate) fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
pub fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
if !is_signed {
// need to compare unsigned
Expand Down
Loading

0 comments on commit f635e0a

Please sign in to comment.