perf: Fast decision for Parquet dictionary encoding #19256

Merged 4 commits on Oct 16, 2024
41 changes: 40 additions & 1 deletion crates/polars-arrow/src/types/native.rs
@@ -1,10 +1,11 @@
use std::hash::{Hash, Hasher};
use std::ops::Neg;
use std::panic::RefUnwindSafe;

use bytemuck::{Pod, Zeroable};
use polars_utils::min_max::MinMax;
use polars_utils::nulls::IsNull;
use polars_utils::total_ord::{TotalEq, TotalOrd};
use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash, TotalOrd, TotalOrdWrap};

use super::PrimitiveType;

@@ -434,6 +435,44 @@ impl PartialEq for f16 {
}
}

/// Converts an f16 into a canonical form, where -0 == 0 and all NaNs map to
/// the same value.
#[inline]
pub fn canonical_f16(x: f16) -> f16 {
// Zero out the sign bit if the f16 is zero, so -0 and 0 agree.
let convert_zero = f16(x.0 & (0x7FFF | (u16::from(x.0 & 0x7FFF != 0) << 15)));
if convert_zero.is_nan() {
f16::from_bits(0x7E00) // Canonical quiet NaN.
} else {
convert_zero
}
}

impl TotalHash for f16 {
#[inline(always)]
fn tot_hash<H>(&self, state: &mut H)
where
H: Hasher,
{
canonical_f16(*self).to_bits().hash(state)
}
}

impl ToTotalOrd for f16 {
type TotalOrdItem = TotalOrdWrap<f16>;
type SourceItem = f16;

#[inline]
fn to_total_ord(&self) -> Self::TotalOrdItem {
TotalOrdWrap(*self)
}

#[inline]
fn peel_total_ord(ord_item: Self::TotalOrdItem) -> Self::SourceItem {
ord_item.0
}
}

impl IsNull for f16 {
const HAS_NULLS: bool = false;
type Inner = f16;
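The canonicalization above exists so that `tot_hash` agrees with total equality: -0.0 and 0.0 must hash identically, and every NaN bit pattern must hash identically. A minimal standalone sketch of the same idea for `f32`, using only the standard library (`canonical_f32` here is a hypothetical helper for illustration, not part of this diff):

```rust
/// Map an f32 to a canonical bit pattern: -0.0 becomes 0.0 and every
/// NaN collapses to one quiet NaN, so hashing the canonical bits is
/// consistent with a total equality that treats all NaNs as equal.
fn canonical_f32(x: f32) -> f32 {
    let convert_zero = x + 0.0; // IEEE 754: -0.0 + 0.0 == 0.0
    if convert_zero.is_nan() {
        f32::from_bits(0x7FC0_0000) // canonical quiet NaN
    } else {
        convert_zero
    }
}

fn main() {
    // -0.0 and 0.0 canonicalize to the same bits.
    assert_eq!(canonical_f32(-0.0).to_bits(), canonical_f32(0.0).to_bits());
    // Two different NaN payloads collapse to the same canonical NaN.
    let nan_a = f32::from_bits(0x7FC0_0001);
    let nan_b = f32::from_bits(0xFFC0_0002);
    assert_eq!(canonical_f32(nan_a).to_bits(), canonical_f32(nan_b).to_bits());
    // Ordinary values pass through unchanged.
    assert_eq!(canonical_f32(1.5), 1.5);
    println!("ok");
}
```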
159 changes: 159 additions & 0 deletions crates/polars-compute/src/cardinality.rs
@@ -0,0 +1,159 @@
use arrow::array::{
Array, BinaryArray, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, PrimitiveArray,
Utf8Array, Utf8ViewArray,
};
use arrow::datatypes::PhysicalType;
use arrow::types::Offset;
use arrow::with_match_primitive_type_full;
use polars_utils::total_ord::ToTotalOrd;

use crate::hyperloglogplus::HyperLogLog;

/// Get an estimate for the *cardinality* of the array (i.e. the number of unique values)
///
/// This is not currently implemented for nested types.
pub fn estimate_cardinality(array: &dyn Array) -> usize {
if array.is_empty() {
return 0;
}

if array.null_count() == array.len() {
return 1;
}

// Estimate the cardinality with HyperLogLog
use PhysicalType as PT;
match array.dtype().to_physical_type() {
PT::Null => 1,

PT::Boolean => {
let mut cardinality = 0;

let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();

cardinality += usize::from(array.has_nulls());

if let Some(unset_bits) = array.values().lazy_unset_bits() {
cardinality += 1 + usize::from(unset_bits != array.len());
} else {
cardinality += 2;
}

cardinality
},
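For booleans the cardinality is counted exactly rather than estimated: at most three distinct states can occur (null, false, true), so it suffices to check which of them are present. A simplified sketch of the same counting over a plain `Vec<Option<bool>>` (hypothetical helper, standard library only; the branch above works on validity and value bitmaps directly):

```rust
/// Exact cardinality of a nullable boolean column: count which of the
/// three possible states (null, false, true) actually occur.
fn boolean_cardinality(values: &[Option<bool>]) -> usize {
    let has_null = values.contains(&None);
    let has_false = values.contains(&Some(false));
    let has_true = values.contains(&Some(true));
    usize::from(has_null) + usize::from(has_false) + usize::from(has_true)
}

fn main() {
    assert_eq!(boolean_cardinality(&[]), 0);
    assert_eq!(boolean_cardinality(&[Some(true), Some(true)]), 1);
    assert_eq!(boolean_cardinality(&[Some(true), Some(false)]), 2);
    assert_eq!(boolean_cardinality(&[Some(true), Some(false), None]), 3);
    println!("ok");
}
```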

PT::Primitive(primitive_type) => with_match_primitive_type_full!(primitive_type, |$T| {
let mut hll = HyperLogLog::new();

let array = array
.as_any()
.downcast_ref::<PrimitiveArray<$T>>()
.unwrap();

if array.has_nulls() {
for v in array.iter() {
let v = v.copied().unwrap_or_default();
hll.add(&v.to_total_ord());
}
Review comment on lines +54 to +57

Collaborator: question: Not sure how big these arrays are, but if they are full column length, does it make sense to partition the work of computing the cardinality across rows, producing a partial estimate in each thread and then using HyperLogLog::merge to reduce across threads?

Collaborator (Author): These arrays are as large as a row group in Parquet. I don't really see much benefit in parallelizing here, because we already parallelize over columns and row groups. So unless you have a short dataframe with few columns, you probably won't see any sleeping threads.

Collaborator: Ah, sure.

} else {
for v in array.values_iter() {
hll.add(&v.to_total_ord());
}
}

hll.count()
}),
PT::FixedSizeBinary => {
let mut hll = HyperLogLog::new();

let array = array
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.unwrap();

if array.has_nulls() {
for v in array.iter() {
let v = v.unwrap_or_default();
hll.add(v);
}
} else {
for v in array.values_iter() {
hll.add(v);
}
}

hll.count()
},
PT::Binary => {
binary_offset_array_estimate(array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
},
PT::LargeBinary => {
binary_offset_array_estimate(array.as_any().downcast_ref::<BinaryArray<i64>>().unwrap())
},
PT::Utf8 => binary_offset_array_estimate(
&array
.as_any()
.downcast_ref::<Utf8Array<i32>>()
.unwrap()
.to_binary(),
),
PT::LargeUtf8 => binary_offset_array_estimate(
&array
.as_any()
.downcast_ref::<Utf8Array<i64>>()
.unwrap()
.to_binary(),
),
PT::BinaryView => {
binary_view_array_estimate(array.as_any().downcast_ref::<BinaryViewArray>().unwrap())
},
PT::Utf8View => binary_view_array_estimate(
&array
.as_any()
.downcast_ref::<Utf8ViewArray>()
.unwrap()
.to_binview(),
),
PT::List => unimplemented!(),
PT::FixedSizeList => unimplemented!(),
PT::LargeList => unimplemented!(),
PT::Struct => unimplemented!(),
PT::Union => unimplemented!(),
PT::Map => unimplemented!(),
PT::Dictionary(_) => unimplemented!(),
}
}

fn binary_offset_array_estimate<O: Offset>(array: &BinaryArray<O>) -> usize {
let mut hll = HyperLogLog::new();

if array.has_nulls() {
for v in array.iter() {
let v = v.unwrap_or_default();
hll.add(v);
}
} else {
for v in array.values_iter() {
hll.add(v);
}
}

hll.count()
}

fn binary_view_array_estimate(array: &BinaryViewArray) -> usize {
let mut hll = HyperLogLog::new();

if array.has_nulls() {
for v in array.iter() {
let v = v.unwrap_or_default();
hll.add(v);
}
} else {
for v in array.values_iter() {
hll.add(v);
}
}

hll.count()
}
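All non-boolean paths funnel into a HyperLogLog sketch, and the review discussion above mentions `HyperLogLog::merge` as a way to combine partial estimates. A toy, self-contained HyperLogLog in Rust illustrating both `add` and `merge` (register count, hash choice, and constants are assumptions for this sketch, not the crate's `hyperloglogplus` implementation):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const B: u32 = 10;       // index bits
const M: usize = 1 << B; // 1024 registers

struct Hll {
    regs: [u8; M],
}

impl Hll {
    fn new() -> Self {
        Hll { regs: [0; M] }
    }

    fn add<T: Hash>(&mut self, v: &T) {
        let mut h = DefaultHasher::new();
        v.hash(&mut h);
        let x = h.finish();
        let idx = (x >> (64 - B)) as usize; // top B bits pick a register
        let rest = x << B;                  // remaining 64 - B bits
        let rank = rest.leading_zeros().min(64 - B) as u8 + 1;
        self.regs[idx] = self.regs[idx].max(rank);
    }

    /// Element-wise max of registers; the merged sketch estimates the
    /// cardinality of the union of both inputs.
    fn merge(&mut self, other: &Hll) {
        for (a, b) in self.regs.iter_mut().zip(other.regs.iter()) {
            *a = (*a).max(*b);
        }
    }

    fn count(&self) -> usize {
        let alpha = 0.7213 / (1.0 + 1.079 / M as f64);
        let sum: f64 = self.regs.iter().map(|&r| 2f64.powi(-(r as i32))).sum();
        let raw = alpha * (M * M) as f64 / sum;
        let zeros = self.regs.iter().filter(|&&r| r == 0).count();
        if raw <= 2.5 * M as f64 && zeros > 0 {
            // Linear-counting correction for small cardinalities.
            (M as f64 * (M as f64 / zeros as f64).ln()) as usize
        } else {
            raw as usize
        }
    }
}

fn main() {
    // Split 20_000 distinct keys across two sketches, then merge,
    // mirroring the per-thread partial estimates from the review thread.
    let (mut a, mut b) = (Hll::new(), Hll::new());
    for i in 0u64..10_000 { a.add(&i); }
    for i in 10_000u64..20_000 { b.add(&i); }
    a.merge(&b);
    let est = a.count();
    // Standard error is about 1.04 / sqrt(1024), roughly 3.3%.
    assert!((est as f64 - 20_000.0).abs() / 20_000.0 < 0.15);
    println!("merged estimate: {est}");
}
```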
2 changes: 2 additions & 0 deletions crates/polars-compute/src/lib.rs
@@ -10,6 +10,8 @@ use arrow::types::NativeType;
pub mod arithmetic;
pub mod arity;
pub mod bitwise;
#[cfg(feature = "approx_unique")]
pub mod cardinality;
pub mod comparisons;
pub mod filter;
pub mod float_sum;
2 changes: 1 addition & 1 deletion crates/polars-parquet/Cargo.toml
@@ -22,7 +22,7 @@ fallible-streaming-iterator = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
hashbrown = { workspace = true }
num-traits = { workspace = true }
polars-compute = { workspace = true }
polars-compute = { workspace = true, features = ["approx_unique"] }
polars-error = { workspace = true }
polars-utils = { workspace = true, features = ["mmap"] }
simdutf8 = { workspace = true }