From 9f93e9b969e392a6d974f79bf2bdba21f085c3da Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 7 Jul 2024 22:25:39 +0800 Subject: [PATCH] use Builder in row group level stats. --- .../physical_plan/parquet/statistics.rs | 205 +++++++++++++++--- 1 file changed, 179 insertions(+), 26 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 8516abe5df98..b2d7c413ad8d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -26,10 +26,10 @@ use arrow_array::{ new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, - LargeStringArray, StringArray, Time32MillisecondArray, Time32SecondArray, - Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, - UInt16Array, UInt32Array, UInt64Array, UInt8Array, + LargeStringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, + Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, + UInt64Array, UInt8Array, }; use arrow_schema::{Field, FieldRef, Schema, TimeUnit}; use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result}; @@ -398,29 +398,61 @@ macro_rules! get_statistics { DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter( [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())), ))), - DataType::Utf8 => Ok(Arc::new(StringArray::from_iter( - [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| { - x.and_then(|x| { - let res = std::str::from_utf8(x).map(|s| s.to_string()).ok(); - if res.is_none() { - log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + DataType::Utf8 => { + let mode = std::env::var("MODE").unwrap_or_default(); + match mode.as_str() { + "use_builder" => { + let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator); + let mut builder = StringBuilder::new(); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); } - res - }) - }), - ))), + Ok(Arc::new(builder.finish())) + } + _ => { + Ok(Arc::new(LargeStringArray::from_iter( + [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| { + let res = std::str::from_utf8(x).map(|s| s.to_string()).ok(); + if res.is_none() { + log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); + } + res + }) + }), + ))) + } + } + }, DataType::LargeUtf8 => { - Ok(Arc::new(LargeStringArray::from_iter( - [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| { - x.and_then(|x| { - let res = std::str::from_utf8(x).map(|s| s.to_string()).ok(); - if res.is_none() { - log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); - } - res - }) - }), - ))) + let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator); + let mut builder = StringBuilder::new(); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + Ok(Arc::new(builder.finish())) } DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from( [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| { @@ -992,11 +1024,132 @@ fn min_statistics<'a, I: Iterator>>( /// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] /// /// This is an internal helper -- see [`StatisticsConverter`] for public API +// fn max_statistics<'a, I: Iterator>>( +// data_type: &DataType, +// iterator: I, +// ) -> Result { +// get_statistics!(Max, data_type, iterator) +// } fn max_statistics<'a, I: Iterator>>( data_type: &DataType, iterator: I, ) -> Result { - get_statistics!(Max, data_type, iterator) + match data_type { + DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(MaxBooleanStatsIterator::new(iterator).map(|x|x.copied()),))), + DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ + x.and_then(|x|i8::try_from(*x).ok()) + }),))), + DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ + x.and_then(|x|i16::try_from(*x).ok()) + }),))), + DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))), + DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),))), + DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ + x.and_then(|x|u8::try_from(*x).ok()) + }),))), + DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{ + x.and_then(|x|u16::try_from(*x).ok()) + }),))), + DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x| *x as u32)),))), + DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.map(|x| *x as u64)),))), + DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|x.and_then(|x|{ + from_bytes_to_f16(x) + })),))), + DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(MaxFloatStatsIterator::new(iterator).map(|x|x.copied()),))), + DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(MaxDoubleStatsIterator::new(iterator).map(|x|x.copied()),))), + DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))), + DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x|i64::from(*x)*24*60*60*1000)),))), + DataType::Timestamp(unit,timezone) => { + let iter = MaxInt64StatsIterator::new(iterator).map(|x|x.copied()); + Ok(match unit { + TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + }) + }, + DataType::Time32(unit) => { + Ok(match unit { + TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)), + TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)), + _ => { + let len = iterator.count(); + new_null_array(data_type,len) + } + }) + }, + DataType::Time64(unit) => { + Ok(match unit { + TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)), + TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)), + _ => { + let len = iterator.count(); + new_null_array(data_type,len) + } + }) + }, + DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))), + DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))), + DataType::Utf8 => { + let iterator = MaxByteArrayStatsIterator::new(iterator); + let mut builder = StringBuilder::new(); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + Ok(Arc::new(builder.finish())) + }, + DataType::LargeUtf8 => { + Ok(Arc::new(LargeStringArray::from_iter(MaxByteArrayStatsIterator::new(iterator).map(|x|{ + x.and_then(|x|{ + let res = std::str::from_utf8(x).map(|s|s.to_string()).ok(); + if res.is_none() { + log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); + } + res + }) + }),))) + } + DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|{ + x.and_then(|x|{ + if x.len().try_into()==Ok(*size){ + Some(x) + }else { + log::debug!( + "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", + size, + x.len(), + ); + None + } + }) + }).collect::>(),))), + DataType::Decimal128(precision,scale) => { + let arr = Decimal128Array::from_iter(MaxDecimal128StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?; + Ok(Arc::new(arr)) + }, + DataType::Decimal256(precision,scale) => { + let arr = Decimal256Array::from_iter(MaxDecimal256StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?; + Ok(Arc::new(arr)) + }, + DataType::Dictionary(_,value_type) => { + max_statistics(value_type,iterator) + } + DataType::Map(_,_)|DataType::Duration(_)|DataType::Interval(_)|DataType::Null|DataType::BinaryView|DataType::Utf8View|DataType::List(_)|DataType::ListView(_)|DataType::FixedSizeList(_,_)|DataType::LargeList(_)|DataType::LargeListView(_)|DataType::Struct(_)|DataType::Union(_,_)|DataType::RunEndEncoded(_,_) => { + let len = iterator.count(); + Ok(new_null_array(data_type,len)) + } + } } /// Extracts the min statistics from an iterator