diff --git a/crates/polars-core/src/frame/column/mod.rs b/crates/polars-core/src/frame/column/mod.rs
index 313a4ec89df2..74b6f0e7da0a 100644
--- a/crates/polars-core/src/frame/column/mod.rs
+++ b/crates/polars-core/src/frame/column/mod.rs
@@ -677,25 +677,6 @@ impl Column {
             .vec_hash_combine(build_hasher, hashes)
     }
 
-    /// # Safety
-    ///
-    /// Indexes need to be in bounds.
-    pub(crate) unsafe fn equal_element(
-        &self,
-        idx_self: usize,
-        idx_other: usize,
-        other: &Column,
-    ) -> bool {
-        // @scalar-opt
-        unsafe {
-            self.as_materialized_series().equal_element(
-                idx_self,
-                idx_other,
-                other.as_materialized_series(),
-            )
-        }
-    }
-
     pub fn append(&mut self, other: &Column) -> PolarsResult<&mut Self> {
         // @scalar-opt
         self.into_materialized_series()
diff --git a/crates/polars-core/src/hashing/identity.rs b/crates/polars-core/src/hashing/identity.rs
index e917291f1586..a1ae697106f9 100644
--- a/crates/polars-core/src/hashing/identity.rs
+++ b/crates/polars-core/src/hashing/identity.rs
@@ -33,28 +33,3 @@ impl Hasher for IdHasher {
 }
 
 pub type IdBuildHasher = BuildHasherDefault<IdHasher>;
-
-#[derive(Debug)]
-/// Contains an idx of a row in a DataFrame and the precomputed hash of that row.
-///
-/// That hash still needs to be used to create another hash to be able to resize hashmaps without
-/// accidental quadratic behavior. So do not use an Identity function!
-pub struct IdxHash {
-    // idx in row of Series, DataFrame
-    pub idx: IdxSize,
-    // precomputed hash of T
-    pub hash: u64,
-}
-
-impl Hash for IdxHash {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        state.write_u64(self.hash)
-    }
-}
-
-impl IdxHash {
-    #[inline]
-    pub(crate) fn new(idx: IdxSize, hash: u64) -> Self {
-        IdxHash { idx, hash }
-    }
-}
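
For context on what the removed `IdxHash` did: the key stores a row index together with the row's precomputed hash, and its `Hash` impl only re-feeds that stored `u64`. Below is a minimal standalone sketch of the pattern; the `RowKey` type and `main` are hypothetical illustrations, not polars code. As the removed doc comment warns, the map's `BuildHasher` must still mix the stored hash, since an identity hasher can degrade resizing into quadratic work.

use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for the removed `IdxHash`: carry the row (here a byte
// slice) plus its precomputed hash; hashing only re-feeds that u64.
#[derive(Debug)]
struct RowKey<'a> {
    row: &'a [u8], // the actual key row, consulted only on hash collisions
    hash: u64,     // precomputed hash of `row`
}

impl Hash for RowKey<'_> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Cheap: the row itself is never re-hashed. The map's BuildHasher
        // still mixes this u64, which keeps resizing well behaved.
        state.write_u64(self.hash);
    }
}

impl PartialEq for RowKey<'_> {
    fn eq(&self, other: &Self) -> bool {
        // Hash check first (cheap reject), full row compare only on a match.
        self.hash == other.hash && self.row == other.row
    }
}
impl Eq for RowKey<'_> {}

fn main() {
    let (a, b) = (b"alpha".as_slice(), b"beta".as_slice());
    // Toy hash function, purely for the sketch.
    let h = |r: &[u8]| r.iter().fold(0u64, |acc, x| acc.rotate_left(5) ^ *x as u64);
    let mut groups: HashMap<RowKey<'_>, Vec<u32>> = HashMap::new();
    groups.entry(RowKey { row: a, hash: h(a) }).or_default().push(0);
    groups.entry(RowKey { row: b, hash: h(b) }).or_default().push(1);
    groups.entry(RowKey { row: a, hash: h(a) }).or_default().push(2);
    assert_eq!(groups.len(), 2); // equal rows land in the same group
}
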
diff --git a/crates/polars-core/src/hashing/mod.rs b/crates/polars-core/src/hashing/mod.rs
index 8f966eb2f317..370401eb6e3e 100644
--- a/crates/polars-core/src/hashing/mod.rs
+++ b/crates/polars-core/src/hashing/mod.rs
@@ -1,15 +1,11 @@
 mod identity;
 pub(crate) mod vector_hasher;
 
-use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
+use std::hash::{BuildHasherDefault, Hash, Hasher};
 
-use hashbrown::hash_map::RawEntryMut;
-use hashbrown::HashMap;
 pub use identity::*;
 pub use vector_hasher::*;
 
-use crate::prelude::*;
-
 // hash combine from c++' boost lib
 #[inline]
 pub fn _boost_hash_combine(l: u64, r: u64) -> u64 {
@@ -19,73 +15,3 @@ pub fn _boost_hash_combine(l: u64, r: u64) -> u64 {
 
 // We must strike a balance between cache
 // Overallocation seems a lot more expensive than resizing so we start reasonable small.
 pub const _HASHMAP_INIT_SIZE: usize = 512;
-
-/// Utility function used as comparison function in the hashmap.
-/// The rationale is that equality is an AND operation and therefore its probability of success
-/// declines rapidly with the number of keys. Instead of first copying an entire row from both
-/// sides and then do the comparison, we do the comparison value by value catching early failures
-/// eagerly.
-///
-/// # Safety
-/// Doesn't check any bounds
-#[inline]
-pub(crate) unsafe fn compare_df_rows(keys: &DataFrame, idx_a: usize, idx_b: usize) -> bool {
-    for s in keys.get_columns() {
-        if !s.equal_element(idx_a, idx_b, s) {
-            return false;
-        }
-    }
-    true
-}
-
-/// Populate a multiple key hashmap with row indexes.
-///
-/// Instead of the keys (which could be very large), the row indexes are stored.
-/// To check if a row is equal the original DataFrame is also passed as ref.
-/// When a hash collision occurs the indexes are ptrs to the rows and the rows are compared
-/// on equality.
-pub fn populate_multiple_key_hashmap<V, H, F, G>(
-    hash_tbl: &mut HashMap<IdxHash, V, H>,
-    // row index
-    idx: IdxSize,
-    // hash
-    original_h: u64,
-    // keys of the hash table (will not be inserted, the indexes will be used)
-    // the keys are needed for the equality check
-    keys: &DataFrame,
-    // value to insert
-    vacant_fn: G,
-    // function that gets a mutable ref to the occupied value in the hash table
-    mut occupied_fn: F,
-) where
-    G: Fn() -> V,
-    F: FnMut(&mut V),
-    H: BuildHasher,
-{
-    let entry = hash_tbl
-        .raw_entry_mut()
-        // uses the idx to probe rows in the original DataFrame with keys
-        // to check equality to find an entry
-        // this does not invalidate the hashmap as this equality function is not used
-        // during rehashing/resize (then the keys are already known to be unique).
-        // Only during insertion and probing an equality function is needed
-        .from_hash(original_h, |idx_hash| {
-            // first check the hash values
-            // before we incur a cache miss
-            idx_hash.hash == original_h && {
-                let key_idx = idx_hash.idx;
-                // SAFETY:
-                // indices in a group_by operation are always in bounds.
-                unsafe { compare_df_rows(keys, key_idx as usize, idx as usize) }
-            }
-        });
-    match entry {
-        RawEntryMut::Vacant(entry) => {
-            entry.insert_hashed_nocheck(original_h, IdxHash::new(idx, original_h), vacant_fn());
-        },
-        RawEntryMut::Occupied(mut entry) => {
-            let (_k, v) = entry.get_key_value_mut();
-            occupied_fn(v);
-        },
-    }
-}
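
The removed `populate_multiple_key_hashmap` is the heart of the retired strategy: the table stores only `(hash, row index)`, and the expensive row comparison runs only when two hashes already agree. A compressed sketch of that probe follows, assuming hashbrown 0.14, whose raw-entry API the removed code used (later hashbrown versions dropped it); the `insert_row` helper, the `(u64, u32)` key and the `Vec<Vec<u8>>` rows are illustrative stand-ins, not polars code.

use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;

/// Insert row `idx` with precomputed hash `h`, comparing full keys only on
/// hash equality, in the spirit of the removed `populate_multiple_key_hashmap`.
fn insert_row(
    tbl: &mut HashMap<(u64, u32), Vec<u32>>, // (hash, first row idx) -> group
    keys: &[Vec<u8>],                        // stand-in for the DataFrame key rows
    idx: u32,
    h: u64,
) {
    let entry = tbl.raw_entry_mut().from_hash(h, |&(kh, kidx)| {
        // Cheap hash check first; only matching hashes pay for the row compare.
        kh == h && keys[kidx as usize] == keys[idx as usize]
    });
    match entry {
        RawEntryMut::Vacant(e) => {
            // The precomputed hash is passed through, so the key is never re-hashed.
            e.insert_hashed_nocheck(h, (h, idx), vec![idx]);
        },
        RawEntryMut::Occupied(mut e) => e.get_mut().push(idx),
    }
}
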
diff --git a/crates/polars-ops/src/frame/join/asof/groups.rs b/crates/polars-ops/src/frame/join/asof/groups.rs
index 9332b10e392b..3f2ef6a63ca8 100644
--- a/crates/polars-ops/src/frame/join/asof/groups.rs
+++ b/crates/polars-ops/src/frame/join/asof/groups.rs
@@ -1,104 +1,20 @@
 use std::hash::Hash;
 
-use hashbrown::HashMap;
 use num_traits::Zero;
-use polars_core::hashing::{
-    IdxHash, _df_rows_to_hashes_threaded_vertical, populate_multiple_key_hashmap,
-    _HASHMAP_INIT_SIZE,
-};
+use polars_core::hashing::_HASHMAP_INIT_SIZE;
 use polars_core::prelude::*;
 use polars_core::series::BitRepr;
 use polars_core::utils::flatten::flatten_nullable;
-use polars_core::utils::{_set_partition_size, split_and_flatten};
-use polars_core::{with_match_physical_float_polars_type, IdBuildHasher, POOL};
+use polars_core::utils::split_and_flatten;
+use polars_core::{with_match_physical_float_polars_type, POOL};
 use polars_utils::abs_diff::AbsDiff;
-use polars_utils::aliases::PlRandomState;
 use polars_utils::hashing::{hash_to_partition, DirtyHash};
-use polars_utils::idx_vec::IdxVec;
 use polars_utils::nulls::IsNull;
-use polars_utils::pl_str::PlSmallStr;
 use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};
-use polars_utils::unitvec;
 use rayon::prelude::*;
 
 use super::*;
-
-/// Compare the rows of two [`DataFrame`]s
-pub(crate) unsafe fn compare_df_rows2(
-    left: &DataFrame,
-    right: &DataFrame,
-    left_idx: usize,
-    right_idx: usize,
-    join_nulls: bool,
-) -> bool {
-    for (l, r) in left.get_columns().iter().zip(right.get_columns()) {
-        let l = l.get_unchecked(left_idx);
-        let r = r.get_unchecked(right_idx);
-        if !l.eq_missing(&r, join_nulls) {
-            return false;
-        }
-    }
-    true
-}
-
-pub(crate) fn create_probe_table(
-    hashes: &[UInt64Chunked],
-    keys: &DataFrame,
-) -> Vec<HashMap<IdxHash, IdxVec, IdBuildHasher>> {
-    let n_partitions = _set_partition_size();
-
-    // We will create a hashtable in every thread.
-    // We use the hash to partition the keys to the matching hashtable.
-    // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
-    POOL.install(|| {
-        (0..n_partitions)
-            .into_par_iter()
-            .map(|part_no| {
-                let mut hash_tbl: HashMap<IdxHash, IdxVec, IdBuildHasher> =
-                    HashMap::with_capacity_and_hasher(_HASHMAP_INIT_SIZE, Default::default());
-
-                let mut offset = 0;
-                for hashes in hashes {
-                    for hashes in hashes.data_views() {
-                        let len = hashes.len();
-                        let mut idx = 0;
-                        hashes.iter().for_each(|h| {
-                            // partition hashes by thread no.
-                            // So only a part of the hashes go to this hashmap
-                            if part_no == hash_to_partition(*h, n_partitions) {
-                                let idx = idx + offset;
-                                populate_multiple_key_hashmap(
-                                    &mut hash_tbl,
-                                    idx,
-                                    *h,
-                                    keys,
-                                    || unitvec![idx],
-                                    |v| v.push(idx),
-                                )
-                            }
-                            idx += 1;
-                        });
-
-                        offset += len as IdxSize;
-                    }
-                }
-                hash_tbl
-            })
-            .collect()
-    })
-}
-
-pub(crate) fn get_offsets(probe_hashes: &[UInt64Chunked]) -> Vec<usize> {
-    probe_hashes
-        .iter()
-        .map(|ph| ph.len())
-        .scan(0, |state, val| {
-            let out = *state;
-            *state += val;
-            Some(out)
-        })
-        .collect()
-}
+use crate::frame::join::{prepare_binary, prepare_keys_multiple};
 
 fn compute_len_offsets<I: IntoIterator<Item = usize>>(iter: I) -> Vec<IdxSize> {
     let mut cumlen = 0;
@@ -238,14 +154,16 @@ where
     Ok(flatten_nullable(&bufs))
 }
 
-fn asof_join_by_binary<T, A, F>(
-    by_left: &BinaryChunked,
-    by_right: &BinaryChunked,
+fn asof_join_by_binary<B, T, A, F>(
+    by_left: &ChunkedArray<B>,
+    by_right: &ChunkedArray<B>,
     left_asof: &ChunkedArray<T>,
     right_asof: &ChunkedArray<T>,
     filter: F,
 ) -> IdxArr
 where
+    B: PolarsDataType,
+    for<'b> <B as PolarsDataType>::ValueT<'b>: AsRef<[u8]>,
     T: PolarsDataType,
     A: for<'a> AsofJoinState<T::Physical<'a>>,
     F: Sync + for<'a> Fn(T::Physical<'a>, T::Physical<'a>) -> bool,
@@ -254,14 +172,8 @@ where
     let left_val_arr = left_asof.downcast_iter().next().unwrap();
     let right_val_arr = right_asof.downcast_iter().next().unwrap();
 
-    let n_threads = POOL.current_num_threads();
-    let split_by_left = split_and_flatten(by_left, n_threads);
-    let split_by_right = split_and_flatten(by_right, n_threads);
-    let offsets = compute_len_offsets(split_by_left.iter().map(|s| s.len()));
-
-    let hb = PlRandomState::default();
-    let prep_by_left = prepare_bytes(&split_by_left, &hb);
-    let prep_by_right = prepare_bytes(&split_by_right, &hb);
+    let (prep_by_left, prep_by_right, _, _) = prepare_binary::<B>(by_left, by_right, false);
+    let offsets = compute_len_offsets(prep_by_left.iter().map(|s| s.len()));
 
     let hash_tbls = build_tables(prep_by_right, false);
     let n_tables = hash_tbls.len();
@@ -303,87 +215,6 @@ where
     flatten_nullable(&bufs)
 }
 
-fn asof_join_by_multiple<T, A, F>(
-    by_left: &mut DataFrame,
-    by_right: &mut DataFrame,
-    left_asof: &ChunkedArray<T>,
-    right_asof: &ChunkedArray<T>,
-    filter: F,
-) -> IdxArr
-where
-    T: PolarsDataType,
-    A: for<'a> AsofJoinState<T::Physical<'a>>,
-    F: Sync + for<'a> Fn(T::Physical<'a>, T::Physical<'a>) -> bool,
-{
-    let (left_asof, right_asof) = POOL.join(|| left_asof.rechunk(), || right_asof.rechunk());
-    let left_val_arr = left_asof.downcast_iter().next().unwrap();
-    let right_val_arr = right_asof.downcast_iter().next().unwrap();
-
-    let n_threads = POOL.current_num_threads();
-    let split_by_left = split_and_flatten(by_left, n_threads);
-    let split_by_right = split_and_flatten(by_right, n_threads);
-
-    let (build_hashes, random_state) =
-        _df_rows_to_hashes_threaded_vertical(&split_by_right, None).unwrap();
-    let (probe_hashes, _) =
-        _df_rows_to_hashes_threaded_vertical(&split_by_left, Some(random_state)).unwrap();
-
-    let hash_tbls = create_probe_table(&build_hashes, by_right);
-    drop(build_hashes); // Early drop to reduce memory pressure.
-    let offsets = get_offsets(&probe_hashes);
-    let n_tables = hash_tbls.len();
-
-    // Now we probe the right hand side for each left hand side.
-    let iter = probe_hashes
-        .into_par_iter()
-        .zip(offsets)
-        .map(|(hash_by_left, offset)| {
-            let mut results = Vec::with_capacity(hash_by_left.len());
-            let mut group_states: PlHashMap<_, A> = PlHashMap::with_capacity(_HASHMAP_INIT_SIZE);
-
-            let mut ctr = 0;
-            for by_left_view in hash_by_left.data_views() {
-                for h_left in by_left_view.iter().copied() {
-                    let idx_left = offset + ctr;
-                    ctr += 1;
-                    let opt_left_val = left_val_arr.get(idx_left);
-
-                    let Some(left_val) = opt_left_val else {
-                        results.push(NullableIdxSize::null());
-                        continue;
-                    };
-
-                    let group_probe_table =
-                        unsafe { hash_tbls.get_unchecked(hash_to_partition(h_left, n_tables)) };
-
-                    let entry = group_probe_table.raw_entry().from_hash(h_left, |idx_hash| {
-                        let idx_right = idx_hash.idx;
-                        // SAFETY: indices in a join operation are always in bounds.
-                        unsafe {
-                            compare_df_rows2(by_left, by_right, idx_left, idx_right as usize, false)
-                        }
-                    });
-                    let Some((_, right_grp_idxs)) = entry else {
-                        results.push(NullableIdxSize::null());
-                        continue;
-                    };
-                    let id = asof_in_group::<T, A, &F>(
-                        left_val,
-                        right_val_arr,
-                        &right_grp_idxs[..],
-                        &mut group_states,
-                        &filter,
-                    );
-
-                    results.push(materialize_nullable(id));
-                }
-            }
-            results
-        });
-    let bufs = POOL.install(|| iter.collect::<Vec<_>>());
-    flatten_nullable(&bufs)
-}
-
 #[allow(clippy::too_many_arguments)]
 fn dispatch_join_by_type<T, A, F>(
     left_asof: &ChunkedArray<T>,
@@ -409,12 +240,16 @@ where
         DataType::String => {
             let left_by = &left_by_s.str().unwrap().as_binary();
             let right_by = right_by_s.str().unwrap().as_binary();
-            asof_join_by_binary::<T, A, _>(left_by, &right_by, left_asof, right_asof, filter)
+            asof_join_by_binary::<BinaryType, T, A, _>(
+                left_by, &right_by, left_asof, right_asof, filter,
+            )
        },
         DataType::Binary => {
             let left_by = &left_by_s.binary().unwrap();
             let right_by = right_by_s.binary().unwrap();
-            asof_join_by_binary::<T, A, _>(left_by, right_by, left_asof, right_asof, filter)
+            asof_join_by_binary::<BinaryType, T, A, _>(
+                left_by, right_by, left_asof, right_asof, filter,
+            )
         },
         x if x.is_float() => {
             with_match_physical_float_polars_type!(left_by_s.dtype(), |$T| {
@@ -458,7 +293,15 @@ where
             #[cfg(feature = "dtype-categorical")]
             _check_categorical_src(lhs.dtype(), rhs.dtype())?;
         }
-        asof_join_by_multiple::<T, A, _>(left_by, right_by, left_asof, right_asof, filter)
+
+        // TODO: @scalar-opt.
+        let left_by_series: Vec<_> = left_by.materialized_column_iter().cloned().collect();
+        let right_by_series: Vec<_> = right_by.materialized_column_iter().cloned().collect();
+        let lhs_keys = prepare_keys_multiple(&left_by_series, false)?;
+        let rhs_keys = prepare_keys_multiple(&right_by_series, false)?;
+        asof_join_by_binary::<BinaryOffsetType, T, A, _>(
+            &lhs_keys, &rhs_keys, left_asof, right_asof, filter,
+        )
     };
     Ok(out)
 }
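
The new `asof_join_by_binary<B, ...>` hinges on a higher-ranked bound over a generic associated type, so a single body serves both `BinaryType` (i32 offsets) and `BinaryOffsetType` (i64 offsets, which the row-encoded keys use). A self-contained sketch of that bound pattern follows, with hypothetical names (`ByteDataType`, `SmallBinary`, `LargeBinary`) standing in for the polars traits:

// Toy GAT trait standing in for `PolarsDataType` with its `ValueT<'a>`.
trait ByteDataType {
    type ValueT<'a>;
    fn get<'a>(data: &'a [Vec<u8>], i: usize) -> Self::ValueT<'a>;
}

struct SmallBinary; // think BinaryType (i32 offsets)
struct LargeBinary; // think BinaryOffsetType (i64 offsets)

impl ByteDataType for SmallBinary {
    type ValueT<'a> = &'a [u8];
    fn get<'a>(data: &'a [Vec<u8>], i: usize) -> &'a [u8] {
        &data[i]
    }
}

impl ByteDataType for LargeBinary {
    type ValueT<'a> = &'a [u8];
    fn get<'a>(data: &'a [Vec<u8>], i: usize) -> &'a [u8] {
        &data[i]
    }
}

// Mirrors `for<'b> <B as PolarsDataType>::ValueT<'b>: AsRef<[u8]>`: one body
// handles any key type whose values can be viewed as raw bytes.
fn first_byte<B>(data: &[Vec<u8>], i: usize) -> Option<u8>
where
    B: ByteDataType,
    for<'a> B::ValueT<'a>: AsRef<[u8]>,
{
    B::get(data, i).as_ref().first().copied()
}

fn main() {
    let rows = vec![b"polars".to_vec()];
    assert_eq!(first_byte::<SmallBinary>(&rows, 0), Some(b'p'));
    assert_eq!(first_byte::<LargeBinary>(&rows, 0), Some(b'p'));
}
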
diff --git a/crates/polars-ops/src/frame/join/asof/mod.rs b/crates/polars-ops/src/frame/join/asof/mod.rs
index 71e813cdac39..cb122a649c4f 100644
--- a/crates/polars-ops/src/frame/join/asof/mod.rs
+++ b/crates/polars-ops/src/frame/join/asof/mod.rs
@@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
 
 #[cfg(feature = "dtype-categorical")]
 use super::_check_categorical_src;
-use super::{_finish_join, build_tables, prepare_bytes};
+use super::{_finish_join, build_tables};
 use crate::frame::IntoDf;
 use crate::series::SeriesMethods;
 
diff --git a/crates/polars-ops/src/frame/join/hash_join/mod.rs b/crates/polars-ops/src/frame/join/hash_join/mod.rs
index 35e4ea9403af..fcda1be6802d 100644
--- a/crates/polars-ops/src/frame/join/hash_join/mod.rs
+++ b/crates/polars-ops/src/frame/join/hash_join/mod.rs
@@ -12,7 +12,7 @@ use polars_core::POOL;
 use polars_utils::index::ChunkId;
 pub(super) use single_keys::*;
 #[cfg(feature = "asof_join")]
-pub(super) use single_keys_dispatch::prepare_bytes;
+pub(super) use single_keys_dispatch::prepare_binary;
 pub use single_keys_dispatch::SeriesJoin;
 use single_keys_inner::*;
 use single_keys_left::*;
diff --git a/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs b/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs
index a8093873ea51..f79e8759d9e8 100644
--- a/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs
+++ b/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs
@@ -507,27 +507,7 @@ where
     }
 }
 
-#[cfg(feature = "asof_join")]
-pub fn prepare_bytes<'a>(
-    been_split: &'a [BinaryChunked],
-    hb: &PlRandomState,
-) -> Vec<Vec<BytesHash<'a>>> {
-    POOL.install(|| {
-        been_split
-            .par_iter()
-            .map(|ca| {
-                ca.iter()
-                    .map(|opt_b| {
-                        let hash = hb.hash_one(opt_b);
-                        BytesHash::new(opt_b, hash)
-                    })
-                    .collect::<Vec<_>>()
-            })
-            .collect()
-    })
-}
-
-fn prepare_binary<'a, T>(
+pub(crate) fn prepare_binary<'a, T>(
     ca: &'a ChunkedArray<T>,
     other: &'a ChunkedArray<T>,
     // In inner join and outer join, the shortest relation will be used to create a hash table.
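
Taken together, the change collapses the dedicated multi-key machinery into one strategy: row-encode the `by` columns into a single binary key with `prepare_keys_multiple`, then run the ordinary single-key binary asof join. A toy sketch of why row encoding is sound for this; `encode_row` is a hypothetical stand-in, and polars' real row encoding additionally handles nulls, ordering and variable-width values:

// Encode one row of several fixed-width key columns into a single byte key.
fn encode_row(cols: &[Vec<i64>], row: usize) -> Vec<u8> {
    let mut key = Vec::with_capacity(cols.len() * 8);
    for col in cols {
        // Fixed-width big-endian encoding keeps keys unambiguous without
        // separators; variable-width data would need length prefixes.
        key.extend_from_slice(&col[row].to_be_bytes());
    }
    key
}

fn main() {
    let by = vec![vec![1i64, 1, 2], vec![10i64, 20, 10]];
    // Rows (1, 10) and (2, 10) must encode to different keys:
    assert_ne!(encode_row(&by, 0), encode_row(&by, 2));
    // Equal rows encode to equal keys, so a plain byte-key hash join works:
    assert_eq!(encode_row(&by, 0), encode_row(&by, 0));
}

Because equal rows always encode to equal byte strings, hashing and equality on the encoded keys coincide with hashing and equality on the original row tuples, which is exactly the contract the single-key join path needs.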