Extract parquet statistics for StructArray
Lordworms committed Jul 19, 2024
1 parent 9be0eb5 commit cbd6eac
Showing 4 changed files with 308 additions and 45 deletions.
157 changes: 131 additions & 26 deletions parquet/src/arrow/arrow_reader/statistics.rs
@@ -16,7 +16,6 @@
// under the License.

//! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`].
use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::parquet_column;
use crate::data_type::{ByteArray, FixedLenByteArray};
@@ -28,6 +27,7 @@ use crate::schema::types::SchemaDescriptor;
use arrow_array::builder::{
BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
};
use arrow_array::StructArray;
use arrow_array::{
new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int16Array,
@@ -1135,7 +1135,81 @@ impl<'a> StatisticsConverter<'a> {
}
Ok(Some(builder.finish()))
}

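/// Recursively extract the min or max statistics for a column, descending
/// into the fields of a [`DataType::Struct`]; `index` tracks the current
/// parquet leaf column and is advanced past each leaf as it is consumed.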
pub(crate) fn get_statistics_min_max_recursive(
metadata: &[&RowGroupMetaData],
index: &mut usize,
is_min: bool,
data_type: &DataType,
) -> Result<ArrayRef> {
match data_type.is_nested() {
false => {
let iterator = metadata.iter().map(|meta| meta.column(*index).statistics());
let stat = if is_min {
min_statistics(data_type, iterator)
} else {
max_statistics(data_type, iterator)
};
*index += 1;
stat
}
true => {
if let DataType::Struct(fields) = data_type {
let field_arrays: Vec<_> = fields
.iter()
.map(|field| {
let array = Self::get_statistics_min_max_recursive(
metadata,
index,
is_min,
field.data_type(),
)?;
Ok((field.clone(), array))
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(StructArray::from(field_arrays)) as ArrayRef)
} else {
Err(arrow_err!(
"unsupported nested data type for extracting statistics".to_string()
))
}
}
}
}
/// Recursively compute the null counts for a column, descending into the
/// fields of a [`DataType::Struct`] and summing the null counts of its
/// leaves; `index` tracks the current parquet leaf column and is advanced
/// past each leaf as it is consumed (previously the same leaf index was
/// reused for every sibling field, which read the wrong columns).
pub(crate) fn get_null_counts_recursive(
metadata: &[&RowGroupMetaData],
index: &mut usize,
data_type: &DataType,
) -> Vec<u64> {
if let DataType::Struct(fields) = data_type {
let num_row_groups = metadata.len();
fields
.iter()
.fold(vec![0; num_row_groups], |mut acc, field| {
let field_null =
Self::get_null_counts_recursive(metadata, index, field.data_type());

acc.iter_mut()
.zip(field_null.iter())
.for_each(|(a, b)| *a += b);
acc
})
} else {
let null_counts = metadata
.iter()
.map(|meta| {
meta.column(*index)
.statistics()
.map(|s| s.null_count())
.unwrap_or(0)
})
.collect();
*index += 1;
null_counts
}
}
/// Create a new `StatisticsConverter` to extract statistics for a column
///
/// Note if there is no corresponding column in the parquet file, the returned
@@ -1233,13 +1307,21 @@ impl<'a> StatisticsConverter<'a> {
let Some(parquet_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, metadatas));
};

let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
min_statistics(data_type, iter)
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};
match data_type {
// In a parquet row group, the leaf columns of a nested struct are
// stored as consecutive columns of the RowGroupMetaData, in order.
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
Self::get_statistics_min_max_recursive(&group_vec, &mut 0, true, data_type)
}
_ => min_statistics(data_type, create_iterator(metadatas, parquet_index)),
}
}

/// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
///
/// See docs on [`Self::row_group_mins`] for details
@@ -1253,10 +1335,20 @@
return Ok(self.make_null_array(data_type, metadatas));
};

let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
max_statistics(data_type, iter)
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};
match data_type {
// In a parquet row group, the leaf columns of a nested struct are
// stored as consecutive columns of the RowGroupMetaData, in order.
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
Self::get_statistics_min_max_recursive(&group_vec, &mut 0, false, data_type)
}
_ => max_statistics(data_type, create_iterator(metadatas, parquet_index)),
}
}

/// Extract the null counts from row group statistics in [`RowGroupMetaData`]
@@ -1266,18 +1358,32 @@
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_column_index else {
let num_row_groups = metadatas.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
));
};
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};

let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.map(|s| s.null_count()));
Ok(UInt64Array::from_iter(null_counts))
match data_type {
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
let null_counts = Self::get_null_counts_recursive(&group_vec, &mut 0, data_type);
Ok(UInt64Array::from_iter(null_counts))
}
_ => {
let null_counts =
create_iterator(metadatas, parquet_index).map(|s| s.map(|s| s.null_count()));
Ok(UInt64Array::from_iter(null_counts))
}
}
}

/// Extract the minimum values from Data Page statistics.
@@ -1493,9 +1599,9 @@ mod test {
use arrow::datatypes::{i256, Date32Type, Date64Type};
use arrow::util::test_util::parquet_test_data;
use arrow_array::{
new_empty_array, new_null_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
new_empty_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
TimestampNanosecondArray,
};
use arrow_schema::{DataType, Field, SchemaRef};
@@ -1912,7 +2018,7 @@ mod test {

#[test]
fn roundtrip_struct() {
let mut test = Test {
let test = Test {
input: struct_array(vec![
// row group 1
(Some(true), Some(1)),
@@ -1929,20 +2035,18 @@
]),
expected_min: struct_array(vec![
(Some(true), Some(1)),
(Some(true), Some(0)),
(Some(false), Some(0)),
(None, None),
]),

expected_max: struct_array(vec![
(Some(true), Some(3)),
(Some(true), Some(0)),
(Some(true), Some(5)),
(None, None),
]),
};
// Due to https://github.com/apache/datafusion/issues/8334,
// statistics for struct arrays are not supported
test.expected_min = new_null_array(test.input.data_type(), test.expected_min.len());
test.expected_max = new_null_array(test.input.data_type(), test.expected_min.len());
test.run()
}

@@ -2278,7 +2382,8 @@ mod test {
let row_groups = metadata.row_groups();

for field in schema.fields() {
if field.data_type().is_nested() {
let data_type = field.data_type();
if data_type.is_nested() && !matches!(data_type, &DataType::Struct(_)) {
let lookup = parquet_column(parquet_schema, &schema, field.name());
assert_eq!(lookup, None);
continue;
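
For context, here is a minimal sketch (not part of this commit) of how the struct statistics support above might be exercised end to end. The file name "example.parquet" and column name "struct_col" are placeholders; the builder and converter calls are the crate's existing public APIs:

use std::fs::File;

use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder file, assumed to contain a struct column named "struct_col".
    let file = File::open("example.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let arrow_schema = builder.schema().clone();

    let converter =
        StatisticsConverter::try_new("struct_col", &arrow_schema, builder.parquet_schema())?;
    let row_groups = builder.metadata().row_groups();

    // With this commit, mins/maxes for a struct column come back as
    // StructArrays assembled from the per-leaf statistics rather than as
    // all-null arrays.
    let mins = converter.row_group_mins(row_groups.iter())?;
    let maxes = converter.row_group_maxes(row_groups.iter())?;
    let null_counts = converter.row_group_null_counts(row_groups.iter())?;
    println!("mins: {mins:?}\nmaxes: {maxes:?}\nnull counts: {null_counts:?}");
    Ok(())
}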
47 changes: 31 additions & 16 deletions parquet/src/arrow/mod.rs
@@ -113,13 +113,13 @@ pub use self::arrow_writer::ArrowWriter;
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
#[cfg(feature = "async")]
pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
use arrow_schema::{FieldRef, Schema};

pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns, FieldLevels,
};
use crate::schema::types::SchemaDescriptor;
use arrow_schema::DataType;
use arrow_schema::{FieldRef, Schema};

/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
@@ -215,24 +215,39 @@ impl ProjectionMask {
/// Looks up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
pub fn parquet_column<'a>(
pub(crate) fn parquet_column<'a>(
parquet_schema: &SchemaDescriptor,
arrow_schema: &'a Schema,
name: &str,
) -> Option<(usize, &'a FieldRef)> {
let (root_idx, field) = arrow_schema.fields.find(name)?;
if field.data_type().is_nested() {
// Nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
return None;
if !field.data_type().is_nested() {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
return Some((parquet_idx, field));
}
// Nested field
match field.data_type() {
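// A struct's leaf columns share the struct's root index in the parquet
// schema, so resolving the root is enough to locate its first leaf.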
DataType::Struct(_) => {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
Some((parquet_idx, field))
}
// Other nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
_ => None,
}
}

// This could be made more efficient (#TBD)
let parquet_idx = (0..parquet_schema.columns().len())
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
Some((parquet_idx, field))
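/// Returns the index of the first parquet leaf column whose root is the
/// schema field at `root_idx`. This linear scan could be made more efficient.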
fn find_parquet_idx(parquet_schema: &SchemaDescriptor, root_idx: usize) -> Option<usize> {
(0..parquet_schema.columns().len()).find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)
}
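
To make the root-index lookup concrete, here is a small illustrative sketch (not part of this commit) using the arrow_to_parquet_schema re-export shown above; it prints how a struct's leaves become consecutive parquet leaf columns sharing a single root index. The field names are arbitrary:

use arrow_schema::{DataType, Field, Fields, Schema};
use parquet::arrow::arrow_to_parquet_schema;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let inner = Fields::from(vec![
        Field::new("b", DataType::Boolean, false),
        Field::new("c", DataType::Int32, false),
    ]);
    let schema = Schema::new(vec![
        Field::new("struct_col", DataType::Struct(inner), false),
        Field::new("f", DataType::Float64, false),
    ]);
    let descr = arrow_to_parquet_schema(&schema)?;
    for idx in 0..descr.columns().len() {
        // Prints:
        //   0 -> "struct_col.b" (root 0)
        //   1 -> "struct_col.c" (root 0)
        //   2 -> "f" (root 1)
        println!(
            "{idx} -> {:?} (root {})",
            descr.column(idx).path().string(),
            descr.get_column_root_idx(idx)
        );
    }
    Ok(())
}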
52 changes: 52 additions & 0 deletions parquet/tests/arrow_reader/mod.rs
@@ -26,6 +26,7 @@ use arrow_array::{
UInt32Array, UInt64Array, UInt8Array,
};
use arrow_buffer::i256;
use arrow_schema::Fields;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::Datelike;
use chrono::{Duration, TimeDelta};
@@ -87,6 +88,7 @@ enum Scenario {
Dictionary,
PeriodsInColumnNames,
StructArray,
StructArrayNested,
UTF8,
}

@@ -878,6 +880,56 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
)]));
vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()]
}
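// Two-level struct (a struct nested inside a struct) to exercise the
// recursive statistics extraction added in this commit.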
Scenario::StructArrayNested => {
let inner_boolean = Arc::new(BooleanArray::from(vec![false, true, false]));
let inner_int = Arc::new(Int32Array::from(vec![42, 43, 44]));

let inner_array = StructArray::from(vec![
(
Arc::new(Field::new("b", DataType::Boolean, false)),
inner_boolean as ArrayRef,
),
(
Arc::new(Field::new("c", DataType::Int32, false)),
inner_int as ArrayRef,
),
]);

let inner_fields = Fields::from(vec![
Field::new("b", DataType::Boolean, false),
Field::new("c", DataType::Int32, false),
]);

let outer_float = Arc::new(Float64Array::from(vec![5.0, 6.0, 7.0]));
let outer_boolean = Arc::new(BooleanArray::from(vec![true, false, true]));

let outer_struct_array = StructArray::from(vec![
(
Arc::new(Field::new(
"inner_struct",
DataType::Struct(inner_fields),
false,
)),
Arc::new(inner_array) as ArrayRef,
),
(
Arc::new(Field::new("outer_float", DataType::Float64, false)),
outer_float as ArrayRef,
),
(
Arc::new(Field::new("outer_boolean", DataType::Boolean, false)),
outer_boolean as ArrayRef,
),
]);

let schema = Arc::new(Schema::new(vec![Field::new(
"nested_struct",
outer_struct_array.data_type().clone(),
true,
)]));

vec![RecordBatch::try_new(schema, vec![Arc::new(outer_struct_array)]).unwrap()]
}
Scenario::Time32Second => {
vec![
make_time32_batches(Scenario::Time32Second, vec![18506, 18507, 18508, 18509]),