From 6f330c98b390b23ae6ba04dce1b264fcfd28e684 Mon Sep 17 00:00:00 2001 From: Eric Fredine Date: Sun, 7 Jul 2024 14:42:16 -0700 Subject: [PATCH] Fix data page statistics when all rows are null in a data page (#11295) * Adds tests for data page statistics when all values on the page are null. Fixes most of the failing tests for iterators not handling this situation correctly. * Fix handling of data page statistics for FixedBinaryArray using a builder. * Fix data page all nulls stats test for Dictionary DataType. * Fixes handling of None statistics for Decimal128 and Decimal256. * Consolidate make_data_page_stats_iterator uses. * Fix linting error. * Remove unnecessary collect. --------- Co-authored-by: Eric Fredine --- .../physical_plan/parquet/statistics.rs | 128 +++++++------- .../core/tests/parquet/arrow_statistics.rs | 158 +++++++++++++----- 2 files changed, 184 insertions(+), 102 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index bd05fe64e62d..b9aca2ac2cc9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,6 +19,7 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 +use arrow::array::builder::FixedSizeBinaryBuilder; use arrow::datatypes::i256; use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::{ @@ -600,6 +601,31 @@ make_data_page_stats_iterator!( Index::DOUBLE, f64 ); +make_data_page_stats_iterator!( + MinByteArrayDataPageStatsIterator, + |x: &PageIndex| { x.min.clone() }, + Index::BYTE_ARRAY, + ByteArray +); +make_data_page_stats_iterator!( + MaxByteArrayDataPageStatsIterator, + |x: &PageIndex| { x.max.clone() }, + Index::BYTE_ARRAY, + ByteArray +); +make_data_page_stats_iterator!( + MaxFixedLenByteArrayDataPageStatsIterator, + |x: &PageIndex| { x.max.clone() }, + Index::FIXED_LEN_BYTE_ARRAY, + FixedLenByteArray +); + +make_data_page_stats_iterator!( + MinFixedLenByteArrayDataPageStatsIterator, + |x: &PageIndex| { x.min.clone() }, + Index::FIXED_LEN_BYTE_ARRAY, + FixedLenByteArray +); macro_rules! get_decimal_page_stats_iterator { ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => { @@ -634,9 +660,7 @@ macro_rules! get_decimal_page_stats_iterator { .indexes .iter() .map(|x| { - Some($stat_value_type::from( - x.$func.unwrap_or_default(), - )) + x.$func.and_then(|x| Some($stat_value_type::from(x))) }) .collect::>(), ), @@ -645,9 +669,7 @@ macro_rules! get_decimal_page_stats_iterator { .indexes .iter() .map(|x| { - Some($stat_value_type::from( - x.$func.unwrap_or_default(), - )) + x.$func.and_then(|x| Some($stat_value_type::from(x))) }) .collect::>(), ), @@ -656,9 +678,9 @@ macro_rules! get_decimal_page_stats_iterator { .indexes .iter() .map(|x| { - Some($convert_func( - x.clone().$func.unwrap_or_default().data(), - )) + x.clone() + .$func + .and_then(|x| Some($convert_func(x.data()))) }) .collect::>(), ), @@ -667,9 +689,9 @@ macro_rules! get_decimal_page_stats_iterator { .indexes .iter() .map(|x| { - Some($convert_func( - x.clone().$func.unwrap_or_default().data(), - )) + x.clone() + .$func + .and_then(|x| Some($convert_func(x.data()))) }) .collect::>(), ), @@ -713,32 +735,6 @@ get_decimal_page_stats_iterator!( i256, from_bytes_to_i256 ); -make_data_page_stats_iterator!( - MinByteArrayDataPageStatsIterator, - |x: &PageIndex| { x.min.clone() }, - Index::BYTE_ARRAY, - ByteArray -); -make_data_page_stats_iterator!( - MaxByteArrayDataPageStatsIterator, - |x: &PageIndex| { x.max.clone() }, - Index::BYTE_ARRAY, - ByteArray -); - -make_data_page_stats_iterator!( - MaxFixedLenByteArrayDataPageStatsIterator, - |x: &PageIndex| { x.max.clone() }, - Index::FIXED_LEN_BYTE_ARRAY, - FixedLenByteArray -); - -make_data_page_stats_iterator!( - MinFixedLenByteArrayDataPageStatsIterator, - |x: &PageIndex| { x.min.clone() }, - Index::FIXED_LEN_BYTE_ARRAY, - FixedLenByteArray -); macro_rules! get_data_page_statistics { ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { @@ -757,7 +753,7 @@ macro_rules! get_data_page_statistics { UInt8Array::from_iter( [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) .map(|x| { - x.into_iter().filter_map(|x| { + x.into_iter().map(|x| { x.and_then(|x| u8::try_from(x).ok()) }) }) @@ -768,7 +764,7 @@ macro_rules! get_data_page_statistics { UInt16Array::from_iter( [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) .map(|x| { - x.into_iter().filter_map(|x| { + x.into_iter().map(|x| { x.and_then(|x| u16::try_from(x).ok()) }) }) @@ -779,7 +775,7 @@ macro_rules! get_data_page_statistics { UInt32Array::from_iter( [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) .map(|x| { - x.into_iter().filter_map(|x| { + x.into_iter().map(|x| { x.and_then(|x| Some(x as u32)) }) }) @@ -789,7 +785,7 @@ macro_rules! get_data_page_statistics { UInt64Array::from_iter( [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) .map(|x| { - x.into_iter().filter_map(|x| { + x.into_iter().map(|x| { x.and_then(|x| Some(x as u64)) }) }) @@ -799,7 +795,7 @@ macro_rules! get_data_page_statistics { Int8Array::from_iter( [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) .map(|x| { - x.into_iter().filter_map(|x| { + x.into_iter().map(|x| { x.and_then(|x| i8::try_from(x).ok()) }) }) @@ -810,7 +806,7 @@ macro_rules! get_data_page_statistics { Int16Array::from_iter( [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) .map(|x| { - x.into_iter().filter_map(|x| { + x.into_iter().map(|x| { x.and_then(|x| i16::try_from(x).ok()) }) }) @@ -823,8 +819,8 @@ macro_rules! get_data_page_statistics { Float16Array::from_iter( [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator) .map(|x| { - x.into_iter().filter_map(|x| { - x.and_then(|x| Some(from_bytes_to_f16(x.data()))) + x.into_iter().map(|x| { + x.and_then(|x| from_bytes_to_f16(x.data())) }) }) .flatten() @@ -836,7 +832,7 @@ macro_rules! get_data_page_statistics { Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Utf8) => Ok(Arc::new(StringArray::from( [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { - x.into_iter().filter_map(|x| { + x.into_iter().map(|x| { x.and_then(|x| { let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); if res.is_none() { @@ -849,7 +845,7 @@ macro_rules! get_data_page_statistics { ))), Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { - x.into_iter().filter_map(|x| { + x.into_iter().map(|x| { x.and_then(|x| { let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); if res.is_none() { @@ -878,10 +874,10 @@ macro_rules! get_data_page_statistics { Date64Array::from([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) .map(|x| { x.into_iter() - .filter_map(|x| { + .map(|x| { x.and_then(|x| i64::try_from(x).ok()) + .map(|x| x * 24 * 60 * 60 * 1000) }) - .map(|x| x * 24 * 60 * 60 * 1000) }).flatten().collect::>() ) ) @@ -919,16 +915,28 @@ macro_rules! get_data_page_statistics { }) }, Some(DataType::FixedSizeBinary(size)) => { - Ok(Arc::new( - FixedSizeBinaryArray::try_from_iter( - [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator) - .flat_map(|x| x.into_iter()) - .filter_map(|x| x) - ).unwrap_or_else(|e| { - log::debug!("FixedSizeBinary statistics is invalid: {}", e); - FixedSizeBinaryArray::new(*size, vec![].into(), None) - }) - )) + let mut builder = FixedSizeBinaryBuilder::new(*size); + let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + if x.len() == *size as usize { + let _ = builder.append_value(x.data()); + } else { + log::debug!( + "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", + size, + x.len(), + ); + builder.append_null(); + } + } + } + Ok(Arc::new(builder.finish())) }, _ => unimplemented!() } diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 6bfb9b02d347..2b4ba0b17133 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -29,15 +29,15 @@ use arrow::datatypes::{ TimestampNanosecondType, TimestampSecondType, }; use arrow_array::{ - make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, - Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, - Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, - LargeStringArray, RecordBatch, StringArray, Time32MillisecondArray, + make_array, new_null_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, + Date64Array, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, + LargeBinaryArray, LargeStringArray, RecordBatch, StringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; -use arrow_schema::{DataType, Field, Schema}; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; use datafusion::datasource::physical_plan::parquet::StatisticsConverter; use half::f16; use parquet::arrow::arrow_reader::{ @@ -91,51 +91,60 @@ impl Int64Case { // Create a parquet file with the specified settings pub fn build(&self) -> ParquetRecordBatchReaderBuilder { - let mut output_file = tempfile::Builder::new() - .prefix("parquert_statistics_test") - .suffix(".parquet") - .tempfile() - .expect("tempfile creation"); - - let mut builder = - WriterProperties::builder().set_max_row_group_size(self.row_per_group); - if let Some(enable_stats) = self.enable_stats { - builder = builder.set_statistics_enabled(enable_stats); - } - if let Some(data_page_row_count_limit) = self.data_page_row_count_limit { - builder = builder.set_data_page_row_count_limit(data_page_row_count_limit); - } - let props = builder.build(); - let batches = vec![self.make_int64_batches_with_null()]; + build_parquet_file( + self.row_per_group, + self.enable_stats, + self.data_page_row_count_limit, + batches, + ) + } +} - let schema = batches[0].schema(); +fn build_parquet_file( + row_per_group: usize, + enable_stats: Option, + data_page_row_count_limit: Option, + batches: Vec, +) -> ParquetRecordBatchReaderBuilder { + let mut output_file = tempfile::Builder::new() + .prefix("parquert_statistics_test") + .suffix(".parquet") + .tempfile() + .expect("tempfile creation"); + + let mut builder = WriterProperties::builder().set_max_row_group_size(row_per_group); + if let Some(enable_stats) = enable_stats { + builder = builder.set_statistics_enabled(enable_stats); + } + if let Some(data_page_row_count_limit) = data_page_row_count_limit { + builder = builder.set_data_page_row_count_limit(data_page_row_count_limit); + } + let props = builder.build(); - let mut writer = - ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap(); + let schema = batches[0].schema(); - // if we have a datapage limit send the batches in one at a time to give - // the writer a chance to be split into multiple pages - if self.data_page_row_count_limit.is_some() { - for batch in batches { - for i in 0..batch.num_rows() { - writer.write(&batch.slice(i, 1)).expect("writing batch"); - } - } - } else { - for batch in batches { - writer.write(&batch).expect("writing batch"); + let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap(); + + // if we have a datapage limit send the batches in one at a time to give + // the writer a chance to be split into multiple pages + if data_page_row_count_limit.is_some() { + for batch in &batches { + for i in 0..batch.num_rows() { + writer.write(&batch.slice(i, 1)).expect("writing batch"); } } + } else { + for batch in &batches { + writer.write(batch).expect("writing batch"); + } + } - // close file - let _file_meta = writer.close().unwrap(); + let _file_meta = writer.close().unwrap(); - // open the file & get the reader - let file = output_file.reopen().unwrap(); - let options = ArrowReaderOptions::new().with_page_index(true); - ArrowReaderBuilder::try_new_with_options(file, options).unwrap() - } + let file = output_file.reopen().unwrap(); + let options = ArrowReaderOptions::new().with_page_index(true); + ArrowReaderBuilder::try_new_with_options(file, options).unwrap() } /// Defines what data to create in a parquet file @@ -503,6 +512,71 @@ async fn test_multiple_data_pages_nulls_and_negatives() { .run() } +#[tokio::test] +async fn test_data_page_stats_with_all_null_page() { + for data_type in &[ + DataType::Boolean, + DataType::UInt64, + DataType::UInt32, + DataType::UInt16, + DataType::UInt8, + DataType::Int64, + DataType::Int32, + DataType::Int16, + DataType::Int8, + DataType::Float16, + DataType::Float32, + DataType::Float64, + DataType::Date32, + DataType::Date64, + DataType::Time32(TimeUnit::Millisecond), + DataType::Time32(TimeUnit::Second), + DataType::Time64(TimeUnit::Microsecond), + DataType::Time64(TimeUnit::Nanosecond), + DataType::Timestamp(TimeUnit::Second, None), + DataType::Timestamp(TimeUnit::Millisecond, None), + DataType::Timestamp(TimeUnit::Microsecond, None), + DataType::Timestamp(TimeUnit::Nanosecond, None), + DataType::Binary, + DataType::LargeBinary, + DataType::FixedSizeBinary(3), + DataType::Utf8, + DataType::LargeUtf8, + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + DataType::Decimal128(8, 2), // as INT32 + DataType::Decimal128(10, 2), // as INT64 + DataType::Decimal128(20, 2), // as FIXED_LEN_BYTE_ARRAY + DataType::Decimal256(8, 2), // as INT32 + DataType::Decimal256(10, 2), // as INT64 + DataType::Decimal256(20, 2), // as FIXED_LEN_BYTE_ARRAY + ] { + let batch = + RecordBatch::try_from_iter(vec![("col", new_null_array(data_type, 4))]) + .expect("record batch creation"); + + let reader = + build_parquet_file(4, Some(EnabledStatistics::Page), Some(4), vec![batch]); + + let expected_data_type = match data_type { + DataType::Dictionary(_, value_type) => value_type.as_ref(), + _ => data_type, + }; + + // There is one data page with 4 nulls + // The statistics should be present but null + Test { + reader: &reader, + expected_min: new_null_array(expected_data_type, 1), + expected_max: new_null_array(expected_data_type, 1), + expected_null_counts: UInt64Array::from(vec![4]), + expected_row_counts: Some(UInt64Array::from(vec![4])), + column_name: "col", + check: Check::DataPage, + } + .run() + } +} + /////////////// MORE GENERAL TESTS ////////////////////// // . Many columns in a file // . Differnet data types