From cb1cfad1d792486572a6797a6224a7a5a2e00e9b Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 12:10:58 +0800 Subject: [PATCH 01/14] add u64 poc. --- datafusion/core/benches/parquet_statistic.rs | 4 +- .../physical_plan/parquet/statistics.rs | 320 ++++++++++++++++-- 2 files changed, 304 insertions(+), 20 deletions(-) diff --git a/datafusion/core/benches/parquet_statistic.rs b/datafusion/core/benches/parquet_statistic.rs index 3595e8773b07..b66bdae5fffc 100644 --- a/datafusion/core/benches/parquet_statistic.rs +++ b/datafusion/core/benches/parquet_statistic.rs @@ -198,7 +198,9 @@ fn make_dict_batch() -> RecordBatch { fn criterion_benchmark(c: &mut Criterion) { let row_groups = 100; use TestTypes::*; - let types = vec![Int64, UInt64, F64, String, Dictionary]; + // let types = vec![Int64, UInt64, F64, String, Dictionary]; + // let types = vec![String]; + let types = vec![UInt64]; let data_page_row_count_limits = vec![None, Some(1)]; for dtype in types { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index bd05fe64e62d..bd851b42cbd6 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,6 +19,7 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 +use arrow::array::{StringBuilder, UInt64Builder}; use arrow::datatypes::i256; use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::{ @@ -785,16 +786,49 @@ macro_rules! get_data_page_statistics { }) .flatten() ))), - Some(DataType::UInt64) => Ok(Arc::new( - UInt64Array::from_iter( - [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) + // Some(DataType::UInt64) => Ok(Arc::new( + // UInt64Array::from_iter( + // [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) + // .map(|x| { + // x.into_iter().map(|x| { + // x.and_then(|x| Some(x as u64)) + // }) + // }) + // .flatten() + // ))), + + // Some(DataType::UInt64) => { + // let iterator = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) + // .map(|x| { + // x.into_iter().map(|x| { + // x.and_then(|x| Some(x as u64)) + // }) + // }) + // .flatten(); + // let mut builder = UInt64Builder::with_capacity(512); + // // let mut builder = UInt64Builder::new(); + // for x in iterator { + // let Some(x) = x else { + // builder.append_null(); + // continue; + // }; + // builder.append_value(x as u64); + // } + // Ok(Arc::new(builder.finish())) + // }, + + Some(DataType::UInt64) => { + let iterator = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) .map(|x| { - x.into_iter().filter_map(|x| { + x.into_iter().map(|x| { x.and_then(|x| Some(x as u64)) }) }) - .flatten() - ))), + .flatten(); + let mut builder = UInt64Builder::with_capacity(512); + builder.extend(iterator); + Ok(Arc::new(builder.finish())) + }, Some(DataType::Int8) => Ok(Arc::new( Int8Array::from_iter( [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) @@ -834,19 +868,54 @@ macro_rules! get_data_page_statistics { Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), - Some(DataType::Utf8) => Ok(Arc::new(StringArray::from( - [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { - x.into_iter().filter_map(|x| { - x.and_then(|x| { - let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); - if res.is_none() { + // Some(DataType::Utf8) => Ok(Arc::new(StringArray::from_iter( + // [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { + // x.into_iter().map(|x| { + // x.and_then(|x| { + // let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); + // if res.is_none() { + // log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + // } + // res + // }) + // }) + // }).flatten(), + // ))), + Some(DataType::Utf8) => { + let mut builder = StringBuilder::new(); + let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - } - res - }) - }) - }).flatten().collect::>(), - ))), + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) + }, + // Some(DataType::Utf8) => Ok(Arc::new(StringArray::from( + // [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { + // x.into_iter().map(|x| { + // x.and_then(|x| { + // let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); + // if res.is_none() { + // log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + // } + // res + // }) + // }) + // }).flatten().collect::>(), + // ))), + Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { x.into_iter().filter_map(|x| { @@ -994,7 +1063,7 @@ where get_data_page_statistics!(Min, data_type, iterator) } -/// Extracts the max statistics from an iterator +/// Extracts the min statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] pub(crate) fn max_page_statistics<'a, I>( data_type: Option<&DataType>, @@ -1006,6 +1075,219 @@ where get_data_page_statistics!(Max, data_type, iterator) } +/// Extracts the max statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +// pub(crate) fn max_page_statistics<'a, I>( +// data_type: Option<&DataType>, +// iterator: I, +// ) -> Result +// where +// I: Iterator, +// { +// match data_type { +// Some(DataType::Boolean) => Ok(Arc::new(BooleanArray::from_iter( +// MaxBooleanDataPageStatsIterator::new(iterator) +// .flatten() +// .collect::>() +// .into_iter(), +// ))), +// Some(DataType::UInt8) => Ok(Arc::new(UInt8Array::from_iter( +// MaxInt32DataPageStatsIterator::new(iterator) +// .map(|x| { +// x.into_iter() +// .filter_map(|x| x.and_then(|x| u8::try_from(x).ok())) +// }) +// .flatten(), +// ))), +// Some(DataType::UInt16) => Ok(Arc::new(UInt16Array::from_iter( +// MaxInt32DataPageStatsIterator::new(iterator) +// .map(|x| { +// x.into_iter() +// .filter_map(|x| x.and_then(|x| u16::try_from(x).ok())) +// }) +// .flatten(), +// ))), +// Some(DataType::UInt32) => Ok(Arc::new(UInt32Array::from_iter( +// MaxInt32DataPageStatsIterator::new(iterator) +// .map(|x| x.into_iter().filter_map(|x| x.and_then(|x| Some(x as u32)))) +// .flatten(), +// ))), +// // Some(DataType::UInt64) => Ok(Arc::new(UInt64Array::from_iter( +// // MaxInt64DataPageStatsIterator::new(iterator) +// // .map(|x| x.into_iter().filter_map(|x| x.and_then(|x| Some(x as u64)))) +// // .flatten(), +// // ))), +// Some(DataType::UInt64) => { +// let mut builder = UInt64Builder::new(); +// let iterator = MaxInt64DataPageStatsIterator::new(iterator); +// builder.extend(iterator); +// for x in iterator { +// for x in x.into_iter() { +// let Some(x) = x else { +// builder.append_null(); +// continue; +// }; +// builder.append_value(x); +// } +// } +// } +// Some(DataType::Int8) => Ok(Arc::new(Int8Array::from_iter( +// MaxInt32DataPageStatsIterator::new(iterator) +// .map(|x| { +// x.into_iter() +// .filter_map(|x| x.and_then(|x| i8::try_from(x).ok())) +// }) +// .flatten(), +// ))), +// Some(DataType::Int16) => Ok(Arc::new(Int16Array::from_iter( +// MaxInt32DataPageStatsIterator::new(iterator) +// .map(|x| { +// x.into_iter() +// .filter_map(|x| x.and_then(|x| i16::try_from(x).ok())) +// }) +// .flatten(), +// ))), +// Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter( +// MaxInt32DataPageStatsIterator::new(iterator).flatten(), +// ))), +// Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter( +// MaxInt64DataPageStatsIterator::new(iterator).flatten(), +// ))), +// Some(DataType::Float16) => Ok(Arc::new(Float16Array::from_iter( +// MaxFloat16DataPageStatsIterator::new(iterator) +// .map(|x| { +// x.into_iter() +// .filter_map(|x| x.and_then(|x| Some(from_bytes_to_f16(x.data())))) +// }) +// .flatten(), +// ))), +// Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter( +// MaxFloat32DataPageStatsIterator::new(iterator).flatten(), +// ))), +// Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter( +// MaxFloat64DataPageStatsIterator::new(iterator).flatten(), +// ))), +// Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter( +// MaxByteArrayDataPageStatsIterator::new(iterator).flatten(), +// ))), +// Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter( +// MaxByteArrayDataPageStatsIterator::new(iterator).flatten(), +// ))), +// Some(DataType::Utf8) => { +// Ok(Arc::new(StringArray::from_iter( +// MaxByteArrayDataPageStatsIterator::new(iterator) +// .map(|x| { +// x.into_iter().map(|x| { +// x.and_then(|x|{ +// let res = std::str::from_utf8(x.data()).map(|s|s.to_string()).ok(); +// if res.is_none() { +// log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); +// } +// res +// }) +// }) +// }) +// .flatten(), +// ))) +// } +// Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( +// MaxByteArrayDataPageStatsIterator::new(iterator) +// .map(|x| { +// x.into_iter().filter_map(|x| { +// x.and_then(|x|{ +// let res = std::str::from_utf8(x.data()).map(|s|s.to_string()).ok(); +// if res.is_none(){ +// log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); +// }res +// }) +// }) +// }) +// .flatten() +// .collect::>(), +// ))), +// Some(DataType::Dictionary(_, value_type)) => { +// max_page_statistics(Some(value_type), iterator) +// } +// Some(DataType::Timestamp(unit, timezone)) => { +// let iter = MaxInt64DataPageStatsIterator::new(iterator).flatten(); +// Ok(match unit { +// TimeUnit::Second => Arc::new( +// TimestampSecondArray::from_iter(iter) +// .with_timezone_opt(timezone.clone()), +// ), +// TimeUnit::Millisecond => Arc::new( +// TimestampMillisecondArray::from_iter(iter) +// .with_timezone_opt(timezone.clone()), +// ), +// TimeUnit::Microsecond => Arc::new( +// TimestampMicrosecondArray::from_iter(iter) +// .with_timezone_opt(timezone.clone()), +// ), +// TimeUnit::Nanosecond => Arc::new( +// TimestampNanosecondArray::from_iter(iter) +// .with_timezone_opt(timezone.clone()), +// ), +// }) +// } +// Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter( +// MaxInt32DataPageStatsIterator::new(iterator).flatten(), +// ))), +// Some(DataType::Date64) => Ok(Arc::new(Date64Array::from( +// MaxInt32DataPageStatsIterator::new(iterator) +// .map(|x| { +// x.into_iter() +// .filter_map(|x| x.and_then(|x| i64::try_from(x).ok())) +// .map(|x| x * 24 * 60 * 60 * 1000) +// }) +// .flatten() +// .collect::>(), +// ))), +// Some(DataType::Decimal128(precision, scale)) => Ok(Arc::new( +// Decimal128Array::from_iter( +// MaxDecimal128DataPageStatsIterator::new(iterator).flatten(), +// ) +// .with_precision_and_scale(*precision, *scale)?, +// )), +// Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new( +// Decimal256Array::from_iter( +// MaxDecimal256DataPageStatsIterator::new(iterator).flatten(), +// ) +// .with_precision_and_scale(*precision, *scale)?, +// )), +// Some(DataType::Time32(unit)) => Ok(match unit { +// TimeUnit::Second => Arc::new(Time32SecondArray::from_iter( +// MaxInt32DataPageStatsIterator::new(iterator).flatten(), +// )), +// TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter( +// MaxInt32DataPageStatsIterator::new(iterator).flatten(), +// )), +// _ => new_empty_array(&DataType::Time32(unit.clone())), +// }), +// Some(DataType::Time64(unit)) => Ok(match unit { +// TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter( +// MaxInt64DataPageStatsIterator::new(iterator).flatten(), +// )), +// TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter( +// MaxInt64DataPageStatsIterator::new(iterator).flatten(), +// )), +// _ => new_empty_array(&DataType::Time64(unit.clone())), +// }), +// Some(DataType::FixedSizeBinary(size)) => Ok(Arc::new( +// FixedSizeBinaryArray::try_from_iter( +// MaxFixedLenByteArrayDataPageStatsIterator::new(iterator) +// .flat_map(|x| x.into_iter()) +// .filter_map(|x| x), +// ) +// .unwrap_or_else(|e| { +// log::debug!("FixedSizeBinary statistics is invalid: {}", e); +// FixedSizeBinaryArray::new(*size, vec![].into(), None) +// }), +// )), +// _ => unimplemented!(), +// } +// } + + /// Extracts the null count statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] /// From c67e21d45579c3e61b6f966e4c60d149dc074ca4 Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 13:49:08 +0800 Subject: [PATCH 02/14] use env to support the quick bench. --- datafusion/core/benches/parquet_statistic.rs | 5 +- .../physical_plan/parquet/statistics.rs | 98 ++++++++++--------- 2 files changed, 57 insertions(+), 46 deletions(-) diff --git a/datafusion/core/benches/parquet_statistic.rs b/datafusion/core/benches/parquet_statistic.rs index b66bdae5fffc..08823686dfc8 100644 --- a/datafusion/core/benches/parquet_statistic.rs +++ b/datafusion/core/benches/parquet_statistic.rs @@ -200,7 +200,7 @@ fn criterion_benchmark(c: &mut Criterion) { use TestTypes::*; // let types = vec![Int64, UInt64, F64, String, Dictionary]; // let types = vec![String]; - let types = vec![UInt64]; + let types = vec![String]; let data_page_row_count_limits = vec![None, Some(1)]; for dtype in types { @@ -217,7 +217,8 @@ fn criterion_benchmark(c: &mut Criterion) { let statistic_type = if data_page_row_count_limit.is_some() { "data page" } else { - "row group" + // "row group" + continue; }; let mut group = c.benchmark_group(format!( diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index bd851b42cbd6..1ecfeca83e75 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -40,6 +40,7 @@ use parquet::file::page_index::index::{Index, PageIndex}; use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; use paste::paste; +use std::env; use std::sync::Arc; // Convert the bytes array to i128. @@ -868,54 +869,62 @@ macro_rules! get_data_page_statistics { Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), - // Some(DataType::Utf8) => Ok(Arc::new(StringArray::from_iter( - // [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { - // x.into_iter().map(|x| { - // x.and_then(|x| { - // let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); - // if res.is_none() { - // log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - // } - // res - // }) - // }) - // }).flatten(), - // ))), Some(DataType::Utf8) => { - let mut builder = StringBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - builder.append_null(); - continue; - }; - - builder.append_value(x); + let mode = env::var("MODE").unwrap_or_default(); + match mode.as_str() { + "from_iter" => { + Ok(Arc::new(StringArray::from_iter( + [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { + x.into_iter().map(|x| { + x.and_then(|x| { + let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); + if res.is_none() { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + } + res + }) + }) + }).flatten(), + ))) + }, + "use_builder" => { + let mut builder = StringBuilder::new(); + let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) + }, + _ => { + Ok(Arc::new(StringArray::from( + [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { + x.into_iter().map(|x| { + x.and_then(|x| { + let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); + if res.is_none() { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + } + res + }) + }) + }).flatten().collect::>(), + ))) } } - Ok(Arc::new(builder.finish())) }, - // Some(DataType::Utf8) => Ok(Arc::new(StringArray::from( - // [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { - // x.into_iter().map(|x| { - // x.and_then(|x| { - // let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); - // if res.is_none() { - // log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - // } - // res - // }) - // }) - // }).flatten().collect::>(), - // ))), - Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { x.into_iter().filter_map(|x| { @@ -1072,6 +1081,7 @@ pub(crate) fn max_page_statistics<'a, I>( where I: Iterator, { + let mode = env::var("MODE").unwrap_or_default(); get_data_page_statistics!(Max, data_type, iterator) } From fc1bf8c194929abdd2ec4a23ec32ea0dff0df2c7 Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 15:49:22 +0800 Subject: [PATCH 03/14] use flatten in builder mode yet. --- .../physical_plan/parquet/statistics.rs | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 1ecfeca83e75..6f9d1bd3d6b6 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -889,22 +889,20 @@ macro_rules! get_data_page_statistics { }, "use_builder" => { let mut builder = StringBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); + let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten(); for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - builder.append_null(); - continue; - }; - - builder.append_value(x); - } + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); } Ok(Arc::new(builder.finish())) }, From 4140eb07c54b8fe5ae62faab14b8a8ca6004568b Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 15:59:29 +0800 Subject: [PATCH 04/14] add new mode. --- .../physical_plan/parquet/statistics.rs | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 6f9d1bd3d6b6..7caf4b9e9937 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -887,7 +887,7 @@ macro_rules! get_data_page_statistics { }).flatten(), ))) }, - "use_builder" => { + "use_builder_flatten" => { let mut builder = StringBuilder::new(); let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten(); for x in iterator { @@ -906,6 +906,27 @@ macro_rules! get_data_page_statistics { } Ok(Arc::new(builder.finish())) }, + "use_builder" => { + let mut builder = StringBuilder::new(); + let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten(); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) + }, _ => { Ok(Arc::new(StringArray::from( [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { From 88e39575b79b13577508923ea214b1e1fff47081 Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 16:01:18 +0800 Subject: [PATCH 05/14] use Builder in Utf8 and LargeUtf8 page level stats' convert. --- .../physical_plan/parquet/statistics.rs | 585 ++++++++---------- 1 file changed, 247 insertions(+), 338 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 7caf4b9e9937..8516abe5df98 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,7 +19,7 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 -use arrow::array::{StringBuilder, UInt64Builder}; +use arrow::array::StringBuilder; use arrow::datatypes::i256; use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::{ @@ -40,7 +40,6 @@ use parquet::file::page_index::index::{Index, PageIndex}; use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; use paste::paste; -use std::env; use std::sync::Arc; // Convert the bytes array to i128. @@ -787,49 +786,16 @@ macro_rules! get_data_page_statistics { }) .flatten() ))), - // Some(DataType::UInt64) => Ok(Arc::new( - // UInt64Array::from_iter( - // [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) - // .map(|x| { - // x.into_iter().map(|x| { - // x.and_then(|x| Some(x as u64)) - // }) - // }) - // .flatten() - // ))), - - // Some(DataType::UInt64) => { - // let iterator = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) - // .map(|x| { - // x.into_iter().map(|x| { - // x.and_then(|x| Some(x as u64)) - // }) - // }) - // .flatten(); - // let mut builder = UInt64Builder::with_capacity(512); - // // let mut builder = UInt64Builder::new(); - // for x in iterator { - // let Some(x) = x else { - // builder.append_null(); - // continue; - // }; - // builder.append_value(x as u64); - // } - // Ok(Arc::new(builder.finish())) - // }, - - Some(DataType::UInt64) => { - let iterator = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) + Some(DataType::UInt64) => Ok(Arc::new( + UInt64Array::from_iter( + [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) .map(|x| { x.into_iter().map(|x| { x.and_then(|x| Some(x as u64)) }) }) - .flatten(); - let mut builder = UInt64Builder::with_capacity(512); - builder.extend(iterator); - Ok(Arc::new(builder.finish())) - }, + .flatten() + ))), Some(DataType::Int8) => Ok(Arc::new( Int8Array::from_iter( [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) @@ -870,93 +836,47 @@ macro_rules! get_data_page_statistics { Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Utf8) => { - let mode = env::var("MODE").unwrap_or_default(); - match mode.as_str() { - "from_iter" => { - Ok(Arc::new(StringArray::from_iter( - [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| { - let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); - if res.is_none() { - log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - } - res - }) - }) - }).flatten(), - ))) - }, - "use_builder_flatten" => { - let mut builder = StringBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten(); - for x in iterator { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - builder.append_null(); - continue; - }; - - builder.append_value(x); - } - Ok(Arc::new(builder.finish())) - }, - "use_builder" => { - let mut builder = StringBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten(); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - builder.append_null(); - continue; - }; - - builder.append_value(x); - } - } - Ok(Arc::new(builder.finish())) - }, - _ => { - Ok(Arc::new(StringArray::from( - [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| { - let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); - if res.is_none() { - log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - } - res - }) - }) - }).flatten().collect::>(), - ))) + let mut builder = StringBuilder::new(); + let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); } } + Ok(Arc::new(builder.finish())) + }, + Some(DataType::LargeUtf8) => { + let mut builder = StringBuilder::new(); + let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { + log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) }, - Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( - [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { - x.into_iter().filter_map(|x| { - x.and_then(|x| { - let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); - if res.is_none() { - log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); - } - res - }) - }) - }).flatten().collect::>(), - ))), Some(DataType::Dictionary(_, value_type)) => { [<$stat_type_prefix:lower _ page_statistics>](Some(value_type), $iterator) }, @@ -1093,19 +1013,6 @@ where /// Extracts the min statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] -pub(crate) fn max_page_statistics<'a, I>( - data_type: Option<&DataType>, - iterator: I, -) -> Result -where - I: Iterator, -{ - let mode = env::var("MODE").unwrap_or_default(); - get_data_page_statistics!(Max, data_type, iterator) -} - -/// Extracts the max statistics from an iterator -/// of parquet page [`Index`]'es to an [`ArrayRef`] // pub(crate) fn max_page_statistics<'a, I>( // data_type: Option<&DataType>, // iterator: I, @@ -1113,209 +1020,211 @@ where // where // I: Iterator, // { -// match data_type { -// Some(DataType::Boolean) => Ok(Arc::new(BooleanArray::from_iter( -// MaxBooleanDataPageStatsIterator::new(iterator) -// .flatten() -// .collect::>() -// .into_iter(), -// ))), -// Some(DataType::UInt8) => Ok(Arc::new(UInt8Array::from_iter( -// MaxInt32DataPageStatsIterator::new(iterator) -// .map(|x| { -// x.into_iter() -// .filter_map(|x| x.and_then(|x| u8::try_from(x).ok())) -// }) -// .flatten(), -// ))), -// Some(DataType::UInt16) => Ok(Arc::new(UInt16Array::from_iter( -// MaxInt32DataPageStatsIterator::new(iterator) -// .map(|x| { -// x.into_iter() -// .filter_map(|x| x.and_then(|x| u16::try_from(x).ok())) -// }) -// .flatten(), -// ))), -// Some(DataType::UInt32) => Ok(Arc::new(UInt32Array::from_iter( -// MaxInt32DataPageStatsIterator::new(iterator) -// .map(|x| x.into_iter().filter_map(|x| x.and_then(|x| Some(x as u32)))) -// .flatten(), -// ))), -// // Some(DataType::UInt64) => Ok(Arc::new(UInt64Array::from_iter( -// // MaxInt64DataPageStatsIterator::new(iterator) -// // .map(|x| x.into_iter().filter_map(|x| x.and_then(|x| Some(x as u64)))) -// // .flatten(), -// // ))), -// Some(DataType::UInt64) => { -// let mut builder = UInt64Builder::new(); -// let iterator = MaxInt64DataPageStatsIterator::new(iterator); -// builder.extend(iterator); -// for x in iterator { -// for x in x.into_iter() { -// let Some(x) = x else { -// builder.append_null(); -// continue; -// }; -// builder.append_value(x); -// } -// } -// } -// Some(DataType::Int8) => Ok(Arc::new(Int8Array::from_iter( -// MaxInt32DataPageStatsIterator::new(iterator) -// .map(|x| { -// x.into_iter() -// .filter_map(|x| x.and_then(|x| i8::try_from(x).ok())) -// }) -// .flatten(), -// ))), -// Some(DataType::Int16) => Ok(Arc::new(Int16Array::from_iter( -// MaxInt32DataPageStatsIterator::new(iterator) -// .map(|x| { -// x.into_iter() -// .filter_map(|x| x.and_then(|x| i16::try_from(x).ok())) -// }) -// .flatten(), -// ))), -// Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter( -// MaxInt32DataPageStatsIterator::new(iterator).flatten(), -// ))), -// Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter( -// MaxInt64DataPageStatsIterator::new(iterator).flatten(), -// ))), -// Some(DataType::Float16) => Ok(Arc::new(Float16Array::from_iter( -// MaxFloat16DataPageStatsIterator::new(iterator) -// .map(|x| { -// x.into_iter() -// .filter_map(|x| x.and_then(|x| Some(from_bytes_to_f16(x.data())))) -// }) -// .flatten(), -// ))), -// Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter( -// MaxFloat32DataPageStatsIterator::new(iterator).flatten(), -// ))), -// Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter( -// MaxFloat64DataPageStatsIterator::new(iterator).flatten(), -// ))), -// Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter( -// MaxByteArrayDataPageStatsIterator::new(iterator).flatten(), -// ))), -// Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter( -// MaxByteArrayDataPageStatsIterator::new(iterator).flatten(), -// ))), -// Some(DataType::Utf8) => { -// Ok(Arc::new(StringArray::from_iter( -// MaxByteArrayDataPageStatsIterator::new(iterator) -// .map(|x| { -// x.into_iter().map(|x| { -// x.and_then(|x|{ -// let res = std::str::from_utf8(x.data()).map(|s|s.to_string()).ok(); -// if res.is_none() { -// log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); -// } -// res -// }) -// }) -// }) -// .flatten(), -// ))) -// } -// Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( -// MaxByteArrayDataPageStatsIterator::new(iterator) -// .map(|x| { -// x.into_iter().filter_map(|x| { -// x.and_then(|x|{ -// let res = std::str::from_utf8(x.data()).map(|s|s.to_string()).ok(); -// if res.is_none(){ -// log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); -// }res -// }) -// }) -// }) -// .flatten() -// .collect::>(), -// ))), -// Some(DataType::Dictionary(_, value_type)) => { -// max_page_statistics(Some(value_type), iterator) -// } -// Some(DataType::Timestamp(unit, timezone)) => { -// let iter = MaxInt64DataPageStatsIterator::new(iterator).flatten(); -// Ok(match unit { -// TimeUnit::Second => Arc::new( -// TimestampSecondArray::from_iter(iter) -// .with_timezone_opt(timezone.clone()), -// ), -// TimeUnit::Millisecond => Arc::new( -// TimestampMillisecondArray::from_iter(iter) -// .with_timezone_opt(timezone.clone()), -// ), -// TimeUnit::Microsecond => Arc::new( -// TimestampMicrosecondArray::from_iter(iter) -// .with_timezone_opt(timezone.clone()), -// ), -// TimeUnit::Nanosecond => Arc::new( -// TimestampNanosecondArray::from_iter(iter) -// .with_timezone_opt(timezone.clone()), -// ), -// }) -// } -// Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter( -// MaxInt32DataPageStatsIterator::new(iterator).flatten(), -// ))), -// Some(DataType::Date64) => Ok(Arc::new(Date64Array::from( -// MaxInt32DataPageStatsIterator::new(iterator) -// .map(|x| { -// x.into_iter() -// .filter_map(|x| x.and_then(|x| i64::try_from(x).ok())) -// .map(|x| x * 24 * 60 * 60 * 1000) -// }) -// .flatten() -// .collect::>(), -// ))), -// Some(DataType::Decimal128(precision, scale)) => Ok(Arc::new( -// Decimal128Array::from_iter( -// MaxDecimal128DataPageStatsIterator::new(iterator).flatten(), -// ) -// .with_precision_and_scale(*precision, *scale)?, -// )), -// Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new( -// Decimal256Array::from_iter( -// MaxDecimal256DataPageStatsIterator::new(iterator).flatten(), -// ) -// .with_precision_and_scale(*precision, *scale)?, -// )), -// Some(DataType::Time32(unit)) => Ok(match unit { -// TimeUnit::Second => Arc::new(Time32SecondArray::from_iter( -// MaxInt32DataPageStatsIterator::new(iterator).flatten(), -// )), -// TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter( -// MaxInt32DataPageStatsIterator::new(iterator).flatten(), -// )), -// _ => new_empty_array(&DataType::Time32(unit.clone())), -// }), -// Some(DataType::Time64(unit)) => Ok(match unit { -// TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter( -// MaxInt64DataPageStatsIterator::new(iterator).flatten(), -// )), -// TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter( -// MaxInt64DataPageStatsIterator::new(iterator).flatten(), -// )), -// _ => new_empty_array(&DataType::Time64(unit.clone())), -// }), -// Some(DataType::FixedSizeBinary(size)) => Ok(Arc::new( -// FixedSizeBinaryArray::try_from_iter( -// MaxFixedLenByteArrayDataPageStatsIterator::new(iterator) -// .flat_map(|x| x.into_iter()) -// .filter_map(|x| x), -// ) -// .unwrap_or_else(|e| { -// log::debug!("FixedSizeBinary statistics is invalid: {}", e); -// FixedSizeBinaryArray::new(*size, vec![].into(), None) -// }), -// )), -// _ => unimplemented!(), -// } +// let mode = env::var("MODE").unwrap_or_default(); +// get_data_page_statistics!(Max, data_type, iterator) // } +/// Extracts the max statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +pub(crate) fn max_page_statistics<'a, I>( + data_type: Option<&DataType>, + iterator: I, +) -> Result +where + I: Iterator, +{ + match data_type { + Some(DataType::Boolean) => Ok(Arc::new(BooleanArray::from_iter( + MaxBooleanDataPageStatsIterator::new(iterator) + .flatten() + .collect::>() + .into_iter(), + ))), + Some(DataType::UInt8) => Ok(Arc::new(UInt8Array::from_iter( + MaxInt32DataPageStatsIterator::new(iterator) + .map(|x| { + x.into_iter() + .filter_map(|x| x.and_then(|x| u8::try_from(x).ok())) + }) + .flatten(), + ))), + Some(DataType::UInt16) => Ok(Arc::new(UInt16Array::from_iter( + MaxInt32DataPageStatsIterator::new(iterator) + .map(|x| { + x.into_iter() + .filter_map(|x| x.and_then(|x| u16::try_from(x).ok())) + }) + .flatten(), + ))), + Some(DataType::UInt32) => Ok(Arc::new(UInt32Array::from_iter( + MaxInt32DataPageStatsIterator::new(iterator) + .map(|x| x.into_iter().filter_map(|x| x.and_then(|x| Some(x as u32)))) + .flatten(), + ))), + Some(DataType::UInt64) => Ok(Arc::new(UInt64Array::from_iter( + MaxInt64DataPageStatsIterator::new(iterator) + .map(|x| x.into_iter().filter_map(|x| x.and_then(|x| Some(x as u64)))) + .flatten(), + ))), + Some(DataType::Int8) => Ok(Arc::new(Int8Array::from_iter( + MaxInt32DataPageStatsIterator::new(iterator) + .map(|x| { + x.into_iter() + .filter_map(|x| x.and_then(|x| i8::try_from(x).ok())) + }) + .flatten(), + ))), + Some(DataType::Int16) => Ok(Arc::new(Int16Array::from_iter( + MaxInt32DataPageStatsIterator::new(iterator) + .map(|x| { + x.into_iter() + .filter_map(|x| x.and_then(|x| i16::try_from(x).ok())) + }) + .flatten(), + ))), + Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter( + MaxInt32DataPageStatsIterator::new(iterator).flatten(), + ))), + Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter( + MaxInt64DataPageStatsIterator::new(iterator).flatten(), + ))), + Some(DataType::Float16) => Ok(Arc::new(Float16Array::from_iter( + MaxFloat16DataPageStatsIterator::new(iterator) + .map(|x| { + x.into_iter() + .filter_map(|x| x.and_then(|x| Some(from_bytes_to_f16(x.data())))) + }) + .flatten(), + ))), + Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter( + MaxFloat32DataPageStatsIterator::new(iterator).flatten(), + ))), + Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter( + MaxFloat64DataPageStatsIterator::new(iterator).flatten(), + ))), + Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter( + MaxByteArrayDataPageStatsIterator::new(iterator).flatten(), + ))), + Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter( + MaxByteArrayDataPageStatsIterator::new(iterator).flatten(), + ))), + Some(DataType::Utf8) => { + let mut builder = StringBuilder::new(); + let iterator = MaxByteArrayDataPageStatsIterator::new(iterator); + for x in iterator { + for x in x { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) + } + Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( + MaxByteArrayDataPageStatsIterator::new(iterator) + .map(|x| { + x.into_iter().filter_map(|x| { + x.and_then(|x|{ + let res = std::str::from_utf8(x.data()).map(|s|s.to_string()).ok(); + if res.is_none(){ + log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); + }res + }) + }) + }) + .flatten() + .collect::>(), + ))), + Some(DataType::Dictionary(_, value_type)) => { + max_page_statistics(Some(value_type), iterator) + } + Some(DataType::Timestamp(unit, timezone)) => { + let iter = MaxInt64DataPageStatsIterator::new(iterator).flatten(); + Ok(match unit { + TimeUnit::Second => Arc::new( + TimestampSecondArray::from_iter(iter) + .with_timezone_opt(timezone.clone()), + ), + TimeUnit::Millisecond => Arc::new( + TimestampMillisecondArray::from_iter(iter) + .with_timezone_opt(timezone.clone()), + ), + TimeUnit::Microsecond => Arc::new( + TimestampMicrosecondArray::from_iter(iter) + .with_timezone_opt(timezone.clone()), + ), + TimeUnit::Nanosecond => Arc::new( + TimestampNanosecondArray::from_iter(iter) + .with_timezone_opt(timezone.clone()), + ), + }) + } + Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter( + MaxInt32DataPageStatsIterator::new(iterator).flatten(), + ))), + Some(DataType::Date64) => Ok(Arc::new(Date64Array::from( + MaxInt32DataPageStatsIterator::new(iterator) + .map(|x| { + x.into_iter() + .filter_map(|x| x.and_then(|x| i64::try_from(x).ok())) + .map(|x| x * 24 * 60 * 60 * 1000) + }) + .flatten() + .collect::>(), + ))), + Some(DataType::Decimal128(precision, scale)) => Ok(Arc::new( + Decimal128Array::from_iter( + MaxDecimal128DataPageStatsIterator::new(iterator).flatten(), + ) + .with_precision_and_scale(*precision, *scale)?, + )), + Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new( + Decimal256Array::from_iter( + MaxDecimal256DataPageStatsIterator::new(iterator).flatten(), + ) + .with_precision_and_scale(*precision, *scale)?, + )), + Some(DataType::Time32(unit)) => Ok(match unit { + TimeUnit::Second => Arc::new(Time32SecondArray::from_iter( + MaxInt32DataPageStatsIterator::new(iterator).flatten(), + )), + TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter( + MaxInt32DataPageStatsIterator::new(iterator).flatten(), + )), + _ => new_empty_array(&DataType::Time32(unit.clone())), + }), + Some(DataType::Time64(unit)) => Ok(match unit { + TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter( + MaxInt64DataPageStatsIterator::new(iterator).flatten(), + )), + TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter( + MaxInt64DataPageStatsIterator::new(iterator).flatten(), + )), + _ => new_empty_array(&DataType::Time64(unit.clone())), + }), + Some(DataType::FixedSizeBinary(size)) => Ok(Arc::new( + FixedSizeBinaryArray::try_from_iter( + MaxFixedLenByteArrayDataPageStatsIterator::new(iterator) + .flat_map(|x| x.into_iter()) + .filter_map(|x| x), + ) + .unwrap_or_else(|e| { + log::debug!("FixedSizeBinary statistics is invalid: {}", e); + FixedSizeBinaryArray::new(*size, vec![].into(), None) + }), + )), + _ => unimplemented!(), + } +} /// Extracts the null count statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] From c09912f5c51fca33a467b60d9dfcc35e100c6577 Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 22:25:39 +0800 Subject: [PATCH 06/14] use Builder in row group level stats. --- .../physical_plan/parquet/statistics.rs | 254 +++++++++++++++--- 1 file changed, 211 insertions(+), 43 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 8516abe5df98..1d3a2ccf0e5d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,17 +19,17 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 -use arrow::array::StringBuilder; +use arrow::array::{FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder}; use arrow::datatypes::i256; use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::{ new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, - LargeStringArray, StringArray, Time32MillisecondArray, Time32SecondArray, - Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, - UInt16Array, UInt32Array, UInt64Array, UInt8Array, + LargeStringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, + Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, + UInt64Array, UInt8Array, }; use arrow_schema::{Field, FieldRef, Schema, TimeUnit}; use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result}; @@ -398,46 +398,67 @@ macro_rules! get_statistics { DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter( [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())), ))), - DataType::Utf8 => Ok(Arc::new(StringArray::from_iter( - [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| { - x.and_then(|x| { - let res = std::str::from_utf8(x).map(|s| s.to_string()).ok(); - if res.is_none() { - log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - } - res - }) - }), - ))), + DataType::Utf8 => { + let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator); + let mut builder = StringBuilder::new(); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + Ok(Arc::new(builder.finish())) + }, DataType::LargeUtf8 => { - Ok(Arc::new(LargeStringArray::from_iter( - [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| { - x.and_then(|x| { - let res = std::str::from_utf8(x).map(|s| s.to_string()).ok(); - if res.is_none() { - log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); - } - res - }) - }), - ))) + let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator); + let mut builder = LargeStringBuilder::new(); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + Ok(Arc::new(builder.finish())) + } + DataType::FixedSizeBinary(size) => { + let iterator = MaxFixedLenByteArrayStatsIterator::new($iterator); + let mut builder = FixedSizeBinaryBuilder::new(*size); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + if x.len().try_into() != Ok(*size){ + log::debug!( + "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", + size, + x.len(), + ); + builder.append_null(); // no statistics value + continue; + } + + let _ = builder.append_value(x); + } + Ok(Arc::new(builder.finish())) } - DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from( - [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| { - x.and_then(|x| { - if x.len().try_into() == Ok(*size) { - Some(x) - } else { - log::debug!( - "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", - size, - x.len(), - ); - None - } - }) - }).collect::>(), - ))), DataType::Decimal128(precision, scale) => { let arr = Decimal128Array::from_iter( [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator) @@ -999,6 +1020,153 @@ fn max_statistics<'a, I: Iterator>>( get_statistics!(Max, data_type, iterator) } +// fn max_statistics<'a, I: Iterator>>( +// data_type: &DataType, +// iterator: I, +// ) -> Result { +// match data_type { +// DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(MaxBooleanStatsIterator::new(iterator).map(|x|x.copied()),))), +// DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ +// x.and_then(|x|i8::try_from(*x).ok()) +// }),))), +// DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ +// x.and_then(|x|i16::try_from(*x).ok()) +// }),))), +// DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))), +// DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),))), +// DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ +// x.and_then(|x|u8::try_from(*x).ok()) +// }),))), +// DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ +// x.and_then(|x|u16::try_from(*x).ok()) +// }),))), +// DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x| *x as u32)),))), +// DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.map(|x| *x as u64)),))), +// DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|x.and_then(|x|{ +// from_bytes_to_f16(x) +// })),))), +// DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(MaxFloatStatsIterator::new(iterator).map(|x|x.copied()),))), +// DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(MaxDoubleStatsIterator::new(iterator).map(|x|x.copied()),))), +// DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))), +// DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x|i64::from(*x)*24*60*60*1000)),))), +// DataType::Timestamp(unit,timezone) => { +// let iter = MaxInt64StatsIterator::new(iterator).map(|x|x.copied()); +// Ok(match unit { +// TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), +// TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), +// TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), +// TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), +// }) +// }, +// DataType::Time32(unit) => { +// Ok(match unit { +// TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)), +// TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)), +// _ => { +// let len = iterator.count(); +// new_null_array(data_type,len) +// } +// }) +// }, +// DataType::Time64(unit) => { +// Ok(match unit { +// TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)), +// TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)), +// _ => { +// let len = iterator.count(); +// new_null_array(data_type,len) +// } +// }) +// }, +// DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))), +// DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))), +// DataType::Utf8 => { +// let iterator = MaxByteArrayStatsIterator::new(iterator); +// let mut builder = StringBuilder::new(); +// for x in iterator { +// let Some(x) = x else { +// builder.append_null(); // no statistics value +// continue; +// }; + +// let Ok(x) = std::str::from_utf8(x) else { +// log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); +// builder.append_null(); +// continue; +// }; + +// builder.append_value(x); +// } +// Ok(Arc::new(builder.finish())) +// }, +// DataType::LargeUtf8 => { +// Ok(Arc::new(LargeStringArray::from_iter(MaxByteArrayStatsIterator::new(iterator).map(|x|{ +// x.and_then(|x|{ +// let res = std::str::from_utf8(x).map(|s|s.to_string()).ok(); +// if res.is_none() { +// log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); +// } +// res +// }) +// }),))) +// } +// DataType::FixedSizeBinary(size) => { +// let iterator = MaxFixedLenByteArrayStatsIterator::new(iterator); +// let mut builder = FixedSizeBinaryBuilder::new(size); +// for x in iterator { +// let Some(x) = x else { +// builder.append_null(); // no statistics value +// continue; +// }; + +// if x.len().try_into() != Ok(*size){ +// log::debug!( +// "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", +// size, +// x.len(), +// ); +// builder.append_null(); // no statistics value +// continue; +// } + +// builder.append_value(x); +// } +// Ok(Arc::new(builder.finish())) +// } + +// // Ok(Arc::new(FixedSizeBinaryArray::from(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|{ +// // x.and_then(|x|{ +// // if x.len().try_into()==Ok(*size){ +// // Some(x) +// // }else { +// log::debug!( +// "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", +// size, +// x.len(), +// ); +// // None +// // } +// // }) +// // }).collect::>(),))), +// DataType::Decimal128(precision,scale) => { +// let arr = Decimal128Array::from_iter(MaxDecimal128StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?; +// Ok(Arc::new(arr)) +// }, +// DataType::Decimal256(precision,scale) => { +// let arr = Decimal256Array::from_iter(MaxDecimal256StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?; +// Ok(Arc::new(arr)) +// }, +// DataType::Dictionary(_,value_type) => { +// max_statistics(value_type,iterator) +// } +// DataType::Map(_,_)|DataType::Duration(_)|DataType::Interval(_)|DataType::Null|DataType::BinaryView|DataType::Utf8View|DataType::List(_)|DataType::ListView(_)|DataType::FixedSizeList(_,_)|DataType::LargeList(_)|DataType::LargeListView(_)|DataType::Struct(_)|DataType::Union(_,_)|DataType::RunEndEncoded(_,_) => { +// let len = iterator.count(); +// Ok(new_null_array(data_type,len)) +// } +// } +// } + + /// Extracts the min statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] pub(crate) fn min_page_statistics<'a, I>( From 4a32df16989f458216ad07dbb52298c863442f5c Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 23:15:34 +0800 Subject: [PATCH 07/14] eliminate some unnecessary tmp `Vec`s. --- .../physical_plan/parquet/statistics.rs | 376 +----------------- 1 file changed, 11 insertions(+), 365 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 1d3a2ccf0e5d..e4bec843ffca 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -26,7 +26,7 @@ use arrow_array::{ new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, - LargeStringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, + Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, @@ -393,10 +393,10 @@ macro_rules! get_statistics { }) }, DataType::Binary => Ok(Arc::new(BinaryArray::from_iter( - [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x| x.to_vec())), + [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) ))), DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter( - [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())), + [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) ))), DataType::Utf8 => { let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator); @@ -444,7 +444,7 @@ macro_rules! get_statistics { builder.append_null(); // no statistics value continue; }; - + if x.len().try_into() != Ok(*size){ log::debug!( "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", @@ -454,7 +454,7 @@ macro_rules! get_statistics { builder.append_null(); // no statistics value continue; } - + let _ = builder.append_value(x); } Ok(Arc::new(builder.finish())) @@ -768,11 +768,7 @@ macro_rules! get_data_page_statistics { match $data_type { Some(DataType::Boolean) => Ok(Arc::new( BooleanArray::from_iter( - [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator) - .flatten() - // BooleanArray::from_iter required a sized iterator, so collect into Vec first - .collect::>() - .into_iter() + [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator).flatten() ) )), Some(DataType::UInt8) => Ok(Arc::new( @@ -913,14 +909,14 @@ macro_rules! get_data_page_statistics { Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Date64) => Ok( Arc::new( - Date64Array::from([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) + Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) .map(|x| { x.into_iter() - .filter_map(|x| { + .map(|x| { x.and_then(|x| i64::try_from(x).ok()) }) - .map(|x| x * 24 * 60 * 60 * 1000) - }).flatten().collect::>() + .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000)) + }).flatten() ) ) ), @@ -1020,153 +1016,6 @@ fn max_statistics<'a, I: Iterator>>( get_statistics!(Max, data_type, iterator) } -// fn max_statistics<'a, I: Iterator>>( -// data_type: &DataType, -// iterator: I, -// ) -> Result { -// match data_type { -// DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(MaxBooleanStatsIterator::new(iterator).map(|x|x.copied()),))), -// DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ -// x.and_then(|x|i8::try_from(*x).ok()) -// }),))), -// DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ -// x.and_then(|x|i16::try_from(*x).ok()) -// }),))), -// DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))), -// DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),))), -// DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ -// x.and_then(|x|u8::try_from(*x).ok()) -// }),))), -// DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ -// x.and_then(|x|u16::try_from(*x).ok()) -// }),))), -// DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x| *x as u32)),))), -// DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.map(|x| *x as u64)),))), -// DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|x.and_then(|x|{ -// from_bytes_to_f16(x) -// })),))), -// DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(MaxFloatStatsIterator::new(iterator).map(|x|x.copied()),))), -// DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(MaxDoubleStatsIterator::new(iterator).map(|x|x.copied()),))), -// DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))), -// DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x|i64::from(*x)*24*60*60*1000)),))), -// DataType::Timestamp(unit,timezone) => { -// let iter = MaxInt64StatsIterator::new(iterator).map(|x|x.copied()); -// Ok(match unit { -// TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), -// TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), -// TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), -// TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), -// }) -// }, -// DataType::Time32(unit) => { -// Ok(match unit { -// TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)), -// TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)), -// _ => { -// let len = iterator.count(); -// new_null_array(data_type,len) -// } -// }) -// }, -// DataType::Time64(unit) => { -// Ok(match unit { -// TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)), -// TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)), -// _ => { -// let len = iterator.count(); -// new_null_array(data_type,len) -// } -// }) -// }, -// DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))), -// DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))), -// DataType::Utf8 => { -// let iterator = MaxByteArrayStatsIterator::new(iterator); -// let mut builder = StringBuilder::new(); -// for x in iterator { -// let Some(x) = x else { -// builder.append_null(); // no statistics value -// continue; -// }; - -// let Ok(x) = std::str::from_utf8(x) else { -// log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); -// builder.append_null(); -// continue; -// }; - -// builder.append_value(x); -// } -// Ok(Arc::new(builder.finish())) -// }, -// DataType::LargeUtf8 => { -// Ok(Arc::new(LargeStringArray::from_iter(MaxByteArrayStatsIterator::new(iterator).map(|x|{ -// x.and_then(|x|{ -// let res = std::str::from_utf8(x).map(|s|s.to_string()).ok(); -// if res.is_none() { -// log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); -// } -// res -// }) -// }),))) -// } -// DataType::FixedSizeBinary(size) => { -// let iterator = MaxFixedLenByteArrayStatsIterator::new(iterator); -// let mut builder = FixedSizeBinaryBuilder::new(size); -// for x in iterator { -// let Some(x) = x else { -// builder.append_null(); // no statistics value -// continue; -// }; - -// if x.len().try_into() != Ok(*size){ -// log::debug!( -// "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", -// size, -// x.len(), -// ); -// builder.append_null(); // no statistics value -// continue; -// } - -// builder.append_value(x); -// } -// Ok(Arc::new(builder.finish())) -// } - -// // Ok(Arc::new(FixedSizeBinaryArray::from(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|{ -// // x.and_then(|x|{ -// // if x.len().try_into()==Ok(*size){ -// // Some(x) -// // }else { -// log::debug!( -// "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", -// size, -// x.len(), -// ); -// // None -// // } -// // }) -// // }).collect::>(),))), -// DataType::Decimal128(precision,scale) => { -// let arr = Decimal128Array::from_iter(MaxDecimal128StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?; -// Ok(Arc::new(arr)) -// }, -// DataType::Decimal256(precision,scale) => { -// let arr = Decimal256Array::from_iter(MaxDecimal256StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?; -// Ok(Arc::new(arr)) -// }, -// DataType::Dictionary(_,value_type) => { -// max_statistics(value_type,iterator) -// } -// DataType::Map(_,_)|DataType::Duration(_)|DataType::Interval(_)|DataType::Null|DataType::BinaryView|DataType::Utf8View|DataType::List(_)|DataType::ListView(_)|DataType::FixedSizeList(_,_)|DataType::LargeList(_)|DataType::LargeListView(_)|DataType::Struct(_)|DataType::Union(_,_)|DataType::RunEndEncoded(_,_) => { -// let len = iterator.count(); -// Ok(new_null_array(data_type,len)) -// } -// } -// } - - /// Extracts the min statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] pub(crate) fn min_page_statistics<'a, I>( @@ -1181,19 +1030,6 @@ where /// Extracts the min statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] -// pub(crate) fn max_page_statistics<'a, I>( -// data_type: Option<&DataType>, -// iterator: I, -// ) -> Result -// where -// I: Iterator, -// { -// let mode = env::var("MODE").unwrap_or_default(); -// get_data_page_statistics!(Max, data_type, iterator) -// } - -/// Extracts the max statistics from an iterator -/// of parquet page [`Index`]'es to an [`ArrayRef`] pub(crate) fn max_page_statistics<'a, I>( data_type: Option<&DataType>, iterator: I, @@ -1201,197 +1037,7 @@ pub(crate) fn max_page_statistics<'a, I>( where I: Iterator, { - match data_type { - Some(DataType::Boolean) => Ok(Arc::new(BooleanArray::from_iter( - MaxBooleanDataPageStatsIterator::new(iterator) - .flatten() - .collect::>() - .into_iter(), - ))), - Some(DataType::UInt8) => Ok(Arc::new(UInt8Array::from_iter( - MaxInt32DataPageStatsIterator::new(iterator) - .map(|x| { - x.into_iter() - .filter_map(|x| x.and_then(|x| u8::try_from(x).ok())) - }) - .flatten(), - ))), - Some(DataType::UInt16) => Ok(Arc::new(UInt16Array::from_iter( - MaxInt32DataPageStatsIterator::new(iterator) - .map(|x| { - x.into_iter() - .filter_map(|x| x.and_then(|x| u16::try_from(x).ok())) - }) - .flatten(), - ))), - Some(DataType::UInt32) => Ok(Arc::new(UInt32Array::from_iter( - MaxInt32DataPageStatsIterator::new(iterator) - .map(|x| x.into_iter().filter_map(|x| x.and_then(|x| Some(x as u32)))) - .flatten(), - ))), - Some(DataType::UInt64) => Ok(Arc::new(UInt64Array::from_iter( - MaxInt64DataPageStatsIterator::new(iterator) - .map(|x| x.into_iter().filter_map(|x| x.and_then(|x| Some(x as u64)))) - .flatten(), - ))), - Some(DataType::Int8) => Ok(Arc::new(Int8Array::from_iter( - MaxInt32DataPageStatsIterator::new(iterator) - .map(|x| { - x.into_iter() - .filter_map(|x| x.and_then(|x| i8::try_from(x).ok())) - }) - .flatten(), - ))), - Some(DataType::Int16) => Ok(Arc::new(Int16Array::from_iter( - MaxInt32DataPageStatsIterator::new(iterator) - .map(|x| { - x.into_iter() - .filter_map(|x| x.and_then(|x| i16::try_from(x).ok())) - }) - .flatten(), - ))), - Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter( - MaxInt32DataPageStatsIterator::new(iterator).flatten(), - ))), - Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter( - MaxInt64DataPageStatsIterator::new(iterator).flatten(), - ))), - Some(DataType::Float16) => Ok(Arc::new(Float16Array::from_iter( - MaxFloat16DataPageStatsIterator::new(iterator) - .map(|x| { - x.into_iter() - .filter_map(|x| x.and_then(|x| Some(from_bytes_to_f16(x.data())))) - }) - .flatten(), - ))), - Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter( - MaxFloat32DataPageStatsIterator::new(iterator).flatten(), - ))), - Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter( - MaxFloat64DataPageStatsIterator::new(iterator).flatten(), - ))), - Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter( - MaxByteArrayDataPageStatsIterator::new(iterator).flatten(), - ))), - Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter( - MaxByteArrayDataPageStatsIterator::new(iterator).flatten(), - ))), - Some(DataType::Utf8) => { - let mut builder = StringBuilder::new(); - let iterator = MaxByteArrayDataPageStatsIterator::new(iterator); - for x in iterator { - for x in x { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); - builder.append_null(); - continue; - }; - - builder.append_value(x); - } - } - Ok(Arc::new(builder.finish())) - } - Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( - MaxByteArrayDataPageStatsIterator::new(iterator) - .map(|x| { - x.into_iter().filter_map(|x| { - x.and_then(|x|{ - let res = std::str::from_utf8(x.data()).map(|s|s.to_string()).ok(); - if res.is_none(){ - log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); - }res - }) - }) - }) - .flatten() - .collect::>(), - ))), - Some(DataType::Dictionary(_, value_type)) => { - max_page_statistics(Some(value_type), iterator) - } - Some(DataType::Timestamp(unit, timezone)) => { - let iter = MaxInt64DataPageStatsIterator::new(iterator).flatten(); - Ok(match unit { - TimeUnit::Second => Arc::new( - TimestampSecondArray::from_iter(iter) - .with_timezone_opt(timezone.clone()), - ), - TimeUnit::Millisecond => Arc::new( - TimestampMillisecondArray::from_iter(iter) - .with_timezone_opt(timezone.clone()), - ), - TimeUnit::Microsecond => Arc::new( - TimestampMicrosecondArray::from_iter(iter) - .with_timezone_opt(timezone.clone()), - ), - TimeUnit::Nanosecond => Arc::new( - TimestampNanosecondArray::from_iter(iter) - .with_timezone_opt(timezone.clone()), - ), - }) - } - Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter( - MaxInt32DataPageStatsIterator::new(iterator).flatten(), - ))), - Some(DataType::Date64) => Ok(Arc::new(Date64Array::from( - MaxInt32DataPageStatsIterator::new(iterator) - .map(|x| { - x.into_iter() - .filter_map(|x| x.and_then(|x| i64::try_from(x).ok())) - .map(|x| x * 24 * 60 * 60 * 1000) - }) - .flatten() - .collect::>(), - ))), - Some(DataType::Decimal128(precision, scale)) => Ok(Arc::new( - Decimal128Array::from_iter( - MaxDecimal128DataPageStatsIterator::new(iterator).flatten(), - ) - .with_precision_and_scale(*precision, *scale)?, - )), - Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new( - Decimal256Array::from_iter( - MaxDecimal256DataPageStatsIterator::new(iterator).flatten(), - ) - .with_precision_and_scale(*precision, *scale)?, - )), - Some(DataType::Time32(unit)) => Ok(match unit { - TimeUnit::Second => Arc::new(Time32SecondArray::from_iter( - MaxInt32DataPageStatsIterator::new(iterator).flatten(), - )), - TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter( - MaxInt32DataPageStatsIterator::new(iterator).flatten(), - )), - _ => new_empty_array(&DataType::Time32(unit.clone())), - }), - Some(DataType::Time64(unit)) => Ok(match unit { - TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter( - MaxInt64DataPageStatsIterator::new(iterator).flatten(), - )), - TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter( - MaxInt64DataPageStatsIterator::new(iterator).flatten(), - )), - _ => new_empty_array(&DataType::Time64(unit.clone())), - }), - Some(DataType::FixedSizeBinary(size)) => Ok(Arc::new( - FixedSizeBinaryArray::try_from_iter( - MaxFixedLenByteArrayDataPageStatsIterator::new(iterator) - .flat_map(|x| x.into_iter()) - .filter_map(|x| x), - ) - .unwrap_or_else(|e| { - log::debug!("FixedSizeBinary statistics is invalid: {}", e); - FixedSizeBinaryArray::new(*size, vec![].into(), None) - }), - )), - _ => unimplemented!(), - } + get_data_page_statistics!(Max, data_type, iterator) } /// Extracts the null count statistics from an iterator From 4f419cacdbcf61daa8b20a5b18c2f92ef7c2098c Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 23:30:26 +0800 Subject: [PATCH 08/14] remove the quick modify in bench. --- datafusion/core/benches/parquet_statistic.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/core/benches/parquet_statistic.rs b/datafusion/core/benches/parquet_statistic.rs index 08823686dfc8..3595e8773b07 100644 --- a/datafusion/core/benches/parquet_statistic.rs +++ b/datafusion/core/benches/parquet_statistic.rs @@ -198,9 +198,7 @@ fn make_dict_batch() -> RecordBatch { fn criterion_benchmark(c: &mut Criterion) { let row_groups = 100; use TestTypes::*; - // let types = vec![Int64, UInt64, F64, String, Dictionary]; - // let types = vec![String]; - let types = vec![String]; + let types = vec![Int64, UInt64, F64, String, Dictionary]; let data_page_row_count_limits = vec![None, Some(1)]; for dtype in types { @@ -217,8 +215,7 @@ fn criterion_benchmark(c: &mut Criterion) { let statistic_type = if data_page_row_count_limit.is_some() { "data page" } else { - // "row group" - continue; + "row group" }; let mut group = c.benchmark_group(format!( From a0eb9cd4cba825be7642b2d1c029b56a10619e31 Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 23:44:27 +0800 Subject: [PATCH 09/14] process the result return from append_value of FixedSizeBinaryBuilder. --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index e4bec843ffca..94e9e83e61d2 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -445,17 +445,18 @@ macro_rules! get_statistics { continue; }; + // ignore invalid values if x.len().try_into() != Ok(*size){ log::debug!( "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", size, x.len(), ); - builder.append_null(); // no statistics value + builder.append_null(); continue; } - let _ = builder.append_value(x); + builder.append_value(x).expect("ensure to append successfully here, because size have been checked before"); } Ok(Arc::new(builder.finish())) } From c293be5ec47e0a7bb0b15cc1c59960e7a9b09f5c Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 8 Jul 2024 01:14:25 +0800 Subject: [PATCH 10/14] remove the modification of FixedSizeBinary&Bool, and fix the String case. --- .../physical_plan/parquet/statistics.rs | 74 ++++++++++++------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 94e9e83e61d2..d0b63d88c4d1 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,7 +19,7 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 -use arrow::array::{FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder}; +use arrow::array::{LargeStringBuilder, StringBuilder}; use arrow::datatypes::i256; use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::{ @@ -436,30 +436,46 @@ macro_rules! get_statistics { } Ok(Arc::new(builder.finish())) } - DataType::FixedSizeBinary(size) => { - let iterator = MaxFixedLenByteArrayStatsIterator::new($iterator); - let mut builder = FixedSizeBinaryBuilder::new(*size); - for x in iterator { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - // ignore invalid values - if x.len().try_into() != Ok(*size){ - log::debug!( - "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", - size, - x.len(), - ); - builder.append_null(); - continue; - } - - builder.append_value(x).expect("ensure to append successfully here, because size have been checked before"); - } - Ok(Arc::new(builder.finish())) - } + // DataType::FixedSizeBinary(size) => { + // let iterator = MaxFixedLenByteArrayStatsIterator::new($iterator); + // let mut builder = FixedSizeBinaryBuilder::new(*size); + // for x in iterator { + // let Some(x) = x else { + // builder.append_null(); // no statistics value + // continue; + // }; + + // // ignore invalid values + // if x.len().try_into() != Ok(*size){ + // log::debug!( + // "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", + // size, + // x.len(), + // ); + // builder.append_null(); + // continue; + // } + + // builder.append_value(x).expect("ensure to append successfully here, because size have been checked before"); + // } + // Ok(Arc::new(builder.finish())) + // } + DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from( + [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| { + if x.len().try_into() == Ok(*size) { + Some(x) + } else { + log::debug!( + "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", + size, + x.len(), + ); + None + } + }) + }).collect::>(), + ))), DataType::Decimal128(precision, scale) => { let arr = Decimal128Array::from_iter( [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator) @@ -769,7 +785,11 @@ macro_rules! get_data_page_statistics { match $data_type { Some(DataType::Boolean) => Ok(Arc::new( BooleanArray::from_iter( - [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator).flatten() + [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator) + .flatten() + // BooleanArray::from_iter required a sized iterator, so collect into Vec first + .collect::>() + .into_iter() ) )), Some(DataType::UInt8) => Ok(Arc::new( @@ -875,7 +895,7 @@ macro_rules! get_data_page_statistics { Ok(Arc::new(builder.finish())) }, Some(DataType::LargeUtf8) => { - let mut builder = StringBuilder::new(); + let mut builder = LargeStringBuilder::new(); let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); for x in iterator { for x in x.into_iter() { From 441d6abee7dde59d94ae6af40d2236b7e0908197 Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 8 Jul 2024 01:18:43 +0800 Subject: [PATCH 11/14] fix and re-enable the modification of FixedSizeBinary. --- .../physical_plan/parquet/statistics.rs | 68 +++++++------------ 1 file changed, 26 insertions(+), 42 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index d0b63d88c4d1..943c5beb6ec6 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,7 +19,7 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 -use arrow::array::{LargeStringBuilder, StringBuilder}; +use arrow::array::{FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder}; use arrow::datatypes::i256; use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::{ @@ -435,47 +435,31 @@ macro_rules! get_statistics { builder.append_value(x); } Ok(Arc::new(builder.finish())) - } - // DataType::FixedSizeBinary(size) => { - // let iterator = MaxFixedLenByteArrayStatsIterator::new($iterator); - // let mut builder = FixedSizeBinaryBuilder::new(*size); - // for x in iterator { - // let Some(x) = x else { - // builder.append_null(); // no statistics value - // continue; - // }; - - // // ignore invalid values - // if x.len().try_into() != Ok(*size){ - // log::debug!( - // "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", - // size, - // x.len(), - // ); - // builder.append_null(); - // continue; - // } - - // builder.append_value(x).expect("ensure to append successfully here, because size have been checked before"); - // } - // Ok(Arc::new(builder.finish())) - // } - DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from( - [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| { - x.and_then(|x| { - if x.len().try_into() == Ok(*size) { - Some(x) - } else { - log::debug!( - "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", - size, - x.len(), - ); - None - } - }) - }).collect::>(), - ))), + }, + DataType::FixedSizeBinary(size) => { + let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator); + let mut builder = FixedSizeBinaryBuilder::new(*size); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + // ignore invalid values + if x.len().try_into() != Ok(*size){ + log::debug!( + "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", + size, + x.len(), + ); + builder.append_null(); + continue; + } + + builder.append_value(x).expect("ensure to append successfully here, because size have been checked before"); + } + Ok(Arc::new(builder.finish())) + }, DataType::Decimal128(precision, scale) => { let arr = Decimal128Array::from_iter( [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator) From a6d28d9e8a3a27ad72ba3c7b24b32c22a5081cd3 Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 8 Jul 2024 01:20:21 +0800 Subject: [PATCH 12/14] fix comments. --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 943c5beb6ec6..8d9954bd1443 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -1033,7 +1033,7 @@ where get_data_page_statistics!(Min, data_type, iterator) } -/// Extracts the min statistics from an iterator +/// Extracts the max statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] pub(crate) fn max_page_statistics<'a, I>( data_type: Option<&DataType>, From 2e4c87bbf6479413875e8e0cbe89b15432e6fcbd Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 8 Jul 2024 10:50:54 +0800 Subject: [PATCH 13/14] use BooleanBuilder to eliminate the collect in BooleanArray case. --- .../physical_plan/parquet/statistics.rs | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 886128fed3b2..5903a7a473c3 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -24,8 +24,8 @@ use arrow::datatypes::i256; use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::{ new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array, - Decimal128Array, Decimal256Array, Float16Array, Float32Array, - Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, + Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, @@ -762,15 +762,20 @@ macro_rules! get_data_page_statistics { ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { paste! { match $data_type { - Some(DataType::Boolean) => Ok(Arc::new( - BooleanArray::from_iter( - [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator) - .flatten() - // BooleanArray::from_iter required a sized iterator, so collect into Vec first - .collect::>() - .into_iter() - ) - )), + Some(DataType::Boolean) => { + let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator); + let mut builder = BooleanBuilder::new(); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) + }, Some(DataType::UInt8) => Ok(Arc::new( UInt8Array::from_iter( [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) From 651656a26e00dbda5502c5ebd55bcf5dc1b869f7 Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 8 Jul 2024 10:54:25 +0800 Subject: [PATCH 14/14] fix compile. --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 5903a7a473c3..59369aba57a9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,7 +19,9 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 -use arrow::array::{FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder}; +use arrow::array::{ + BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder, +}; use arrow::datatypes::i256; use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::{