Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract parquet statistics for StructArray #6090

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 132 additions & 27 deletions parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

//! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`].

use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::parquet_column;
use crate::data_type::{ByteArray, FixedLenByteArray};
Expand All @@ -29,6 +28,7 @@ use arrow_array::builder::{
BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
StringViewBuilder,
};
use arrow_array::StructArray;
use arrow_array::{
new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int16Array,
Expand Down Expand Up @@ -1216,7 +1216,81 @@ impl<'a> StatisticsConverter<'a> {
}
Ok(Some(builder.finish()))
}

pub(crate) fn get_statistics_min_max_recursive(
metadata: &[&RowGroupMetaData],
index: &mut usize,
is_min: bool,
data_type: &DataType,
) -> Result<ArrayRef> {
match data_type.is_nested() {
false => {
let iterator = metadata.iter().map(|meta| {
let stat = meta.column(*index).statistics();
stat
});
let stat = if is_min {
min_statistics(data_type, iterator)
} else {
max_statistics(data_type, iterator)
};
*index += 1;
stat
}
true => {
if let DataType::Struct(fields) = data_type {
let field_arrays: Vec<_> = fields
.iter()
.map(|field| {
let array = Self::get_statistics_min_max_recursive(
metadata,
index,
is_min,
field.data_type(),
)?;
Ok((field.clone(), array))
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(StructArray::from(field_arrays)) as ArrayRef)
} else {
Err(arrow_err!(
"unsupported nested data type for extracting statistics".to_string()
))
}
}
}
}
/// recursively get the corresponding statistics for all the column data, used for
/// DataType::Struct
pub(crate) fn get_null_counts_recursive(
metadata: &[&RowGroupMetaData],
index: usize,
data_type: &DataType,
) -> Vec<u64> {
if let DataType::Struct(fields) = data_type {
let num_row_groups = metadata.len();
fields
.iter()
.fold(vec![0; num_row_groups], |mut acc, field| {
let field_null =
Self::get_null_counts_recursive(metadata, index + 1, field.data_type());

acc.iter_mut()
.zip(field_null.iter())
.for_each(|(a, b)| *a += b);
acc
})
} else {
metadata
.iter()
.map(|meta| {
meta.column(index)
.statistics()
.map(|s| s.null_count())
.unwrap_or(0)
})
.collect()
}
}
/// Create a new `StatisticsConverter` to extract statistics for a column
///
/// Note if there is no corresponding column in the parquet file, the returned
Expand Down Expand Up @@ -1314,13 +1388,21 @@ impl<'a> StatisticsConverter<'a> {
let Some(parquet_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, metadatas));
};

let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
min_statistics(data_type, iter)
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};
match data_type {
// In a Rowgroup, parquet for nested struct members,
// each one is also stored in the Column of RowGroupMetadata in order.
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
Self::get_statistics_min_max_recursive(&group_vec, &mut 0, true, data_type)
}
_ => min_statistics(data_type, create_iterator(metadatas, parquet_index)),
}
}

/// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
///
/// See docs on [`Self::row_group_mins`] for details
Expand All @@ -1334,10 +1416,20 @@ impl<'a> StatisticsConverter<'a> {
return Ok(self.make_null_array(data_type, metadatas));
};

let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
max_statistics(data_type, iter)
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};
match data_type {
// In a Rowgroup, parquet for nested struct members,
// each one is also stored in the Column of RowGroupMetadata in order.
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
Self::get_statistics_min_max_recursive(&group_vec, &mut 0, false, data_type)
}
_ => max_statistics(data_type, create_iterator(metadatas, parquet_index)),
}
}

/// Extract the null counts from row group statistics in [`RowGroupMetaData`]
Expand All @@ -1347,18 +1439,32 @@ impl<'a> StatisticsConverter<'a> {
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_column_index else {
let num_row_groups = metadatas.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
));
};
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};

let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.map(|s| s.null_count()));
Ok(UInt64Array::from_iter(null_counts))
match data_type {
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
let null_counts = Self::get_null_counts_recursive(&group_vec, 0, data_type);
Ok(UInt64Array::from_iter(null_counts))
}
_ => {
let null_counts =
create_iterator(metadatas, parquet_index).map(|s| s.map(|s| s.null_count()));
Ok(UInt64Array::from_iter(null_counts))
}
}
}

/// Extract the minimum values from Data Page statistics.
Expand Down Expand Up @@ -1580,10 +1686,10 @@ mod test {
use arrow::datatypes::{i256, Date32Type, Date64Type};
use arrow::util::test_util::parquet_test_data;
use arrow_array::{
new_empty_array, new_null_array, Array, ArrayRef, BinaryArray, BinaryViewArray,
BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch,
StringArray, StringViewArray, StructArray, TimestampNanosecondArray,
new_empty_array, Array, ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray,
StringViewArray, StructArray, TimestampNanosecondArray,
};
use arrow_schema::{DataType, Field, SchemaRef};
use bytes::Bytes;
Expand Down Expand Up @@ -2058,7 +2164,7 @@ mod test {

#[test]
fn roundtrip_struct() {
let mut test = Test {
let test = Test {
input: struct_array(vec![
// row group 1
(Some(true), Some(1)),
Expand All @@ -2075,20 +2181,18 @@ mod test {
]),
expected_min: struct_array(vec![
(Some(true), Some(1)),
(Some(true), Some(0)),
(Some(false), Some(0)),
(None, None),
]),

expected_max: struct_array(vec![
(Some(true), Some(3)),
(Some(true), Some(0)),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

original values was

                (Some(true), Some(0)),
                (Some(false), Some(5)),
                (None, None),

Then isn't min should be Some(false), Some(5) and max should be Some(true),Some(0)

(Some(true), Some(5)),
(None, None),
]),
};
// Due to https://github.com/apache/datafusion/issues/8334,
// statistics for struct arrays are not supported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we remove this comment after this was resolved

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

test.expected_min = new_null_array(test.input.data_type(), test.expected_min.len());
test.expected_max = new_null_array(test.input.data_type(), test.expected_min.len());
test.run()
}

Expand Down Expand Up @@ -2424,7 +2528,8 @@ mod test {
let row_groups = metadata.row_groups();

for field in schema.fields() {
if field.data_type().is_nested() {
let data_type = field.data_type();
if field.data_type().is_nested() && !matches!(data_type, &DataType::Struct(_)) {
let lookup = parquet_column(parquet_schema, &schema, field.name());
assert_eq!(lookup, None);
continue;
Expand Down
45 changes: 30 additions & 15 deletions parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ pub use self::arrow_writer::ArrowWriter;
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
#[cfg(feature = "async")]
pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
use arrow_schema::{FieldRef, Schema};

pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns, FieldLevels,
};
use crate::schema::types::SchemaDescriptor;
use arrow_schema::DataType;
use arrow_schema::{FieldRef, Schema};

/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
Expand Down Expand Up @@ -221,18 +221,33 @@ pub fn parquet_column<'a>(
name: &str,
) -> Option<(usize, &'a FieldRef)> {
let (root_idx, field) = arrow_schema.fields.find(name)?;
if field.data_type().is_nested() {
// Nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
return None;
if !field.data_type().is_nested() {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
return Some((parquet_idx, field));
}
// Nested field
match field.data_type() {
DataType::Struct(_) => {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
Some((parquet_idx, field))
}
_ => {
if field.data_type().is_nested() {
// Nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
None
} else {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
Some((parquet_idx, field))
}
}
}
}

// This could be made more efficient (#TBD)
let parquet_idx = (0..parquet_schema.columns().len())
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
Some((parquet_idx, field))
fn find_parquet_idx(parquet_schema: &SchemaDescriptor, root_idx: usize) -> Option<usize> {
(0..parquet_schema.columns().len()).find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)
}
52 changes: 52 additions & 0 deletions parquet/tests/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow_array::{
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow_buffer::i256;
use arrow_schema::Fields;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::Datelike;
use chrono::{Duration, TimeDelta};
Expand Down Expand Up @@ -87,6 +88,7 @@ enum Scenario {
Dictionary,
PeriodsInColumnNames,
StructArray,
StructArrayNested,
UTF8,
UTF8View,
BinaryView,
Expand Down Expand Up @@ -890,6 +892,56 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
)]));
vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()]
}
Scenario::StructArrayNested => {
let inner_boolean = Arc::new(BooleanArray::from(vec![false, true, false]));
let inner_int = Arc::new(Int32Array::from(vec![42, 43, 44]));

let inner_array = StructArray::from(vec![
(
Arc::new(Field::new("b", DataType::Boolean, false)),
inner_boolean as ArrayRef,
),
(
Arc::new(Field::new("c", DataType::Int32, false)),
inner_int as ArrayRef,
),
]);

let inner_fields = Fields::from(vec![
Field::new("b", DataType::Boolean, false),
Field::new("c", DataType::Int32, false),
]);

let outer_float = Arc::new(Float64Array::from(vec![5.0, 6.0, 7.0]));
let outer_boolean = Arc::new(BooleanArray::from(vec![true, false, true]));

let outer_struct_array = StructArray::from(vec![
(
Arc::new(Field::new(
"inner_struct",
DataType::Struct(inner_fields),
false,
)),
Arc::new(inner_array) as ArrayRef,
),
(
Arc::new(Field::new("outer_float", DataType::Float64, false)),
outer_float as ArrayRef,
),
(
Arc::new(Field::new("outer_boolean", DataType::Boolean, false)),
outer_boolean as ArrayRef,
),
]);

let schema = Arc::new(Schema::new(vec![Field::new(
"nested_struct",
outer_struct_array.data_type().clone(),
true,
)]));

vec![RecordBatch::try_new(schema, vec![Arc::new(outer_struct_array)]).unwrap()]
}
Scenario::Time32Second => {
vec![
make_time32_batches(Scenario::Time32Second, vec![18506, 18507, 18508, 18509]),
Expand Down
Loading
Loading