Skip to content

Commit

Permalink
use Builder in row group level stats.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Jul 7, 2024
1 parent 88e3957 commit 9f93e9b
Showing 1 changed file with 179 additions and 26 deletions.
205 changes: 179 additions & 26 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use arrow_array::{
new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray, StringArray, Time32MillisecondArray, Time32SecondArray,
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
LargeStringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
};
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
Expand Down Expand Up @@ -398,29 +398,61 @@ macro_rules! get_statistics {
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())),
))),
DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
DataType::Utf8 => {
let mode = std::env::var("MODE").unwrap_or_default();
match mode.as_str() {
"use_builder" => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = StringBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
res
})
}),
))),
Ok(Arc::new(builder.finish()))
}
_ => {
Ok(Arc::new(LargeStringArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
}),
)))
}
}
},
DataType::LargeUtf8 => {
Ok(Arc::new(LargeStringArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
}),
)))
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = StringBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
}
DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(
[<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| {
Expand Down Expand Up @@ -992,11 +1024,132 @@ fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
///
/// This is an internal helper -- see [`StatisticsConverter`] for public API
// fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
// data_type: &DataType,
// iterator: I,
// ) -> Result<ArrayRef> {
// get_statistics!(Max, data_type, iterator)
// }
fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
) -> Result<ArrayRef> {
get_statistics!(Max, data_type, iterator)
match data_type {
DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(MaxBooleanStatsIterator::new(iterator).map(|x|x.copied()),))),
DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
x.and_then(|x|i8::try_from(*x).ok())
}),))),
DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
x.and_then(|x|i16::try_from(*x).ok())
}),))),
DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))),
DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),))),
DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
x.and_then(|x|u8::try_from(*x).ok())
}),))),
DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
x.and_then(|x|u16::try_from(*x).ok())
}),))),
DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x| *x as u32)),))),
DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.map(|x| *x as u64)),))),
DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|x.and_then(|x|{
from_bytes_to_f16(x)
})),))),
DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(MaxFloatStatsIterator::new(iterator).map(|x|x.copied()),))),
DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(MaxDoubleStatsIterator::new(iterator).map(|x|x.copied()),))),
DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))),
DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x|i64::from(*x)*24*60*60*1000)),))),
DataType::Timestamp(unit,timezone) => {
let iter = MaxInt64StatsIterator::new(iterator).map(|x|x.copied());
Ok(match unit {
TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
})
},
DataType::Time32(unit) => {
Ok(match unit {
TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)),
TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)),
_ => {
let len = iterator.count();
new_null_array(data_type,len)
}
})
},
DataType::Time64(unit) => {
Ok(match unit {
TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)),
TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)),
_ => {
let len = iterator.count();
new_null_array(data_type,len)
}
})
},
DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))),
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))),
DataType::Utf8 => {
let iterator = MaxByteArrayStatsIterator::new(iterator);
let mut builder = StringBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::LargeUtf8 => {
Ok(Arc::new(LargeStringArray::from_iter(MaxByteArrayStatsIterator::new(iterator).map(|x|{
x.and_then(|x|{
let res = std::str::from_utf8(x).map(|s|s.to_string()).ok();
if res.is_none() {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
}),)))
}
DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|{
x.and_then(|x|{
if x.len().try_into()==Ok(*size){
Some(x)
}else {
log::debug!(
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
size,
x.len(),
);
None
}
})
}).collect::<Vec<_>>(),))),
DataType::Decimal128(precision,scale) => {
let arr = Decimal128Array::from_iter(MaxDecimal128StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?;
Ok(Arc::new(arr))
},
DataType::Decimal256(precision,scale) => {
let arr = Decimal256Array::from_iter(MaxDecimal256StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?;
Ok(Arc::new(arr))
},
DataType::Dictionary(_,value_type) => {
max_statistics(value_type,iterator)
}
DataType::Map(_,_)|DataType::Duration(_)|DataType::Interval(_)|DataType::Null|DataType::BinaryView|DataType::Utf8View|DataType::List(_)|DataType::ListView(_)|DataType::FixedSizeList(_,_)|DataType::LargeList(_)|DataType::LargeListView(_)|DataType::Struct(_)|DataType::Union(_,_)|DataType::RunEndEncoded(_,_) => {
let len = iterator.count();
Ok(new_null_array(data_type,len))
}
}
}

/// Extracts the min statistics from an iterator
Expand Down

0 comments on commit 9f93e9b

Please sign in to comment.