From 0c3aaa7c2a1e7bde817ae1fd88ffc9e437dc60ad Mon Sep 17 00:00:00 2001
From: Lordworms
Date: Thu, 18 Jul 2024 18:52:04 -0700
Subject: [PATCH 1/3] Extract parquet statistics for StructArray

---
 parquet/src/arrow/arrow_reader/statistics.rs | 151 ++++++++++++++++---
 parquet/src/arrow/mod.rs                     |  47 ++++--
 parquet/tests/arrow_reader/mod.rs            |  52 +++++++
 parquet/tests/arrow_reader/statistics.rs     |  97 +++++++++++-
 4 files changed, 305 insertions(+), 42 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs
index c42f92838c8c..a061ca69e452 100644
--- a/parquet/src/arrow/arrow_reader/statistics.rs
+++ b/parquet/src/arrow/arrow_reader/statistics.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 //! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`].
-
 use crate::arrow::buffer::bit_util::sign_extend_be;
 use crate::arrow::parquet_column;
 use crate::data_type::{ByteArray, FixedLenByteArray};
@@ -29,6 +28,7 @@ use arrow_array::builder::{
     BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
     StringViewBuilder,
 };
+use arrow_array::StructArray;
 use arrow_array::{
     new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
     Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int16Array,
@@ -1216,7 +1216,81 @@ impl<'a> StatisticsConverter<'a> {
         }
         Ok(Some(builder.finish()))
     }
-
+    /// Recursively extract the min or max statistics of every leaf column of a
+    /// nested type and reassemble them into an array of the same shape; used
+    /// for [`DataType::Struct`]. `index` points at the current parquet leaf
+    /// column and is advanced once per leaf visited.
+    pub(crate) fn get_statistics_min_max_recursive(
+        metadata: &[&RowGroupMetaData],
+        index: &mut usize,
+        is_min: bool,
+        data_type: &DataType,
+    ) -> Result<ArrayRef> {
+        match data_type.is_nested() {
+            false => {
+                let iterator = metadata.iter().map(|meta| meta.column(*index).statistics());
+                let stat = if is_min {
+                    min_statistics(data_type, iterator)
+                } else {
+                    max_statistics(data_type, iterator)
+                };
+                *index += 1;
+                stat
+            }
+            true => {
+                if let DataType::Struct(fields) = data_type {
+                    let field_arrays: Vec<_> = fields
+                        .iter()
+                        .map(|field| {
+                            let array = Self::get_statistics_min_max_recursive(
+                                metadata,
+                                index,
+                                is_min,
+                                field.data_type(),
+                            )?;
+                            Ok((field.clone(), array))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    Ok(Arc::new(StructArray::from(field_arrays)) as ArrayRef)
+                } else {
+                    Err(arrow_err!(
+                        "unsupported nested data type for extracting statistics".to_string()
+                    ))
+                }
+            }
+        }
+    }
+    /// Recursively sum the per-row-group null counts of every leaf column of a
+    /// nested type; used for [`DataType::Struct`]. As above, `index` is
+    /// advanced once per leaf visited.
+    pub(crate) fn get_null_counts_recursive(
+        metadata: &[&RowGroupMetaData],
+        index: &mut usize,
+        data_type: &DataType,
+    ) -> Vec<u64> {
+        if let DataType::Struct(fields) = data_type {
+            let num_row_groups = metadata.len();
+            fields
+                .iter()
+                .fold(vec![0; num_row_groups], |mut acc, field| {
+                    let field_null =
+                        Self::get_null_counts_recursive(metadata, index, field.data_type());
+                    acc.iter_mut()
+                        .zip(field_null.iter())
+                        .for_each(|(a, b)| *a += b);
+                    acc
+                })
+        } else {
+            let null_counts = metadata
+                .iter()
+                .map(|meta| {
+                    meta.column(*index)
+                        .statistics()
+                        .map(|s| s.null_count())
+                        .unwrap_or(0)
+                })
+                .collect();
+            *index += 1;
+            null_counts
+        }
+    }
     /// Create a new `StatisticsConverter` to extract statistics for a column
     ///
     /// Note if there is no corresponding column in the parquet file, the returned
@@ -1314,13 +1388,21 @@ impl<'a> StatisticsConverter<'a> {
         let Some(parquet_index) = self.parquet_column_index else {
             return Ok(self.make_null_array(data_type, metadatas));
         };
-
-        let iter = metadatas
-            .into_iter()
-            .map(|x| x.column(parquet_index).statistics());
-        min_statistics(data_type, iter)
+        let create_iterator = |metadatas: I, parquet_index: usize| {
+            metadatas
+                .into_iter()
+                .map(move |x| x.column(parquet_index).statistics())
+        };
+        match data_type {
+            // In a row group, the leaf columns of a nested struct are stored
+            // as consecutive columns of the RowGroupMetaData, in schema order,
+            // starting at the struct's first leaf (`parquet_index`).
+            DataType::Struct(_) => {
+                let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
+                let mut leaf_index = parquet_index;
+                Self::get_statistics_min_max_recursive(&group_vec, &mut leaf_index, true, data_type)
+            }
+            _ => min_statistics(data_type, create_iterator(metadatas, parquet_index)),
+        }
     }
-
     /// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
     ///
     /// See docs on [`Self::row_group_mins`] for details
@@ -1334,10 +1416,20 @@
             return Ok(self.make_null_array(data_type, metadatas));
         };
-        let iter = metadatas
-            .into_iter()
-            .map(|x| x.column(parquet_index).statistics());
-        max_statistics(data_type, iter)
+        let create_iterator = |metadatas: I, parquet_index: usize| {
+            metadatas
+                .into_iter()
+                .map(move |x| x.column(parquet_index).statistics())
+        };
+        match data_type {
+            // In a row group, the leaf columns of a nested struct are stored
+            // as consecutive columns of the RowGroupMetaData, in schema order,
+            // starting at the struct's first leaf (`parquet_index`).
+            DataType::Struct(_) => {
+                let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
+                let mut leaf_index = parquet_index;
+                Self::get_statistics_min_max_recursive(&group_vec, &mut leaf_index, false, data_type)
+            }
+            _ => max_statistics(data_type, create_iterator(metadatas, parquet_index)),
+        }
     }
 
     /// Extract the null counts from row group statistics in [`RowGroupMetaData`]
@@ -1347,18 +1439,32 @@
     where
         I: IntoIterator<Item = &'a RowGroupMetaData>,
     {
+        let data_type = self.arrow_field.data_type();
+
         let Some(parquet_index) = self.parquet_column_index else {
             let num_row_groups = metadatas.into_iter().count();
             return Ok(UInt64Array::from_iter(
                 std::iter::repeat(None).take(num_row_groups),
             ));
         };
+        let create_iterator = |metadatas: I, parquet_index: usize| {
+            metadatas
+                .into_iter()
+                .map(move |x| x.column(parquet_index).statistics())
+        };
 
-        let null_counts = metadatas
-            .into_iter()
-            .map(|x| x.column(parquet_index).statistics())
-            .map(|s| s.map(|s| s.null_count()));
-        Ok(UInt64Array::from_iter(null_counts))
+        match data_type {
+            DataType::Struct(_) => {
+                let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
+                let mut leaf_index = parquet_index;
+                let null_counts =
+                    Self::get_null_counts_recursive(&group_vec, &mut leaf_index, data_type);
+                Ok(UInt64Array::from_iter(null_counts))
+            }
+            _ => {
+                let null_counts =
+                    create_iterator(metadatas, parquet_index).map(|s| s.map(|s| s.null_count()));
+                Ok(UInt64Array::from_iter(null_counts))
+            }
+        }
     }
 
     /// Extract the minimum values from Data Page statistics.
@@ -2058,7 +2164,7 @@ mod test {
 
     #[test]
     fn roundtrip_struct() {
-        let mut test = Test {
+        let test = Test {
             input: struct_array(vec![
                 // row group 1
                 (Some(true), Some(1)),
@@ -2075,20 +2181,18 @@ mod test {
             ]),
             expected_min: struct_array(vec![
                 (Some(true), Some(1)),
-                (Some(true), Some(0)),
+                (Some(false), Some(0)),
                 (None, None),
             ]),
             expected_max: struct_array(vec![
                 (Some(true), Some(3)),
-                (Some(true), Some(0)),
+                (Some(true), Some(5)),
                 (None, None),
             ]),
         };
-        // Due to https://github.com/apache/datafusion/issues/8334,
-        // statistics for struct arrays are not supported
-        test.expected_min = new_null_array(test.input.data_type(), test.expected_min.len());
-        test.expected_max = new_null_array(test.input.data_type(), test.expected_min.len());
 
         test.run()
     }
@@ -2424,7 +2528,8 @@ mod test {
         let row_groups = metadata.row_groups();
 
         for field in schema.fields() {
-            if field.data_type().is_nested() {
+            let data_type = field.data_type();
+            if data_type.is_nested() && !matches!(data_type, &DataType::Struct(_)) {
                 let lookup = parquet_column(parquet_schema, &schema, field.name());
                 assert_eq!(lookup, None);
                 continue;
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 5c36891434c3..9a5cb17aad6e 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -113,13 +113,13 @@ pub use self::arrow_writer::ArrowWriter;
 pub use self::async_reader::ParquetRecordBatchStreamBuilder;
 #[cfg(feature = "async")]
 pub use self::async_writer::AsyncArrowWriter;
-use crate::schema::types::SchemaDescriptor;
-use arrow_schema::{FieldRef, Schema};
-
 pub use self::schema::{
     arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
     parquet_to_arrow_schema_by_columns, FieldLevels,
 };
+use crate::schema::types::SchemaDescriptor;
+use arrow_schema::DataType;
+use arrow_schema::{FieldRef, Schema};
 
 /// Schema metadata key used to store serialized Arrow IPC schema
 pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
@@ -215,24 +215,39 @@ impl ProjectionMask {
 /// Lookups up the parquet column by name
 ///
 /// Returns the parquet column index and the corresponding arrow field
-pub fn parquet_column<'a>(
+pub(crate) fn parquet_column<'a>(
     parquet_schema: &SchemaDescriptor,
     arrow_schema: &'a Schema,
     name: &str,
 ) -> Option<(usize, &'a FieldRef)> {
     let (root_idx, field) = arrow_schema.fields.find(name)?;
-    if field.data_type().is_nested() {
-        // Nested fields are not supported and require non-trivial logic
-        // to correctly walk the parquet schema accounting for the
-        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
-        //
-        // For example a ListArray could correspond to anything from 1 to 3 levels
-        // in the parquet schema
-        return None;
+    if !field.data_type().is_nested() {
+        let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
+        return Some((parquet_idx, field));
     }
-
-    // This could be made more efficient (#TBD)
-    let parquet_idx = (0..parquet_schema.columns().len())
-        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
-    Some((parquet_idx, field))
+    // Nested field: structs are supported, every other nested type is not
+    match field.data_type() {
+        DataType::Struct(_) => {
+            let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
+            Some((parquet_idx, field))
+        }
+        // Other nested fields are not supported and require non-trivial logic
+        // to correctly walk the parquet schema accounting for the
+        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
+        //
+        // For example a ListArray could correspond to anything from 1 to 3 levels
+        // in the parquet schema
+        _ => None,
+    }
 }
+
+/// Find the index of the first parquet leaf column whose root column is `root_idx`
+fn find_parquet_idx(parquet_schema: &SchemaDescriptor, root_idx: usize) -> Option<usize> {
+    (0..parquet_schema.columns().len()).find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)
+}
diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs
index 7e979dcf3ec0..3ad1e89eed53 100644
--- a/parquet/tests/arrow_reader/mod.rs
+++ b/parquet/tests/arrow_reader/mod.rs
@@ -26,6 +26,7 @@ use arrow_array::{
     TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
 };
 use arrow_buffer::i256;
+use arrow_schema::Fields;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use chrono::Datelike;
 use chrono::{Duration, TimeDelta};
@@ -87,6 +88,7 @@ enum Scenario {
     Dictionary,
     PeriodsInColumnNames,
     StructArray,
+    StructArrayNested,
     UTF8,
     UTF8View,
     BinaryView,
@@ -890,6 +892,56 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
             )]));
             vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()]
         }
+        Scenario::StructArrayNested => {
+            let inner_boolean = Arc::new(BooleanArray::from(vec![false, true, false]));
+            let inner_int = Arc::new(Int32Array::from(vec![42, 43, 44]));
+
+            let inner_array = StructArray::from(vec![
+                (
+                    Arc::new(Field::new("b", DataType::Boolean, false)),
+                    inner_boolean as ArrayRef,
+                ),
+                (
+                    Arc::new(Field::new("c", DataType::Int32, false)),
+                    inner_int as ArrayRef,
+                ),
+            ]);
+
+            let inner_fields = Fields::from(vec![
+                Field::new("b", DataType::Boolean, false),
+                Field::new("c", DataType::Int32, false),
+            ]);
+
+            let outer_float = Arc::new(Float64Array::from(vec![5.0, 6.0, 7.0]));
+            let outer_boolean = Arc::new(BooleanArray::from(vec![true, false, true]));
+
+            let outer_struct_array = StructArray::from(vec![
+                (
+                    Arc::new(Field::new(
+                        "inner_struct",
+                        DataType::Struct(inner_fields),
+                        false,
+                    )),
+                    Arc::new(inner_array) as ArrayRef,
+                ),
+                (
+                    Arc::new(Field::new("outer_float", DataType::Float64, false)),
+                    outer_float as ArrayRef,
+                ),
+                (
+                    Arc::new(Field::new("outer_boolean", DataType::Boolean, false)),
+                    outer_boolean as ArrayRef,
+                ),
+            ]);
+
+            let schema = Arc::new(Schema::new(vec![Field::new(
+                "nested_struct",
+                outer_struct_array.data_type().clone(),
+                true,
+            )]));
+
+            vec![RecordBatch::try_new(schema, vec![Arc::new(outer_struct_array)]).unwrap()]
+        }
         Scenario::Time32Second => {
             vec![
                 make_time32_batches(Scenario::Time32Second, vec![18506, 18507, 18508, 18509]),
diff --git a/parquet/tests/arrow_reader/statistics.rs b/parquet/tests/arrow_reader/statistics.rs
index 75a73ac1309f..8f15b9fb836a 100644
--- a/parquet/tests/arrow_reader/statistics.rs
+++ b/parquet/tests/arrow_reader/statistics.rs
@@ -22,12 +22,14 @@ use std::default::Default;
 use std::fs::File;
 use std::sync::Arc;
 
+use super::make_test_file_rg;
 use super::{struct_array, Scenario};
 use arrow::compute::kernels::cast_utils::Parser;
 use arrow::datatypes::{
     i256, Date32Type, Date64Type, TimestampMicrosecondType, TimestampMillisecondType,
     TimestampNanosecondType, TimestampSecondType,
 };
+use arrow_array::StructArray;
 use arrow_array::{
     make_array, new_null_array, Array, ArrayRef, BinaryArray, BinaryViewArray, BooleanArray,
     Date32Array, Date64Array, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
@@ -37,6 +39,7 @@ use arrow_array::{
     TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
     UInt32Array, UInt64Array, UInt8Array,
 };
+use arrow_schema::Fields;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use half::f16;
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
@@ -46,8 +49,6 @@ use parquet::arrow::arrow_reader::{
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{EnabledStatistics, WriterProperties};
 
-use super::make_test_file_rg;
-
 #[derive(Debug, Default, Clone)]
 struct Int64Case {
     /// Number of nulls in the column
@@ -2001,7 +2002,6 @@ async fn test_boolean() {
-// BUG
-// https://github.com/apache/datafusion/issues/10609
-// Note that: since I have not worked on struct array before, there may be a bug in the test code rather than the real bug in the code
-#[ignore]
 #[tokio::test]
 async fn test_struct() {
     // This creates a parquet files of 1 column named "struct"
@@ -2024,6 +2024,97 @@ async fn test_struct() {
     .run();
 }
 
+// Test statistics extraction for a nested struct column
+#[tokio::test]
+async fn test_nested_struct() {
+    // This creates a parquet file with 1 column named "nested_struct",
+    // written from one record batch with 3 rows in the nested struct array
+    let reader = TestReader {
+        scenario: Scenario::StructArrayNested,
+        row_per_group: 5,
+    }
+    .build()
+    .await;
+
+    // Expected minimum and maximum values for the inner struct fields
+    let inner_min = StructArray::from(vec![
+        (
+            Arc::new(Field::new("b", DataType::Boolean, false)),
+            Arc::new(BooleanArray::from(vec![Some(false)])) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("c", DataType::Int32, false)),
+            Arc::new(Int32Array::from(vec![Some(42)])) as ArrayRef,
+        ),
+    ]);
+    let inner_max = StructArray::from(vec![
+        (
+            Arc::new(Field::new("b", DataType::Boolean, false)),
+            Arc::new(BooleanArray::from(vec![Some(true)])) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("c", DataType::Int32, false)),
+            Arc::new(Int32Array::from(vec![Some(44)])) as ArrayRef,
+        ),
+    ]);
+
+    let inner_fields = Fields::from(vec![
+        Field::new("b", DataType::Boolean, false),
+        Field::new("c", DataType::Int32, false),
+    ]);
+
+    // Expected minimum outer struct
+    let expected_min_outer = StructArray::from(vec![
+        (
+            Arc::new(Field::new(
+                "inner_struct",
+                DataType::Struct(inner_fields.clone()),
+                false,
+            )),
+            Arc::new(inner_min) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("outer_float", DataType::Float64, false)),
+            Arc::new(Float64Array::from(vec![Some(5.0)])) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("outer_boolean", DataType::Boolean, false)),
+            Arc::new(BooleanArray::from(vec![Some(false)])) as ArrayRef,
+        ),
+    ]);
+
+    // Expected maximum outer struct
+    let expected_max_outer = StructArray::from(vec![
+        (
+            Arc::new(Field::new(
+                "inner_struct",
+                DataType::Struct(inner_fields),
+                false,
+            )),
+            Arc::new(inner_max) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("outer_float", DataType::Float64, false)),
+            Arc::new(Float64Array::from(vec![Some(7.0)])) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("outer_boolean", DataType::Boolean, false)),
+            Arc::new(BooleanArray::from(vec![Some(true)])) as ArrayRef,
+        ),
+    ]);
+
+    Test {
+        reader: &reader,
+        expected_min: Arc::new(expected_min_outer),
+        expected_max: Arc::new(expected_max_outer),
+        expected_null_counts: UInt64Array::from(vec![0]),
+        expected_row_counts: Some(UInt64Array::from(vec![3])),
+        column_name: "nested_struct",
+        check: Check::RowGroup,
+    }
+    .run();
+}
+
 // UTF8
 #[tokio::test]
 async fn test_utf8() {

From 3d285d96fd840b98e331e1502ab7ac2369e0b948 Mon Sep 17 00:00:00 2001
From: Lordworms
Date: Sun, 4 Aug 2024 21:29:32 -0700
Subject: [PATCH 2/3] fix doc

---
 parquet/src/arrow/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 9a5cb17aad6e..0a4d17408072 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -215,7 +215,7 @@ impl ProjectionMask {
 /// Lookups up the parquet column by name
 ///
 /// Returns the parquet column index and the corresponding arrow field
-pub(crate) fn parquet_column<'a>(
+pub fn parquet_column<'a>(
     parquet_schema: &SchemaDescriptor,
     arrow_schema: &'a Schema,
     name: &str,

From a1d3f52cf9cff76ea9002517c34d0198be2a2832 Mon Sep 17 00:00:00 2001
From: Lordworms
Date: Sun, 4 Aug 2024 21:32:21 -0700
Subject: [PATCH 3/3] fix clippy

---
 parquet/src/arrow/arrow_reader/statistics.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs
index a061ca69e452..6bf06ce40975 100644
--- a/parquet/src/arrow/arrow_reader/statistics.rs
+++ b/parquet/src/arrow/arrow_reader/statistics.rs
@@ -1686,10 +1686,10 @@ mod test {
     use arrow::datatypes::{i256, Date32Type, Date64Type};
     use arrow::util::test_util::parquet_test_data;
     use arrow_array::{
-        new_empty_array, new_null_array, Array, ArrayRef, BinaryArray, BinaryViewArray,
-        BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, Float32Array,
-        Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch,
-        StringArray, StringViewArray, StructArray, TimestampNanosecondArray,
+        new_empty_array, Array, ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
+        Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array,
+        Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray,
+        StringViewArray, StructArray, TimestampNanosecondArray,
     };
     use arrow_schema::{DataType, Field, SchemaRef};
     use bytes::Bytes;
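
The stand-alone sketch below (not part of the patch series; `visit_leaves` is an invented name for illustration) shows the column-index bookkeeping that `get_statistics_min_max_recursive` and `get_null_counts_recursive` rely on: a depth-first walk of the arrow type in which every non-nested field consumes exactly one parquet leaf column index, in schema order.

```rust
use arrow_schema::{DataType, Field, Fields};

/// Walk a (possibly nested) type depth-first, assigning one parquet leaf
/// column index to each non-nested field, mirroring how the recursive
/// statistics extractors advance `index` once per leaf visited.
fn visit_leaves(name: &str, data_type: &DataType, index: &mut usize, out: &mut Vec<(usize, String)>) {
    if let DataType::Struct(fields) = data_type {
        for field in fields.iter() {
            visit_leaves(field.name(), field.data_type(), index, out);
        }
    } else {
        out.push((*index, name.to_string()));
        *index += 1;
    }
}

fn main() {
    // Same shape as the StructArrayNested test scenario above.
    let inner = DataType::Struct(Fields::from(vec![
        Field::new("b", DataType::Boolean, false),
        Field::new("c", DataType::Int32, false),
    ]));
    let outer = DataType::Struct(Fields::from(vec![
        Field::new("inner_struct", inner, false),
        Field::new("outer_float", DataType::Float64, false),
        Field::new("outer_boolean", DataType::Boolean, false),
    ]));

    let mut index = 0;
    let mut leaves = Vec::new();
    visit_leaves("nested_struct", &outer, &mut index, &mut leaves);
    // Prints [(0, "b"), (1, "c"), (2, "outer_float"), (3, "outer_boolean")],
    // the order in which the patch maps struct leaves onto the columns of
    // RowGroupMetaData.
    println!("{leaves:?}");
}
```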
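And a minimal end-to-end sketch of how the new struct support would be driven, assuming the patches above are applied: write a file with a single struct column, re-open it, and ask `StatisticsConverter` for per-row-group statistics. The writer/reader calls are the crate's existing public APIs, but treat the exact `try_new` signature and the `bytes` dependency as assumptions rather than confirmed details.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StructArray};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One struct column "s" with two leaf fields, written as a single row group.
    let struct_array = StructArray::from(vec![
        (
            Arc::new(Field::new("b", DataType::Boolean, false)),
            Arc::new(BooleanArray::from(vec![false, true, false])) as ArrayRef,
        ),
        (
            Arc::new(Field::new("c", DataType::Int32, false)),
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
        ),
    ]);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s",
        struct_array.data_type().clone(),
        false,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)])?;

    // Write the batch into an in-memory parquet file.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Re-open the file and extract per-row-group statistics for "s".
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))?;
    let metadata = builder.metadata().clone();
    let converter =
        StatisticsConverter::try_new("s", &schema, metadata.file_metadata().schema_descr())?;
    let mins = converter.row_group_mins(metadata.row_groups().iter())?;
    let maxes = converter.row_group_maxes(metadata.row_groups().iter())?;
    let nulls = converter.row_group_null_counts(metadata.row_groups().iter())?;
    // With the patch applied these are one-row StructArrays / arrays:
    // mins = {b: false, c: 1}, maxes = {b: true, c: 3}, nulls = [0].
    println!("{mins:?} {maxes:?} {nulls:?}");
    Ok(())
}
```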