Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract parquet statistics for StructArray #11289

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 173 additions & 40 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::{
new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, StructArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
Expand Down Expand Up @@ -989,6 +989,11 @@ macro_rules! get_data_page_statistics {
}
}

/// Returns the index of the first parquet leaf column whose root column index
/// (as reported by `get_column_root_idx`) equals `root_idx`, or `None` when no
/// leaf column matches.
fn find_parquet_idx(parquet_schema: &SchemaDescriptor, root_idx: usize) -> Option<usize> {
    let leaf_count = parquet_schema.columns().len();
    for leaf_idx in 0..leaf_count {
        if parquet_schema.get_column_root_idx(leaf_idx) == root_idx {
            return Some(leaf_idx);
        }
    }
    None
}

/// Looks up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
Expand All @@ -998,20 +1003,31 @@ pub(crate) fn parquet_column<'a>(
name: &str,
) -> Option<(usize, &'a FieldRef)> {
let (root_idx, field) = arrow_schema.fields.find(name)?;
if field.data_type().is_nested() {
// Nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
return None;
if !field.data_type().is_nested() {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
return Some((parquet_idx, field));
}
// Nested field
match field.data_type() {
DataType::Struct(_) => {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
Some((parquet_idx, field))
}
_ => {
if field.data_type().is_nested() {
// Nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
None
} else {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
Some((parquet_idx, field))
}
}
}

// This could be made more efficient (#TBD)
let parquet_idx = (0..parquet_schema.columns().len())
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
Some((parquet_idx, field))
}

/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an
Expand Down Expand Up @@ -1234,7 +1250,87 @@ impl<'a> StatisticsConverter<'a> {
arrow_field,
})
}
/// Recursively extracts the min (when `is_min` is true) or max statistics
/// for `data_type` from the given row groups.
///
/// `index` is the position of the next column to read from each
/// [`RowGroupMetaData`]; it is advanced by one for every non-nested type
/// that is visited. For `DataType::Struct` each child field is visited in
/// order and the results are assembled into a [`StructArray`].
///
/// # Errors
///
/// Returns a planning error for nested types other than `DataType::Struct`.
pub(crate) fn get_statistics_min_max_recursive(
    metadata: &[&RowGroupMetaData],
    index: &mut usize,
    is_min: bool,
    data_type: &DataType,
) -> Result<ArrayRef> {
    // Struct: visit every child in declaration order, sharing the cursor.
    if let DataType::Struct(fields) = data_type {
        let mut children = Vec::with_capacity(fields.len());
        for field in fields.iter() {
            let child = Self::get_statistics_min_max_recursive(
                metadata,
                index,
                is_min,
                field.data_type(),
            )?;
            children.push((field.clone(), child));
        }
        return Ok(Arc::new(StructArray::from(children)) as ArrayRef);
    }

    // Any other nested type is rejected.
    if data_type.is_nested() {
        return plan_err!("unsupported nested data type for extracting statistics");
    }

    // Non-nested type: read the statistics of the column under the cursor
    // from every row group, then move the cursor to the next column.
    let column_idx = *index;
    let column_stats = metadata
        .iter()
        .map(|row_group| row_group.column(column_idx).statistics());
    let extracted = if is_min {
        min_statistics(data_type, column_stats)
    } else {
        max_statistics(data_type, column_stats)
    };
    *index += 1;
    extracted
}
/// Recursively collects the null count of each row group for the column at
/// `index`; for `DataType::Struct` the null counts of the child fields are summed.
pub(crate) fn get_null_counts_recursive(
metadata: &[&RowGroupMetaData],
index: usize,
data_type: &DataType,
) -> Vec<u64> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you preferred, this could also be expressed as a fold:

            let num_row_groups = metadata.len();
            fields.iter().fold(vec![0; num_row_groups], |mut acc, field| {
                let field_null_counts = Self::get_null_counts_recursive(
                    metadata,
                    index + 1,
                    field.data_type(),
                );
                acc.iter_mut().zip(field_null_counts.iter()).for_each(|(a, b)| *a += b);
                acc
            })

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, sorry for the late response

if let DataType::Struct(fields) = data_type {
    // Number of consecutive parquet columns occupied by a field of type
    // `dt`. Struct fields are expanded recursively; every other type is
    // counted as a single column, matching the non-struct branch below,
    // which reads exactly one column.
    // NOTE(review): nested non-struct types (lists, maps) may span more
    // than one leaf column -- confirm before relying on this for them.
    fn leaf_column_count(dt: &DataType) -> usize {
        match dt {
            DataType::Struct(children) => children
                .iter()
                .map(|child| leaf_column_count(child.data_type()))
                .sum(),
            _ => 1,
        }
    }

    // One accumulator slot per row group.
    let mut null_counts = vec![0; metadata.len()];

    // The first child starts at the struct's own column index and each
    // following child starts right after the columns used by the previous
    // one. This is the same traversal order used by
    // `get_statistics_min_max_recursive`. (Previously every child was read
    // from `index + 1`, so all children shared one column and the column
    // at `index` was never read.)
    let mut child_index = index;
    for field in fields.iter() {
        let field_null_counts = Self::get_null_counts_recursive(
            metadata,
            child_index,
            field.data_type(),
        );
        null_counts
            .iter_mut()
            .zip(field_null_counts)
            .for_each(|(acc, count)| *acc += count);
        child_index += leaf_column_count(field.data_type());
    }

    null_counts
} else {
    // Non-struct type: take the null count of the column at `index` from
    // each row group, using 0 when the row group has no statistics for it.
    metadata
        .iter()
        .map(|meta| {
            meta.column(index)
                .statistics()
                .map(|s| s.null_count())
                .unwrap_or(0)
        })
        .collect()
}
}
/// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
///
/// # Return Value
Expand Down Expand Up @@ -1284,11 +1380,22 @@ impl<'a> StatisticsConverter<'a> {
let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, metadatas));
};

let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
min_statistics(data_type, iter)
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};
match data_type {
// Within a row group, parquet stores each member of a nested struct as its
// own column, and those columns appear in order in the `RowGroupMetaData`.
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
Self::get_statistics_min_max_recursive(
&group_vec, &mut 0, true, data_type,
)
}
_ => min_statistics(data_type, create_iterator(metadatas, parquet_index)),
}
}

/// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
Expand All @@ -1304,10 +1411,22 @@ impl<'a> StatisticsConverter<'a> {
return Ok(self.make_null_array(data_type, metadatas));
};

let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
max_statistics(data_type, iter)
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};
match data_type {
// Within a row group, parquet stores each member of a nested struct as its
// own column, and those columns appear in order in the `RowGroupMetaData`.
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
Self::get_statistics_min_max_recursive(
&group_vec, &mut 0, false, data_type,
)
}
_ => max_statistics(data_type, create_iterator(metadatas, parquet_index)),
}
}

/// Extract the null counts from row group statistics in [`RowGroupMetaData`]
Expand All @@ -1317,18 +1436,33 @@ impl<'a> StatisticsConverter<'a> {
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
let num_row_groups = metadatas.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
));
};
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};

let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.map(|s| s.null_count()));
Ok(UInt64Array::from_iter(null_counts))
match data_type {
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
let null_counts =
Self::get_null_counts_recursive(&group_vec, 0, data_type);
Ok(UInt64Array::from_iter(null_counts))
}
_ => {
let null_counts = create_iterator(metadatas, parquet_index)
.map(|s| s.map(|s| s.null_count()));
Ok(UInt64Array::from_iter(null_counts))
}
}
}

/// Extract the minimum values from Data Page statistics.
Expand Down Expand Up @@ -1541,10 +1675,10 @@ mod test {
use arrow::compute::kernels::cast_utils::Parser;
use arrow::datatypes::{i256, Date32Type, Date64Type};
use arrow_array::{
new_empty_array, new_null_array, Array, BinaryArray, BooleanArray, Date32Array,
Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch,
StringArray, StructArray, TimestampNanosecondArray,
new_empty_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray,
StructArray, TimestampNanosecondArray,
};
use arrow_schema::{Field, SchemaRef};
use bytes::Bytes;
Expand Down Expand Up @@ -1988,7 +2122,7 @@ mod test {

#[test]
fn roundtrip_struct() {
let mut test = Test {
let test = Test {
input: struct_array(vec![
// row group 1
(Some(true), Some(1)),
Expand All @@ -2005,22 +2139,18 @@ mod test {
]),
expected_min: struct_array(vec![
(Some(true), Some(1)),
(Some(true), Some(0)),
(Some(false), Some(0)),
(None, None),
]),

expected_max: struct_array(vec![
(Some(true), Some(3)),
(Some(true), Some(0)),
(Some(true), Some(5)),
(None, None),
]),
};
// Due to https://github.com/apache/datafusion/issues/8334,
// statistics for struct arrays are not supported
test.expected_min =
new_null_array(test.input.data_type(), test.expected_min.len());
test.expected_max =
new_null_array(test.input.data_type(), test.expected_min.len());
test.run()
}

Expand Down Expand Up @@ -2376,7 +2506,10 @@ mod test {
let row_groups = metadata.row_groups();

for field in schema.fields() {
if field.data_type().is_nested() {
let data_type = field.data_type();
if field.data_type().is_nested()
&& !matches!(data_type, &DataType::Struct(_))
{
let lookup = parquet_column(parquet_schema, &schema, field.name());
assert_eq!(lookup, None);
continue;
Expand Down
Loading