From 801414d3d58e152f90016666d4f02de9ef5d0460 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Wed, 16 Oct 2024 12:04:29 +0200 Subject: [PATCH 1/4] perf: Fast decision for Parquet dictionary encoding This PR adds two things: 1. HyperLogLog to estimate the cardinality of an array. If the estimated cardinality is too high, no group-by has to be done. This speeds up Parquet writing by ~2x for high cardinality data. 2. An extension of the fast path for integers where if the min and the max are close enough, a bitmask is created to determine the cardinality. If cardinality is low enough or the cardinality is too high, the HyperLogLog path can be skipped. This can also lead to more than 2x improvements while writing. --- crates/polars-compute/src/cardinality.rs | 159 ++++++++++++++ crates/polars-compute/src/lib.rs | 1 + .../src/arrow/write/dictionary.rs | 206 +++++++++++++----- .../src/parquet/read/page/reader.rs | 20 +- 4 files changed, 323 insertions(+), 63 deletions(-) create mode 100644 crates/polars-compute/src/cardinality.rs diff --git a/crates/polars-compute/src/cardinality.rs b/crates/polars-compute/src/cardinality.rs new file mode 100644 index 000000000000..fa613382661a --- /dev/null +++ b/crates/polars-compute/src/cardinality.rs @@ -0,0 +1,159 @@ +use arrow::array::{ + Array, BinaryArray, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, PrimitiveArray, + Utf8Array, Utf8ViewArray, +}; +use arrow::datatypes::PhysicalType; +use arrow::types::Offset; +use arrow::with_match_primitive_type; +use polars_utils::total_ord::ToTotalOrd; + +use crate::hyperloglogplus::HyperLogLog; + +/// Get an estimate for the *cardinality* of the array (i.e. the number of unique values) +/// +/// This is not currently implemented for nested types. 
+pub fn estimate_cardinality(array: &dyn Array) -> usize { + if array.is_empty() { + return 0; + } + + if array.null_count() == array.len() { + return 1; + } + + // Estimate the cardinality with HyperLogLog + use PhysicalType as PT; + match array.dtype().to_physical_type() { + PT::Null => 1, + + PT::Boolean => { + let mut cardinality = 0; + + let array = array.as_any().downcast_ref::().unwrap(); + + cardinality += usize::from(array.has_nulls()); + + if let Some(unset_bits) = array.values().lazy_unset_bits() { + cardinality += 1 + usize::from(unset_bits != array.len()); + } else { + cardinality += 2; + } + + cardinality + }, + + PT::Primitive(primitive_type) => with_match_primitive_type!(primitive_type, |$T| { + let mut hll = HyperLogLog::new(); + + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + if array.has_nulls() { + for v in array.iter() { + let v = v.copied().unwrap_or_default(); + hll.add(&v.to_total_ord()); + } + } else { + for v in array.values_iter() { + hll.add(&v.to_total_ord()); + } + } + + hll.count() + }), + PT::FixedSizeBinary => { + let mut hll = HyperLogLog::new(); + + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + + if array.has_nulls() { + for v in array.iter() { + let v = v.unwrap_or_default(); + hll.add(v); + } + } else { + for v in array.values_iter() { + hll.add(v); + } + } + + hll.count() + }, + PT::Binary => { + binary_offset_array_estimate(array.as_any().downcast_ref::>().unwrap()) + }, + PT::LargeBinary => { + binary_offset_array_estimate(array.as_any().downcast_ref::>().unwrap()) + }, + PT::Utf8 => binary_offset_array_estimate( + &array + .as_any() + .downcast_ref::>() + .unwrap() + .to_binary(), + ), + PT::LargeUtf8 => binary_offset_array_estimate( + &array + .as_any() + .downcast_ref::>() + .unwrap() + .to_binary(), + ), + PT::BinaryView => { + binary_view_array_estimate(array.as_any().downcast_ref::().unwrap()) + }, + PT::Utf8View => binary_view_array_estimate( + &array + .as_any() + 
.downcast_ref::() + .unwrap() + .to_binview(), + ), + PT::List => unimplemented!(), + PT::FixedSizeList => unimplemented!(), + PT::LargeList => unimplemented!(), + PT::Struct => unimplemented!(), + PT::Union => unimplemented!(), + PT::Map => unimplemented!(), + PT::Dictionary(_) => unimplemented!(), + } +} + +fn binary_offset_array_estimate(array: &BinaryArray) -> usize { + let mut hll = HyperLogLog::new(); + + if array.has_nulls() { + for v in array.iter() { + let v = v.unwrap_or_default(); + hll.add(v); + } + } else { + for v in array.values_iter() { + hll.add(v); + } + } + + hll.count() +} + +fn binary_view_array_estimate(array: &BinaryViewArray) -> usize { + let mut hll = HyperLogLog::new(); + + if array.has_nulls() { + for v in array.iter() { + let v = v.unwrap_or_default(); + hll.add(v); + } + } else { + for v in array.values_iter() { + hll.add(v); + } + } + + hll.count() +} diff --git a/crates/polars-compute/src/lib.rs b/crates/polars-compute/src/lib.rs index da56c65983db..49774ef288c1 100644 --- a/crates/polars-compute/src/lib.rs +++ b/crates/polars-compute/src/lib.rs @@ -10,6 +10,7 @@ use arrow::types::NativeType; pub mod arithmetic; pub mod arity; pub mod bitwise; +pub mod cardinality; pub mod comparisons; pub mod filter; pub mod float_sum; diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index 4e0d57302314..4a297c06e5fc 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -3,11 +3,12 @@ use arrow::array::{ }; use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::buffer::Buffer; -use arrow::datatypes::{ArrowDataType, IntegerType}; +use arrow::datatypes::{ArrowDataType, IntegerType, PhysicalType}; +use arrow::legacy::utils::CustomIterTools; +use arrow::trusted_len::TrustMyLength; use arrow::types::NativeType; use polars_compute::min_max::MinMaxKernel; use polars_error::{polars_bail, PolarsResult}; -use 
polars_utils::unwrap::UnwrapUncheckedRelease; use super::binary::{ build_statistics as binary_build_statistics, encode_plain as binary_encode_plain, @@ -31,33 +32,51 @@ use crate::parquet::CowBuffer; use crate::write::DynIter; trait MinMaxThreshold { - const DELTA_THRESHOLD: Self; + const DELTA_THRESHOLD: usize; + const BITMASK_THRESHOLD: usize; + + fn from_start_and_offset(start: Self, offset: usize) -> Self; } macro_rules! minmaxthreshold_impls { - ($($t:ty => $threshold:literal,)+) => { + ($($signed:ty, $unsigned:ty => $threshold:literal, $bm_threshold:expr,)+) => { $( - impl MinMaxThreshold for $t { - const DELTA_THRESHOLD: Self = $threshold; + impl MinMaxThreshold for $signed { + const DELTA_THRESHOLD: usize = $threshold; + const BITMASK_THRESHOLD: usize = $bm_threshold; + + fn from_start_and_offset(start: Self, offset: usize) -> Self { + start + ((offset as $unsigned) as $signed) + } + } + impl MinMaxThreshold for $unsigned { + const DELTA_THRESHOLD: usize = $threshold; + const BITMASK_THRESHOLD: usize = $bm_threshold; + + fn from_start_and_offset(start: Self, offset: usize) -> Self { + start + (offset as $unsigned) + } } )+ }; } minmaxthreshold_impls! 
{ - i8 => 16, - i16 => 256, - i32 => 512, - i64 => 2048, - u8 => 16, - u16 => 256, - u32 => 512, - u64 => 2048, + i8, u8 => 16, u8::MAX as usize, + i16, u16 => 256, u16::MAX as usize, + i32, u32 => 512, u16::MAX as usize, + i64, u64 => 2048, u16::MAX as u64 as usize, +} + +enum DictionaryDecision { + NotWorth, + TryAgain, + Found(DictionaryArray), } fn min_max_integer_encode_as_dictionary_optional<'a, E, T>( array: &'a dyn Array, -) -> Option> +) -> DictionaryDecision where E: std::fmt::Debug, T: NativeType @@ -65,26 +84,80 @@ where + std::cmp::Ord + TryInto + std::ops::Sub - + num_traits::CheckedSub, + + num_traits::CheckedSub + + num_traits::cast::AsPrimitive, std::ops::RangeInclusive: Iterator, PrimitiveArray: MinMaxKernel = T>, { - use ArrowDataType as DT; - let (min, max): (T, T) = as MinMaxKernel>::min_max_ignore_nan_kernel( + let min_max = as MinMaxKernel>::min_max_ignore_nan_kernel( array.as_any().downcast_ref().unwrap(), - )?; + ); + + let Some((min, max)) = min_max else { + return DictionaryDecision::TryAgain; + }; debug_assert!(max >= min, "{max} >= {min}"); - if !max - .checked_sub(&min) - .is_some_and(|v| v <= T::DELTA_THRESHOLD) - { - return None; + let diff = max.checked_sub(&min).unwrap(); + + let diff = diff.as_(); + + if diff > T::BITMASK_THRESHOLD { + return DictionaryDecision::TryAgain; + } + + let mut seen_mask = MutableBitmap::from_len_zeroed(diff + 1); + + let array = array.as_any().downcast_ref::>().unwrap(); + + if array.has_nulls() { + for v in array.non_null_values_iter() { + let offset = (v - min).as_(); + debug_assert!(offset <= diff); + + unsafe { + seen_mask.set_unchecked(offset, true); + } + } + } else { + for v in array.values_iter() { + let offset = (*v - min).as_(); + debug_assert!(offset <= diff); + + unsafe { + seen_mask.set_unchecked(offset, true); + } + } + } + + let cardinality = seen_mask.set_bits(); + + let mut is_worth_it = false; + + is_worth_it |= cardinality <= T::DELTA_THRESHOLD; + is_worth_it |= (cardinality as f64) / 
(array.len() as f64) < 0.75; + + if !is_worth_it { + return DictionaryDecision::NotWorth; + } + + let seen_mask = seen_mask.freeze(); + + // SAFETY: We just did the calculation for this. + let indexes = seen_mask + .true_idx_iter() + .map(|idx| T::from_start_and_offset(min, idx)); + let indexes = unsafe { TrustMyLength::new(indexes, cardinality) }; + let indexes = indexes.collect_trusted::>(); + + let mut lookup = vec![0u16; diff + 1]; + + for (i, &idx) in indexes.iter().enumerate() { + lookup[idx.as_()] = i as u16; } - // @TODO: This currently overestimates the values, it might be interesting to use the unique - // kernel here. - let values = PrimitiveArray::new(DT::from(T::PRIMITIVE), (min..=max).collect(), None); + use ArrowDataType as DT; + let values = PrimitiveArray::new(DT::from(T::PRIMITIVE), indexes.into(), None); let values = Box::new(values); let keys: Buffer = array @@ -93,20 +166,19 @@ where .unwrap() .values() .iter() - .map(|v| unsafe { + .map(|v| { // @NOTE: // Since the values might contain nulls which have a undefined value. We just // clamp the values to between the min and max value. This way, they will still - // be valid dictionary keys. This is mostly to make the - // unwrap_unchecked_release not produce any unsafety. - (*v.clamp(&min, &max) - min) - .try_into() - .unwrap_unchecked_release() + // be valid dictionary keys. 
+ let idx = *v.clamp(&min, &max) - min; + let value = unsafe { lookup.get_unchecked(idx.as_()) }; + (*value).into() }) .collect(); let keys = PrimitiveArray::new(DT::UInt32, keys, array.validity().cloned()); - Some( + DictionaryDecision::Found( DictionaryArray::::try_new( ArrowDataType::Dictionary( IntegerType::UInt32, @@ -126,26 +198,15 @@ pub(crate) fn encode_as_dictionary_optional( type_: PrimitiveType, options: WriteOptions, ) -> Option>>> { - use ArrowDataType as DT; - let fast_dictionary = match array.dtype() { - DT::Int8 => min_max_integer_encode_as_dictionary_optional::<_, i8>(array), - DT::Int16 => min_max_integer_encode_as_dictionary_optional::<_, i16>(array), - DT::Int32 | DT::Date32 | DT::Time32(_) => { - min_max_integer_encode_as_dictionary_optional::<_, i32>(array) - }, - DT::Int64 | DT::Date64 | DT::Time64(_) | DT::Timestamp(_, _) | DT::Duration(_) => { - min_max_integer_encode_as_dictionary_optional::<_, i64>(array) - }, - DT::UInt8 => min_max_integer_encode_as_dictionary_optional::<_, u8>(array), - DT::UInt16 => min_max_integer_encode_as_dictionary_optional::<_, u16>(array), - DT::UInt32 => min_max_integer_encode_as_dictionary_optional::<_, u32>(array), - DT::UInt64 => min_max_integer_encode_as_dictionary_optional::<_, u64>(array), - _ => None, - }; + if array.is_empty() { + let array = DictionaryArray::::new_empty(ArrowDataType::Dictionary( + IntegerType::UInt32, + Box::new(array.dtype().clone()), + false, // @TODO: This might be able to be set to true? 
+ )); - if let Some(fast_dictionary) = fast_dictionary { return Some(array_to_pages( - &fast_dictionary, + &array, type_, nested, options, @@ -153,9 +214,44 @@ pub(crate) fn encode_as_dictionary_optional( )); } + use arrow::types::PrimitiveType as PT; + let fast_dictionary = match array.dtype().to_physical_type() { + PhysicalType::Primitive(pt) => match pt { + PT::Int8 => min_max_integer_encode_as_dictionary_optional::<_, i8>(array), + PT::Int16 => min_max_integer_encode_as_dictionary_optional::<_, i16>(array), + PT::Int32 => min_max_integer_encode_as_dictionary_optional::<_, i32>(array), + PT::Int64 => min_max_integer_encode_as_dictionary_optional::<_, i64>(array), + PT::UInt8 => min_max_integer_encode_as_dictionary_optional::<_, u8>(array), + PT::UInt16 => min_max_integer_encode_as_dictionary_optional::<_, u16>(array), + PT::UInt32 => min_max_integer_encode_as_dictionary_optional::<_, u32>(array), + PT::UInt64 => min_max_integer_encode_as_dictionary_optional::<_, u64>(array), + _ => DictionaryDecision::TryAgain, + }, + _ => DictionaryDecision::TryAgain, + }; + + match fast_dictionary { + DictionaryDecision::NotWorth => return None, + DictionaryDecision::Found(dictionary_array) => { + return Some(array_to_pages( + &dictionary_array, + type_, + nested, + options, + Encoding::RleDictionary, + )) + }, + DictionaryDecision::TryAgain => {}, + } + let dtype = Box::new(array.dtype().clone()); - let len_before = array.len(); + let estimated_cardinality = polars_compute::cardinality::estimate_cardinality(array); + + if array.len() > 128 && (estimated_cardinality as f64) / (array.len() as f64) > 0.75 { + return None; + } + // This does the group by. 
let array = arrow::compute::cast::cast( array, @@ -169,10 +265,6 @@ pub(crate) fn encode_as_dictionary_optional( .downcast_ref::>() .unwrap(); - if (array.values().len() as f64) / (len_before as f64) > 0.75 { - return None; - } - Some(array_to_pages( array, type_, diff --git a/crates/polars-parquet/src/parquet/read/page/reader.rs b/crates/polars-parquet/src/parquet/read/page/reader.rs index cd23af0499d7..ad453a0ff50a 100644 --- a/crates/polars-parquet/src/parquet/read/page/reader.rs +++ b/crates/polars-parquet/src/parquet/read/page/reader.rs @@ -13,6 +13,7 @@ use crate::parquet::page::{ ParquetPageHeader, }; use crate::parquet::CowBuffer; +use crate::write::Encoding; /// This meta is a small part of [`ColumnChunkMetadata`]. #[derive(Debug, Clone, PartialEq, Eq)] @@ -251,7 +252,10 @@ pub(super) fn finish_page( })?; if do_verbose { - println!("DictPage ( )"); + eprintln!( + "Parquet DictPage ( num_values: {}, datatype: {:?} )", + dict_header.num_values, descriptor.primitive_type + ); } let is_sorted = dict_header.is_sorted.unwrap_or(false); @@ -275,9 +279,11 @@ pub(super) fn finish_page( })?; if do_verbose { - println!( - "DataPageV1 ( num_values: {}, datatype: {:?}, encoding: {:?} )", - header.num_values, descriptor.primitive_type, header.encoding + eprintln!( + "Parquet DataPageV1 ( num_values: {}, datatype: {:?}, encoding: {:?} )", + header.num_values, + descriptor.primitive_type, + Encoding::try_from(header.encoding).ok() ); } @@ -298,8 +304,10 @@ pub(super) fn finish_page( if do_verbose { println!( - "DataPageV2 ( num_values: {}, datatype: {:?}, encoding: {:?} )", - header.num_values, descriptor.primitive_type, header.encoding + "Parquet DataPageV2 ( num_values: {}, datatype: {:?}, encoding: {:?} )", + header.num_values, + descriptor.primitive_type, + Encoding::try_from(header.encoding).ok() ); } From 8ca3ee4bd30e3643330460f61e2a5dcd847ae82d Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Wed, 16 Oct 2024 12:41:42 +0200 Subject: [PATCH 2/4] feature gate and 
totalord f16 --- crates/polars-arrow/src/types/native.rs | 41 ++++++++++++++++++- crates/polars-compute/src/cardinality.rs | 4 +- crates/polars-parquet/Cargo.toml | 2 +- .../src/arrow/write/dictionary.rs | 4 +- 4 files changed, 45 insertions(+), 6 deletions(-) diff --git a/crates/polars-arrow/src/types/native.rs b/crates/polars-arrow/src/types/native.rs index 6f869df32602..230fdde387d1 100644 --- a/crates/polars-arrow/src/types/native.rs +++ b/crates/polars-arrow/src/types/native.rs @@ -1,10 +1,11 @@ +use std::hash::{Hash, Hasher}; use std::ops::Neg; use std::panic::RefUnwindSafe; use bytemuck::{Pod, Zeroable}; use polars_utils::min_max::MinMax; use polars_utils::nulls::IsNull; -use polars_utils::total_ord::{TotalEq, TotalOrd}; +use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash, TotalOrd, TotalOrdWrap}; use super::PrimitiveType; @@ -434,6 +435,44 @@ impl PartialEq for f16 { } } +/// Converts an f16 into a canonical form, where -0 == 0 and all NaNs map to +/// the same value. +#[inline] +pub fn canonical_f16(x: f16) -> f16 { + // zero out the sign bit if the f16 is zero. NOTE(review): as written the mask is 0xFFFF for +/-0 (sign kept) and 0x7FFF otherwise (sign cleared) - confirm the condition is not inverted. + let convert_zero = f16(x.0 & (0x7FFF | (u16::from(x.0 & 0x7FFF == 0) << 15))); + if convert_zero.is_nan() { + f16::from_bits(0x7c00) // Canonical quiet NaN. NOTE(review): 0x7c00 is the IEEE binary16 +Inf bit pattern (a quiet NaN would be 0x7e00) - confirm.
+ } else { + convert_zero + } +} + +impl TotalHash for f16 { + #[inline(always)] + fn tot_hash(&self, state: &mut H) + where + H: Hasher, + { + canonical_f16(*self).to_bits().hash(state) + } +} + +impl ToTotalOrd for f16 { + type TotalOrdItem = TotalOrdWrap; + type SourceItem = f16; + + #[inline] + fn to_total_ord(&self) -> Self::TotalOrdItem { + TotalOrdWrap(*self) + } + + #[inline] + fn peel_total_ord(ord_item: Self::TotalOrdItem) -> Self::SourceItem { + ord_item.0 + } +} + impl IsNull for f16 { const HAS_NULLS: bool = false; type Inner = f16; diff --git a/crates/polars-compute/src/cardinality.rs b/crates/polars-compute/src/cardinality.rs index fa613382661a..d28efa9d051e 100644 --- a/crates/polars-compute/src/cardinality.rs +++ b/crates/polars-compute/src/cardinality.rs @@ -4,7 +4,7 @@ use arrow::array::{ }; use arrow::datatypes::PhysicalType; use arrow::types::Offset; -use arrow::with_match_primitive_type; +use arrow::with_match_primitive_type_full; use polars_utils::total_ord::ToTotalOrd; use crate::hyperloglogplus::HyperLogLog; @@ -42,7 +42,7 @@ pub fn estimate_cardinality(array: &dyn Array) -> usize { cardinality }, - PT::Primitive(primitive_type) => with_match_primitive_type!(primitive_type, |$T| { + PT::Primitive(primitive_type) => with_match_primitive_type_full!(primitive_type, |$T| { let mut hll = HyperLogLog::new(); let array = array diff --git a/crates/polars-parquet/Cargo.toml b/crates/polars-parquet/Cargo.toml index 26a57b22e713..881c9a477398 100644 --- a/crates/polars-parquet/Cargo.toml +++ b/crates/polars-parquet/Cargo.toml @@ -22,7 +22,7 @@ fallible-streaming-iterator = { workspace = true, optional = true } futures = { workspace = true, optional = true } hashbrown = { workspace = true } num-traits = { workspace = true } -polars-compute = { workspace = true } +polars-compute = { workspace = true, features = ["approx_unique"] } polars-error = { workspace = true } polars-utils = { workspace = true, features = ["mmap"] } simdutf8 = { workspace = true } 
diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index 4a297c06e5fc..90656a6d39f1 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -65,7 +65,7 @@ minmaxthreshold_impls! { i8, u8 => 16, u8::MAX as usize, i16, u16 => 256, u16::MAX as usize, i32, u32 => 512, u16::MAX as usize, - i64, u64 => 2048, u16::MAX as u64 as usize, + i64, u64 => 2048, u16::MAX as usize, } enum DictionaryDecision { @@ -153,7 +153,7 @@ where let mut lookup = vec![0u16; diff + 1]; for (i, &idx) in indexes.iter().enumerate() { - lookup[idx.as_()] = i as u16; + lookup[(idx - min).as_()] = i as u16; } use ArrowDataType as DT; From 6fc39e902c2e539ef0bc20c14c5393699b70aeef Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Wed, 16 Oct 2024 14:00:33 +0200 Subject: [PATCH 3/4] fix overflow --- crates/polars-parquet/src/arrow/write/dictionary.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index 90656a6d39f1..17527fc488f7 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -98,7 +98,9 @@ where }; debug_assert!(max >= min, "{max} >= {min}"); - let diff = max.checked_sub(&min).unwrap(); + let Some(diff) = max.checked_sub(&min) else { + return DictionaryDecision::TryAgain; + }; let diff = diff.as_(); From c6a9f8eaf0f44922663aef5fc64cbaaf3d101531 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Wed, 16 Oct 2024 15:09:02 +0200 Subject: [PATCH 4/4] feature gate cardinality in compute --- crates/polars-compute/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/polars-compute/src/lib.rs b/crates/polars-compute/src/lib.rs index 49774ef288c1..30efdd59adc7 100644 --- a/crates/polars-compute/src/lib.rs +++ b/crates/polars-compute/src/lib.rs @@ -10,6 +10,7 
@@ use arrow::types::NativeType; pub mod arithmetic; pub mod arity; pub mod bitwise; +#[cfg(feature = "approx_unique")] pub mod cardinality; pub mod comparisons; pub mod filter;