From 0bd1c2450dd83bd46de87c7c4ca3c7c444056174 Mon Sep 17 00:00:00 2001
From: Orson Peters
Date: Fri, 4 Oct 2024 16:44:01 +0200
Subject: [PATCH 1/6] refactor(rust): Eliminate some uses of deprecated raw_entry

---
 .../src/array/dictionary/value_map.rs         |  95 +++++---------
 .../polars-core/src/frame/group_by/hashing.rs | 116 +++++++++---------
 .../polars-core/src/frame/group_by/proxy.rs   |   3 +
 .../polars-ops/src/chunked_array/list/sets.rs |   2 +-
 crates/polars-utils/src/total_ord.rs          |   4 +-
 5 files changed, 93 insertions(+), 127 deletions(-)

diff --git a/crates/polars-arrow/src/array/dictionary/value_map.rs b/crates/polars-arrow/src/array/dictionary/value_map.rs
index d818b7e6b25c..0659e7565200 100644
--- a/crates/polars-arrow/src/array/dictionary/value_map.rs
+++ b/crates/polars-arrow/src/array/dictionary/value_map.rs
@@ -1,9 +1,9 @@
 use std::borrow::Borrow;
 use std::fmt::{self, Debug};
-use std::hash::{BuildHasherDefault, Hash, Hasher};
+use std::hash::Hash;
 
-use hashbrown::hash_map::RawEntryMut;
-use hashbrown::HashMap;
+use hashbrown::hash_table::Entry;
+use hashbrown::HashTable;
 use polars_error::{polars_bail, polars_err, PolarsResult};
 use polars_utils::aliases::PlRandomState;
 
@@ -12,47 +12,10 @@ use crate::array::indexable::{AsIndexed, Indexable};
 use crate::array::{Array, MutableArray};
 use crate::datatypes::ArrowDataType;
 
-/// Hasher for pre-hashed values; similar to `hash_hasher` but with native endianness.
-///
-/// We know that we'll only use it for `u64` values, so we can avoid endian conversion.
-///
-/// Invariant: hash of a u64 value is always equal to itself.
-#[derive(Copy, Clone, Default)]
-pub struct PassthroughHasher(u64);
-
-impl Hasher for PassthroughHasher {
-    #[inline]
-    fn write_u64(&mut self, value: u64) {
-        self.0 = value;
-    }
-
-    fn write(&mut self, _: &[u8]) {
-        unreachable!();
-    }
-
-    #[inline]
-    fn finish(&self) -> u64 {
-        self.0
-    }
-}
-
-#[derive(Clone)]
-pub struct Hashed<K> {
-    hash: u64,
-    key: K,
-}
-
-impl<K> Hash for Hashed<K> {
-    #[inline]
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.hash.hash(state)
-    }
-}
-
 #[derive(Clone)]
 pub struct ValueMap<K: DictionaryKey, M: MutableArray> {
     pub values: M,
-    pub map: HashMap<Hashed<K>, (), BuildHasherDefault<PassthroughHasher>>, // NB: *only* use insert_hashed_nocheck() and no other hashmap API
+    pub map: HashTable<(u64, K)>,
     random_state: PlRandomState,
 }
 
@@ -63,7 +26,7 @@ impl<K: DictionaryKey, M: MutableArray> ValueMap<K, M> {
         }
         Ok(Self {
             values,
-            map: HashMap::default(),
+            map: HashTable::default(),
             random_state: PlRandomState::default(),
         })
     }
@@ -73,10 +36,7 @@ impl<K: DictionaryKey, M: MutableArray> ValueMap<K, M> {
     where
        M: Indexable,
        M::Type: Eq + Hash,
    {
-        let mut map = HashMap::<Hashed<K>, _, _>::with_capacity_and_hasher(
-            values.len(),
-            BuildHasherDefault::<PassthroughHasher>::default(),
-        );
+        let mut map: HashTable<(u64, K)> = HashTable::with_capacity(values.len());
         let random_state = PlRandomState::default();
         for index in 0..values.len() {
             let key = K::try_from(index).map_err(|_| polars_err!(ComputeError: "overflow"))?;
@@ -84,18 +44,21 @@ impl<K: DictionaryKey, M: MutableArray> ValueMap<K, M> {
             let value = unsafe { values.value_unchecked_at(index) };
             let hash = random_state.hash_one(value.borrow());
 
-            let entry = map.raw_entry_mut().from_hash(hash, |item| {
-                // SAFETY: invariant of the struct, it's always in bounds since we maintain it
-                let stored_value = unsafe { values.value_unchecked_at(item.key.as_usize()) };
-                stored_value.borrow() == value.borrow()
-            });
+            let entry = map.entry(
+                hash,
+                |(_h, key)| {
+                    // SAFETY: invariant of the struct, it's always in bounds.
+                    let stored_value = unsafe { values.value_unchecked_at(key.as_usize()) };
+                    stored_value.borrow() == value.borrow()
+                },
+                |(h, _key)| *h,
+            );
             match entry {
-                RawEntryMut::Occupied(_) => {
+                Entry::Occupied(_) => {
                     polars_bail!(InvalidOperation: "duplicate value in dictionary values array")
                 },
-                RawEntryMut::Vacant(entry) => {
-                    // NB: don't use .insert() here!
-                    entry.insert_hashed_nocheck(hash, Hashed { hash, key }, ());
+                Entry::Vacant(entry) => {
+                    entry.insert((hash, key));
                 },
             }
         }
@@ -137,19 +100,21 @@ impl<K: DictionaryKey, M: MutableArray> ValueMap<K, M> {
         M::Type: Eq + Hash,
     {
         let hash = self.random_state.hash_one(value.as_indexed());
-        let entry = self.map.raw_entry_mut().from_hash(hash, |item| {
-            // SAFETY: we've already checked (the inverse) when we pushed it, so it should be ok?
-            let index = unsafe { item.key.as_usize() };
-            // SAFETY: invariant of the struct, it's always in bounds since we maintain it
-            let stored_value = unsafe { self.values.value_unchecked_at(index) };
-            stored_value.borrow() == value.as_indexed()
-        });
+        let entry = self.map.entry(
+            hash,
+            |(_h, key)| {
+                // SAFETY: invariant of the struct, it's always in bounds.
+                let stored_value = unsafe { self.values.value_unchecked_at(key.as_usize()) };
+                stored_value.borrow() == value.as_indexed()
+            },
+            |(h, _key)| *h,
+        );
         let out = match entry {
-            RawEntryMut::Occupied(entry) => entry.key().key,
-            RawEntryMut::Vacant(entry) => {
+            Entry::Occupied(entry) => entry.get().1,
+            Entry::Vacant(entry) => {
                 let index = self.values.len();
                 let key = K::try_from(index).map_err(|_| polars_err!(ComputeError: "overflow"))?;
-                entry.insert_hashed_nocheck(hash, Hashed { hash, key }, ()); // NB: don't use .insert() here!
+                entry.insert((hash, key));
                 push(&mut self.values, value)?;
                 debug_assert_eq!(self.values.len(), index + 1);
                 key
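A note on the replacement API used above: hashbrown's `HashTable` is the successor to the deprecated raw-entry interface. The caller hands `entry()` a precomputed hash, an equality closure, and a rehash closure, so the table itself needs no `BuildHasher`; caching the hash in the stored `(u64, K)` tuple is what lets the rehash closure answer without touching the values array. The same pattern reduced to a self-contained sketch — std types instead of `PlRandomState`/`MutableArray`, and `Interner` is an illustrative name, assuming hashbrown 0.14+:

    use std::collections::hash_map::RandomState;
    use std::hash::BuildHasher;

    use hashbrown::hash_table::{Entry, HashTable};

    /// Hands out a dense u32 id per distinct string.
    #[derive(Default)]
    struct Interner {
        strings: Vec<String>,
        // Each slot stores (cached_hash, id); the cached hash lets the
        // table grow without re-hashing the interned strings.
        table: HashTable<(u64, u32)>,
        random_state: RandomState,
    }

    impl Interner {
        fn intern(&mut self, s: &str) -> u32 {
            let hash = self.random_state.hash_one(s);
            let entry = self.table.entry(
                hash,
                // Equality: compare the candidate against the stored id's string.
                |(_h, id)| self.strings[*id as usize] == s,
                // Rehash on resize: just return the cached hash.
                |(h, _id)| *h,
            );
            match entry {
                Entry::Occupied(e) => e.get().1,
                Entry::Vacant(e) => {
                    let id = self.strings.len() as u32;
                    self.strings.push(s.to_owned());
                    e.insert((hash, id));
                    id
                },
            }
        }
    }

    fn main() {
        let mut interner = Interner::default();
        assert_eq!(interner.intern("foo"), 0);
        assert_eq!(interner.intern("bar"), 1);
        assert_eq!(interner.intern("foo"), 0);
    }

As in `ValueMap` above, the closures borrow `self.strings` while `entry()` mutably borrows `self.table`; Rust 2021's disjoint field captures make that legal.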
diff --git a/crates/polars-core/src/frame/group_by/hashing.rs b/crates/polars-core/src/frame/group_by/hashing.rs
index 418471abc388..701e185a9b15 100644
--- a/crates/polars-core/src/frame/group_by/hashing.rs
+++ b/crates/polars-core/src/frame/group_by/hashing.rs
@@ -1,11 +1,13 @@
-use std::hash::{BuildHasher, Hash, Hasher};
+use std::hash::Hash;
 
-use hashbrown::hash_map::RawEntryMut;
+use hashbrown::hash_map::{Entry, RawEntryMut};
 use polars_utils::hashing::{hash_to_partition, DirtyHash};
 use polars_utils::idx_vec::IdxVec;
+use polars_utils::itertools::Itertools;
 use polars_utils::sync::SyncPtr;
-use polars_utils::total_ord::{ToTotalOrd, TotalHash};
+use polars_utils::total_ord::{ToTotalOrd, TotalHash, TotalOrdWrap};
 use polars_utils::unitvec;
+use polars_utils::vec::PushUnchecked;
 use rayon::prelude::*;
 
 use crate::hashing::*;
@@ -73,50 +75,55 @@ fn finish_group_order(mut out: Vec<Vec<(IdxSize, IdxVec)>>, sorted: bool) -> GroupsProxy {
     }
 }
 
-pub(crate) fn group_by<T>(a: impl Iterator<Item = T>, sorted: bool) -> GroupsProxy
+pub(crate) fn group_by<K>(keys: impl Iterator<Item = K>, sorted: bool) -> GroupsProxy
 where
-    T: TotalHash + TotalEq,
+    K: TotalHash + TotalEq,
 {
     let init_size = get_init_size();
-    let mut hash_tbl: PlHashMap<T, (IdxSize, IdxVec)> = PlHashMap::with_capacity(init_size);
-    let hasher = hash_tbl.hasher().clone();
-    let mut cnt = 0;
-    a.for_each(|k| {
-        let idx = cnt;
-        cnt += 1;
+    let (mut first, mut groups);
+    if sorted {
+        groups = Vec::with_capacity(get_init_size());
+        let mut hash_tbl: PlHashMap<TotalOrdWrap<K>, IdxSize> = PlHashMap::with_capacity(init_size);
+        for (idx, k) in keys.enumerate() {
+            let idx = idx as IdxSize;
+            match hash_tbl.entry(TotalOrdWrap(k)) {
+                Entry::Vacant(entry) => {
+                    let group_idx = groups.len() as IdxSize;
+                    entry.insert(group_idx);
+                    groups.push(unitvec![idx]);
+                },
+                Entry::Occupied(entry) => unsafe {
+                    groups.get_unchecked_mut(*entry.get() as usize).push(idx)
+                },
+            }
+        }
 
-        let mut state = hasher.build_hasher();
-        k.tot_hash(&mut state);
-        let h = state.finish();
-        let entry = hash_tbl.raw_entry_mut().from_hash(h, |k_| k.tot_eq(k_));
+        first = groups
+            .iter()
+            .map(|v| unsafe { *v.first().unwrap_unchecked() })
+            .collect_vec();
+    } else {
+        let mut hash_tbl: PlHashMap<TotalOrdWrap<K>, IdxVec> = PlHashMap::with_capacity(init_size);
+        for (idx, k) in keys.enumerate() {
+            let idx = idx as IdxSize;
+            match hash_tbl.entry(TotalOrdWrap(k)) {
+                Entry::Vacant(entry) => {
+                    entry.insert(unitvec![idx]);
+                },
+                Entry::Occupied(mut entry) => entry.get_mut().push(idx),
+            }
+        }
 
-        match entry {
-            RawEntryMut::Vacant(entry) => {
-                let tuples = unitvec![idx];
-                entry.insert_with_hasher(h, k, (idx, tuples), |k| {
-                    let mut state = hasher.build_hasher();
-                    k.tot_hash(&mut state);
-                    state.finish()
-                });
-            },
-            RawEntryMut::Occupied(mut entry) => {
-                let v = entry.get_mut();
-                v.1.push(idx);
-            },
+        first = Vec::with_capacity(hash_tbl.len());
+        groups = Vec::with_capacity(hash_tbl.len());
+        unsafe {
+            for v in hash_tbl.into_values() {
+                first.push_unchecked(*v.first().unwrap_unchecked());
+                groups.push_unchecked(v);
+            }
         }
-    });
 
-    if sorted {
-        let mut groups = hash_tbl
-            .into_iter()
-            .map(|(_k, v)| v)
-            .collect_trusted::<Vec<_>>();
-        groups.sort_unstable_by_key(|g| g.0);
-        let mut idx: GroupsIdx = groups.into_iter().collect();
-        idx.sorted = true;
-        GroupsProxy::Idx(idx)
-    } else {
-        GroupsProxy::Idx(hash_tbl.into_values().collect())
     }
+    GroupsProxy::Idx(GroupsIdx::new(first, groups, sorted))
 }
 
 // giving the slice info to the compiler is much
@@ -128,8 +135,8 @@ pub(crate) fn group_by_threaded_slice<T, IntoSlice>(
     sorted: bool,
 ) -> GroupsProxy
 where
-    T: TotalHash + TotalEq + ToTotalOrd,
-    <T as ToTotalOrd>::TotalOrdItem: Send + Hash + Eq + Sync + Copy + DirtyHash,
+    T: ToTotalOrd,
+    <T as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + DirtyHash,
     IntoSlice: AsRef<[T]> + Send + Sync,
 {
     let init_size = get_init_size();
@@ -148,32 +155,23 @@ where
             for keys in &keys {
                 let keys = keys.as_ref();
                 let len = keys.len() as IdxSize;
-                let hasher = hash_tbl.hasher().clone();
-                let mut cnt = 0;
-                keys.iter().for_each(|k| {
+                for (key_idx, k) in keys.iter().enumerate() {
                     let k = k.to_total_ord();
-                    let idx = cnt + offset;
-                    cnt += 1;
+                    let idx = key_idx as IdxSize + offset;
 
                     if thread_no == hash_to_partition(k.dirty_hash(), n_partitions) {
-                        let hash = hasher.hash_one(k);
-                        let entry = hash_tbl.raw_entry_mut().from_key_hashed_nocheck(hash, &k);
-
-                        match entry {
-                            RawEntryMut::Vacant(entry) => {
+                        match hash_tbl.entry(k) {
+                            Entry::Vacant(entry) => {
                                 let tuples = unitvec![idx];
-                                entry.insert_with_hasher(hash, k, (idx, tuples), |k| {
-                                    hasher.hash_one(*k)
-                                });
+                                entry.insert((idx, tuples));
                             },
-                            RawEntryMut::Occupied(mut entry) => {
-                                let v = entry.get_mut();
-                                v.1.push(idx);
+                            Entry::Occupied(mut entry) => {
+                                entry.get_mut().1.push(idx);
                             },
                         }
                     }
-                });
+                }
                 offset += len;
             }
             hash_tbl
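The group-by side of the patch swaps the same raw-entry dance for the ordinary `Entry` API on the map types. Stripped of `PlHashMap`, `IdxVec`, and the thread partitioning, the idiom is just this (a sketch with plain std types; `group_indices` is an illustrative name):

    use std::collections::hash_map::{Entry, HashMap};
    use std::hash::Hash;

    /// One pass over the keys; per distinct key, the first index and all indices.
    fn group_indices<K: Hash + Eq>(keys: impl Iterator<Item = K>) -> Vec<(u32, Vec<u32>)> {
        let mut table: HashMap<K, (u32, Vec<u32>)> = HashMap::new();
        for (idx, key) in keys.enumerate() {
            let idx = idx as u32;
            match table.entry(key) {
                // First sighting of this key: it defines the group's first index.
                Entry::Vacant(e) => {
                    e.insert((idx, vec![idx]));
                },
                // Known key: append to its group.
                Entry::Occupied(mut e) => e.get_mut().1.push(idx),
            }
        }
        table.into_values().collect()
    }

    fn main() {
        let groups = group_indices(["a", "b", "a", "c", "b"].into_iter());
        assert_eq!(groups.len(), 3);
    }

`entry(key)` hashes once and stores the key itself; the removed `insert_with_hasher` plumbing was mainly useful for reusing an already computed hash across lookups.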
diff --git a/crates/polars-core/src/frame/group_by/proxy.rs b/crates/polars-core/src/frame/group_by/proxy.rs
index d9aedc261faf..a97125becf97 100644
--- a/crates/polars-core/src/frame/group_by/proxy.rs
+++ b/crates/polars-core/src/frame/group_by/proxy.rs
@@ -149,6 +149,9 @@ impl GroupsIdx {
     }
 
     pub fn sort(&mut self) {
+        if self.sorted {
+            return;
+        }
         let mut idx = 0;
         let first = std::mem::take(&mut self.first);
         // store index and values so that we can sort those
diff --git a/crates/polars-ops/src/chunked_array/list/sets.rs b/crates/polars-ops/src/chunked_array/list/sets.rs
index a26d76b5a768..e06234f4dfcc 100644
--- a/crates/polars-ops/src/chunked_array/list/sets.rs
+++ b/crates/polars-ops/src/chunked_array/list/sets.rs
@@ -102,7 +102,7 @@ where
     }
 }
 
-fn copied_wrapper_opt<T: Copy>(
+fn copied_wrapper_opt<T: Copy + TotalHash + TotalEq>(
     v: Option<&T>,
 ) -> <Option<T> as ToTotalOrd>::TotalOrdItem {
     v.copied().to_total_ord()
diff --git a/crates/polars-utils/src/total_ord.rs b/crates/polars-utils/src/total_ord.rs
index 80b21bec507a..cfaa05f0141d 100644
--- a/crates/polars-utils/src/total_ord.rs
+++ b/crates/polars-utils/src/total_ord.rs
@@ -472,7 +472,7 @@ impl<'a> TotalEq for BytesHash<'a> {
 
 /// This elides creating a [`TotalOrdWrap`] for types that don't need it.
 pub trait ToTotalOrd {
-    type TotalOrdItem;
+    type TotalOrdItem: Hash + Eq;
     type SourceItem;
 
     fn to_total_ord(&self) -> Self::TotalOrdItem;
@@ -564,7 +564,7 @@ impl_to_total_ord_wrapped!(f64);
 /// `TotalOrdWrap<Option<T>>` implements `Eq + Hash`, iff:
 /// `Option<T>` implements `TotalEq + TotalHash`, iff:
 /// `T` implements `TotalEq + TotalHash`
-impl<T: Copy> ToTotalOrd for Option<T> {
+impl<T: TotalHash + TotalEq + Copy> ToTotalOrd for Option<T> {
     type TotalOrdItem = TotalOrdWrap<Option<T>>;
     type SourceItem = Option<T>;
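The new `type TotalOrdItem: Hash + Eq` bound is what lets float-derived keys participate in the entry-based maps above: `TotalOrdWrap` equips floats with a total equality in which NaNs compare equal, making them legal hash-map keys. A simplified illustration of the idea (raw bit equality for brevity; the real `TotalEq`/`TotalHash` in total_ord.rs handle NaN payloads and signed zero more carefully):

    use std::collections::HashMap;
    use std::hash::{Hash, Hasher};

    /// f64 with a total equality, so it can be a hash-map key.
    #[derive(Copy, Clone)]
    struct TotalF64(f64);

    impl PartialEq for TotalF64 {
        fn eq(&self, other: &Self) -> bool {
            // Compare the bit patterns: reflexive even for NaN.
            self.0.to_bits() == other.0.to_bits()
        }
    }
    impl Eq for TotalF64 {}

    impl Hash for TotalF64 {
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.0.to_bits().hash(state);
        }
    }

    fn main() {
        let mut counts: HashMap<TotalF64, u32> = HashMap::new();
        for x in [1.0, f64::NAN, f64::NAN, 1.0] {
            *counts.entry(TotalF64(x)).or_insert(0) += 1;
        }
        assert_eq!(counts.len(), 2); // the NaNs land in one group
    }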
From de6e55545998a58ffbbd7fe1a2135b73d7d78fc5 Mon Sep 17 00:00:00 2001
From: Orson Peters
Date: Fri, 4 Oct 2024 17:11:46 +0200
Subject: [PATCH 2/6] remove unused code

---
 crates/polars-core/src/frame/group_by/proxy.rs | 58 +-------------------
 1 file changed, 1 insertion(+), 57 deletions(-)

diff --git a/crates/polars-core/src/frame/group_by/proxy.rs b/crates/polars-core/src/frame/group_by/proxy.rs
index a97125becf97..d1c04162b7b9 100644
--- a/crates/polars-core/src/frame/group_by/proxy.rs
+++ b/crates/polars-core/src/frame/group_by/proxy.rs
@@ -3,7 +3,6 @@ use std::ops::Deref;
 
 use arrow::offset::OffsetsBuffer;
 use polars_utils::idx_vec::IdxVec;
-use polars_utils::sync::SyncPtr;
 use rayon::iter::plumbing::UnindexedConsumer;
 use rayon::prelude::*;
 
@@ -46,61 +45,6 @@ impl From<Vec<IdxItem>> for GroupsIdx {
     }
 }
 
-impl From<Vec<(Vec<IdxSize>, Vec<IdxVec>)>> for GroupsIdx {
-    fn from(v: Vec<(Vec<IdxSize>, Vec<IdxVec>)>) -> Self {
-        // we have got the hash tables so we can determine the final
-        let cap = v.iter().map(|v| v.0.len()).sum::<usize>();
-        let offsets = v
-            .iter()
-            .scan(0_usize, |acc, v| {
-                let out = *acc;
-                *acc += v.0.len();
-                Some(out)
-            })
-            .collect::<Vec<_>>();
-        let mut global_first = Vec::with_capacity(cap);
-        let global_first_ptr = unsafe { SyncPtr::new(global_first.as_mut_ptr()) };
-        let mut global_all = Vec::with_capacity(cap);
-        let global_all_ptr = unsafe { SyncPtr::new(global_all.as_mut_ptr()) };
-
-        POOL.install(|| {
-            v.into_par_iter().zip(offsets).for_each(
-                |((local_first_vals, mut local_all_vals), offset)| unsafe {
-                    let global_first: *mut IdxSize = global_first_ptr.get();
-                    let global_all: *mut IdxVec = global_all_ptr.get();
-                    let global_first = global_first.add(offset);
-                    let global_all = global_all.add(offset);
-
-                    std::ptr::copy_nonoverlapping(
-                        local_first_vals.as_ptr(),
-                        global_first,
-                        local_first_vals.len(),
-                    );
-                    std::ptr::copy_nonoverlapping(
-                        local_all_vals.as_ptr(),
-                        global_all,
-                        local_all_vals.len(),
-                    );
-                    // local_all_vals: Vec<Vec<IdxSize>>
-                    // we just copied the contents: Vec<IdxSize> to a new buffer
-                    // now, we want to free the outer vec, without freeing
-                    // the inner vecs as they are moved, so we set the len to 0
-                    local_all_vals.set_len(0);
-                },
-            );
-        });
-        unsafe {
-            global_all.set_len(cap);
-            global_first.set_len(cap);
-        }
-        GroupsIdx {
-            sorted: false,
-            first: global_first,
-            all: global_all,
-        }
-    }
-}
-
 impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
     fn from(v: Vec<Vec<IdxItem>>) -> Self {
         // single threaded flatten: 10% faster than `iter().flatten().collect()
@@ -545,7 +489,7 @@ impl GroupsProxy {
         }
     }
 
-    pub fn slice(&self, offset: i64, len: usize) -> SlicedGroups {
+    pub fn slice(&self, offset: i64, len: usize) -> SlicedGroups<'_> {
         // SAFETY:
         // we create new `Vec`s from the sliced groups. But we wrap them in ManuallyDrop
         // so that we never call drop on them.

From 007e17605d3488305f06c38747acd26c517e7a1b Mon Sep 17 00:00:00 2001
From: Orson Peters
Date: Fri, 4 Oct 2024 17:11:53 +0200
Subject: [PATCH 3/6] remove another raw_entry

---
 crates/polars-core/src/frame/group_by/hashing.rs | 43 +++++++------------
 1 file changed, 16 insertions(+), 27 deletions(-)

diff --git a/crates/polars-core/src/frame/group_by/hashing.rs b/crates/polars-core/src/frame/group_by/hashing.rs
index 701e185a9b15..8207f4e0af7c 100644
--- a/crates/polars-core/src/frame/group_by/hashing.rs
+++ b/crates/polars-core/src/frame/group_by/hashing.rs
@@ -148,7 +148,7 @@ where
     (0..n_partitions)
         .into_par_iter()
        .map(|thread_no| {
-            let mut hash_tbl: PlHashMap<T::TotalOrdItem, (IdxSize, IdxVec)> =
+            let mut hash_tbl: PlHashMap<T::TotalOrdItem, IdxVec> =
                 PlHashMap::with_capacity(init_size);
 
             let mut offset = 0;
@@ -163,11 +163,10 @@ where
                 if thread_no == hash_to_partition(k.dirty_hash(), n_partitions) {
                     match hash_tbl.entry(k) {
                         Entry::Vacant(entry) => {
-                            let tuples = unitvec![idx];
-                            entry.insert((idx, tuples));
+                            entry.insert(unitvec![idx]);
                         },
                         Entry::Occupied(mut entry) => {
-                            entry.get_mut().1.push(idx);
+                            entry.get_mut().push(idx);
                         },
                     }
                 }
@@ -176,7 +175,7 @@ where
             }
             hash_tbl
                 .into_iter()
-                .map(|(_k, v)| v)
+                .map(|(_k, v)| (unsafe { *v.first().unwrap_unchecked() }, v))
                 .collect_trusted::<Vec<_>>()
         })
         .collect::<Vec<_>>()
@@ -192,8 +191,8 @@ pub(crate) fn group_by_threaded_iter<T, I>(
 where
     I: IntoIterator<Item = T> + Send + Sync + Clone,
     I::IntoIter: ExactSizeIterator,
-    T: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
-    <T as ToTotalOrd>::TotalOrdItem: Send + Hash + Eq + Sync + Copy + DirtyHash,
+    T: ToTotalOrd,
+    <T as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + DirtyHash,
 {
     let init_size = get_init_size();
 
@@ -204,39 +203,29 @@ where
     (0..n_partitions)
         .into_par_iter()
         .map(|thread_no| {
-            let mut hash_tbl: PlHashMap<T::TotalOrdItem, (IdxSize, IdxVec)> =
+            let mut hash_tbl: PlHashMap<T::TotalOrdItem, IdxVec> =
                 PlHashMap::with_capacity(init_size);
 
             let mut offset = 0;
             for keys in keys {
                 let keys = keys.clone().into_iter();
                 let len = keys.len() as IdxSize;
-                let hasher = hash_tbl.hasher().clone();
-                let mut cnt = 0;
-                keys.for_each(|k| {
+                for (key_idx, k) in keys.into_iter().enumerate() {
                     let k = k.to_total_ord();
-                    let idx = cnt + offset;
-                    cnt += 1;
+                    let idx = key_idx as IdxSize + offset;
 
                     if thread_no == hash_to_partition(k.dirty_hash(), n_partitions) {
-                        let hash = hasher.hash_one(k);
-                        let entry = hash_tbl.raw_entry_mut().from_key_hashed_nocheck(hash, &k);
-
-                        match entry {
-                            RawEntryMut::Vacant(entry) => {
-                                let tuples = unitvec![idx];
-                                entry.insert_with_hasher(hash, k, (idx, tuples), |k| {
-                                    hasher.hash_one(*k)
-                                });
+                        match hash_tbl.entry(k) {
+                            Entry::Vacant(entry) => {
+                                entry.insert(unitvec![idx]);
                             },
-                            RawEntryMut::Occupied(mut entry) => {
-                                let v = entry.get_mut();
-                                v.1.push(idx);
+                            Entry::Occupied(mut entry) => {
+                                entry.get_mut().push(idx);
                             },
                         }
                     }
-                });
+                }
                 offset += len;
             }
             // iterating the hash tables locally
             // indirection
             hash_tbl
                 .into_iter()
-                .map(|(_k, v)| v)
+                .map(|(_k, v)| (unsafe { *v.first().unwrap_unchecked() }, v))
                 .collect_trusted::<Vec<_>>()
         })
         .collect::<Vec<_>>()
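Patch 3 also changes what the per-thread tables store: only the `IdxVec` of row indices, with each group's first index recovered in a single pass while draining the table. The `unwrap_unchecked` there is sound because every list is created non-empty via `unitvec![idx]`. The same shape in safe std code:

    use std::collections::HashMap;

    fn main() {
        // Hot loop: the map value is just the index list, nothing else.
        let mut table: HashMap<&str, Vec<u32>> = HashMap::new();
        for (idx, key) in ["x", "y", "x"].into_iter().enumerate() {
            table.entry(key).or_default().push(idx as u32);
        }
        // Cold path: materialize (first, all) once per group while draining.
        let groups: Vec<(u32, Vec<u32>)> = table
            .into_values()
            .map(|v| (*v.first().expect("lists are never empty"), v))
            .collect();
        assert_eq!(groups.len(), 2);
    }

Smaller map values keep the insert loop's cache footprint down, and the first index is derivable, so it is materialized only once per group — though patch 5 below walks part of this back.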
From 4596ba3c774bebe0f3543f8de1c8021cdb39a210 Mon Sep 17 00:00:00 2001
From: Orson Peters
Date: Fri, 4 Oct 2024 17:15:25 +0200
Subject: [PATCH 4/6] fmt

---
 crates/polars-core/src/frame/group_by/hashing.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/crates/polars-core/src/frame/group_by/hashing.rs b/crates/polars-core/src/frame/group_by/hashing.rs
index 8207f4e0af7c..b52ed9028b0c 100644
--- a/crates/polars-core/src/frame/group_by/hashing.rs
+++ b/crates/polars-core/src/frame/group_by/hashing.rs
@@ -1,6 +1,4 @@
-use std::hash::Hash;
-
-use hashbrown::hash_map::{Entry, RawEntryMut};
+use hashbrown::hash_map::Entry;
 use polars_utils::hashing::{hash_to_partition, DirtyHash};
 use polars_utils::idx_vec::IdxVec;
 use polars_utils::itertools::Itertools;

From 1a24e7ea65e6fd54e9fd83afad880e87550513c9 Mon Sep 17 00:00:00 2001
From: Orson Peters
Date: Fri, 4 Oct 2024 18:28:46 +0200
Subject: [PATCH 5/6] undo separate gather

---
 crates/polars-core/src/frame/group_by/hashing.rs | 36 ++++++------------
 1 file changed, 11 insertions(+), 25 deletions(-)

diff --git a/crates/polars-core/src/frame/group_by/hashing.rs b/crates/polars-core/src/frame/group_by/hashing.rs
index b52ed9028b0c..2f0c409a27ec 100644
--- a/crates/polars-core/src/frame/group_by/hashing.rs
+++ b/crates/polars-core/src/frame/group_by/hashing.rs
@@ -1,11 +1,9 @@
 use hashbrown::hash_map::Entry;
 use polars_utils::hashing::{hash_to_partition, DirtyHash};
 use polars_utils::idx_vec::IdxVec;
-use polars_utils::itertools::Itertools;
 use polars_utils::sync::SyncPtr;
 use polars_utils::total_ord::{ToTotalOrd, TotalHash, TotalOrdWrap};
 use polars_utils::unitvec;
-use polars_utils::vec::PushUnchecked;
 use rayon::prelude::*;
 
 use crate::hashing::*;
@@ -81,7 +79,8 @@ where
     let (mut first, mut groups);
     if sorted {
         groups = Vec::with_capacity(get_init_size());
-        let mut hash_tbl: PlHashMap<TotalOrdWrap<K>, IdxSize> = PlHashMap::with_capacity(init_size);
+        first = Vec::with_capacity(get_init_size());
+        let mut hash_tbl = PlHashMap::with_capacity(init_size);
         for (idx, k) in keys.enumerate() {
             let idx = idx as IdxSize;
             match hash_tbl.entry(TotalOrdWrap(k)) {
@@ -89,37 +88,25 @@ where
                     let group_idx = groups.len() as IdxSize;
                     entry.insert(group_idx);
                     groups.push(unitvec![idx]);
+                    first.push(idx);
                 },
                 Entry::Occupied(entry) => unsafe {
                     groups.get_unchecked_mut(*entry.get() as usize).push(idx)
                 },
             }
         }
-
-        first = groups
-            .iter()
-            .map(|v| unsafe { *v.first().unwrap_unchecked() })
-            .collect_vec();
     } else {
-        let mut hash_tbl: PlHashMap<TotalOrdWrap<K>, IdxVec> = PlHashMap::with_capacity(init_size);
+        let mut hash_tbl = PlHashMap::with_capacity(init_size);
         for (idx, k) in keys.enumerate() {
             let idx = idx as IdxSize;
             match hash_tbl.entry(TotalOrdWrap(k)) {
                 Entry::Vacant(entry) => {
-                    entry.insert(unitvec![idx]);
+                    entry.insert((idx, unitvec![idx]));
                 },
-                Entry::Occupied(mut entry) => entry.get_mut().push(idx),
-            }
-        }
-
-        first = Vec::with_capacity(hash_tbl.len());
-        groups = Vec::with_capacity(hash_tbl.len());
-        unsafe {
-            for v in hash_tbl.into_values() {
-                first.push_unchecked(*v.first().unwrap_unchecked());
-                groups.push_unchecked(v);
+                Entry::Occupied(mut entry) => entry.get_mut().1.push(idx),
             }
         }
+        (first, groups) = hash_tbl.into_values().unzip();
     }
     GroupsProxy::Idx(GroupsIdx::new(first, groups, sorted))
 }
@@ -146,8 +133,7 @@ where
     (0..n_partitions)
         .into_par_iter()
         .map(|thread_no| {
-            let mut hash_tbl: PlHashMap<T::TotalOrdItem, IdxVec> =
-                PlHashMap::with_capacity(init_size);
+            let mut hash_tbl = PlHashMap::with_capacity(init_size);
 
             let mut offset = 0;
             for keys in &keys {
@@ -161,10 +147,10 @@ where
                 if thread_no == hash_to_partition(k.dirty_hash(), n_partitions) {
                     match hash_tbl.entry(k) {
                         Entry::Vacant(entry) => {
-                            entry.insert(unitvec![idx]);
+                            entry.insert((idx, unitvec![idx]));
                         },
                         Entry::Occupied(mut entry) => {
-                            entry.get_mut().push(idx);
+                            entry.get_mut().1.push(idx);
                         },
                     }
                 }
@@ -173,7 +159,7 @@ where
             }
             hash_tbl
                 .into_iter()
-                .map(|(_k, v)| (unsafe { *v.first().unwrap_unchecked() }, v))
+                .map(|(_k, v)| v)
                 .collect_trusted::<Vec<_>>()
         })
         .collect::<Vec<_>>()
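Patch 5 goes back to carrying `(first, indices)` through the unsorted paths and splits the pair with `unzip` when building the `GroupsIdx`, which deletes the `push_unchecked` gather loop. The `unzip` step, reduced to std types:

    use std::collections::HashMap;

    fn main() {
        let mut table: HashMap<char, (u32, Vec<u32>)> = HashMap::new();
        for (idx, key) in "abcab".chars().enumerate() {
            let idx = idx as u32;
            // The first occurrence fixes `first`; every occurrence joins the list.
            table.entry(key).or_insert_with(|| (idx, Vec::new())).1.push(idx);
        }
        // One pass splits the pairs into two parallel vectors.
        let (first, groups): (Vec<u32>, Vec<Vec<u32>>) = table.into_values().unzip();
        assert_eq!(first.len(), groups.len());
    }

`Iterator::unzip` performs the same two collects the removed loop hand-rolled, without any unsafe.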
From f688dae58b79e3bafcc91a1548405b492db7c420 Mon Sep 17 00:00:00 2001
From: Orson Peters
Date: Fri, 4 Oct 2024 18:33:16 +0200
Subject: [PATCH 6/6] enumerate_idx

---
 crates/polars-core/src/frame/group_by/hashing.rs | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/crates/polars-core/src/frame/group_by/hashing.rs b/crates/polars-core/src/frame/group_by/hashing.rs
index 2f0c409a27ec..0af7b5159c8a 100644
--- a/crates/polars-core/src/frame/group_by/hashing.rs
+++ b/crates/polars-core/src/frame/group_by/hashing.rs
@@ -1,6 +1,7 @@
 use hashbrown::hash_map::Entry;
 use polars_utils::hashing::{hash_to_partition, DirtyHash};
 use polars_utils::idx_vec::IdxVec;
+use polars_utils::itertools::Itertools;
 use polars_utils::sync::SyncPtr;
 use polars_utils::total_ord::{ToTotalOrd, TotalHash, TotalOrdWrap};
 use polars_utils::unitvec;
@@ -81,8 +82,7 @@ where
         groups = Vec::with_capacity(get_init_size());
         first = Vec::with_capacity(get_init_size());
         let mut hash_tbl = PlHashMap::with_capacity(init_size);
-        for (idx, k) in keys.enumerate() {
-            let idx = idx as IdxSize;
+        for (idx, k) in keys.enumerate_idx() {
             match hash_tbl.entry(TotalOrdWrap(k)) {
                 Entry::Vacant(entry) => {
                     let group_idx = groups.len() as IdxSize;
@@ -97,8 +97,7 @@ where
         }
     } else {
         let mut hash_tbl = PlHashMap::with_capacity(init_size);
-        for (idx, k) in keys.enumerate() {
-            let idx = idx as IdxSize;
+        for (idx, k) in keys.enumerate_idx() {
             match hash_tbl.entry(TotalOrdWrap(k)) {
                 Entry::Vacant(entry) => {
                     entry.insert((idx, unitvec![idx]));
@@ -140,9 +139,9 @@ where
             let keys = keys.as_ref();
             let len = keys.len() as IdxSize;
 
-            for (key_idx, k) in keys.iter().enumerate() {
+            for (key_idx, k) in keys.iter().enumerate_idx() {
                 let k = k.to_total_ord();
-                let idx = key_idx as IdxSize + offset;
+                let idx = key_idx + offset;
 
                 if thread_no == hash_to_partition(k.dirty_hash(), n_partitions) {
                     match hash_tbl.entry(k) {
@@ -195,9 +194,9 @@ where
                 let keys = keys.clone().into_iter();
                 let len = keys.len() as IdxSize;
 
-                for (key_idx, k) in keys.into_iter().enumerate() {
+                for (key_idx, k) in keys.into_iter().enumerate_idx() {
                     let k = k.to_total_ord();
-                    let idx = key_idx as IdxSize + offset;
+                    let idx = key_idx + offset;
 
                     if thread_no == hash_to_partition(k.dirty_hash(), n_partitions) {
                         match hash_tbl.entry(k) {
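Finally, `enumerate_idx` (from `polars_utils::itertools::Itertools`, pulled in by the re-added import) enumerates directly with `IdxSize`, so the `as IdxSize` casts disappear from the loop bodies. Roughly this, with `u32` standing in for `IdxSize` (an illustrative reimplementation, not the polars-utils source):

    /// Enumerate with a narrow index type instead of usize.
    fn enumerate_idx<I: Iterator>(iter: I) -> impl Iterator<Item = (u32, I::Item)> {
        iter.scan(0u32, |i, item| {
            let idx = *i;
            *i += 1;
            Some((idx, item))
        })
    }

    fn main() {
        for (idx, item) in enumerate_idx(["a", "b", "c"].into_iter()) {
            println!("{idx}: {item}"); // idx is u32; no cast at the use site
        }
    }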