
Commit

wip, just some bugs left
orlp committed Nov 20, 2024
1 parent 551e0c6 commit 958df17
Showing 8 changed files with 243 additions and 122 deletions.
7 changes: 3 additions & 4 deletions crates/polars-expr/src/chunked_idx_table/mod.rs
@@ -19,7 +19,7 @@ pub trait ChunkedIdxTable: Any + Send + Sync {
     fn num_keys(&self) -> IdxSize;
 
     /// Inserts the given key chunk into this ChunkedIdxTable.
-    fn insert_key_chunk(&mut self, keys: HashKeys);
+    fn insert_key_chunk(&mut self, keys: HashKeys, track_unmatchable: bool);
 
     /// Probe the table, updating table_match and probe_match with
     /// (ChunkId, IdxSize) pairs for each match. Will stop processing new keys
@@ -58,7 +58,6 @@ pub trait ChunkedIdxTable: Any + Send + Sync {
     fn unmarked_keys(&self, out: &mut Vec<ChunkId<32>>, offset: IdxSize, limit: IdxSize);
 }
 
-pub fn new_chunked_idx_table(key_schema: Arc<Schema>) -> Box<dyn ChunkedIdxTable> {
-    // Box::new(row_encoded::BytesIndexMap::new(key_schema))
-    todo!()
+pub fn new_chunked_idx_table(_key_schema: Arc<Schema>) -> Box<dyn ChunkedIdxTable> {
+    Box::new(row_encoded::RowEncodedChunkedIdxTable::new())
 }
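
For orientation, a minimal sketch of how a caller might drive the updated trait surface above: build the table from key chunks, passing the new track_unmatchable flag, then collect the keys that never matched. It assumes it lives inside the same crate module so the items above (new_chunked_idx_table, HashKeys, ChunkId, IdxSize, Schema) are in scope; the HashKeys chunks are produced elsewhere and are not part of this diff.

use std::sync::Arc;

// Hypothetical caller, assumed to sit next to the trait above so its items
// are in scope. `build_chunks` stands in for however the real streaming-join
// node produces its build-side HashKeys.
fn build_and_collect_unmatched(
    key_schema: Arc<Schema>,
    build_chunks: Vec<HashKeys>,
) -> Vec<ChunkId<32>> {
    let mut table = new_chunked_idx_table(key_schema);

    // Insert every build-side chunk; track_unmatchable = true asks the table
    // to also remember keys that can never match (e.g. null keys) so they
    // can be emitted later for left/full joins.
    for keys in build_chunks {
        table.insert_key_chunk(keys, true);
    }

    // ... probing (not shown in this diff) would happen here and mark
    // matched keys ...

    // Afterwards, gather every key that was never marked as matched. The
    // (offset, limit) pair allows paging; a single large batch is taken here.
    let mut unmatched = Vec::new();
    table.unmarked_keys(&mut unmatched, 0, IdxSize::MAX);
    unmatched
}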
44 changes: 32 additions & 12 deletions crates/polars-expr/src/chunked_idx_table/row_encoded.rs
@@ -15,27 +15,29 @@ pub struct RowEncodedChunkedIdxTable {
     // first chunk in each to mark keys during probing.
     idx_map: BytesIndexMap<UnitVec<AtomicU64>>,
     chunk_ctr: u32,
+    null_keys: Vec<ChunkId<32>>,
 }
 
 impl RowEncodedChunkedIdxTable {
     pub fn new() -> Self {
         Self {
             idx_map: BytesIndexMap::new(),
             chunk_ctr: 0,
+            null_keys: Vec::new(),
         }
     }
 }
 
 impl RowEncodedChunkedIdxTable {
     #[inline(always)]
-    fn probe_one<const MARK_MATCHES: bool, const EMIT_UNMATCHED: bool>(
+    fn probe_one<const MARK_MATCHES: bool>(
         &self,
         hash: u64,
         key: &[u8],
         key_idx: IdxSize,
         table_match: &mut Vec<ChunkId<32>>,
         probe_match: &mut Vec<IdxSize>,
-    ) {
+    ) -> bool {
         if let Some(chunk_ids) = self.idx_map.get(hash, key) {
             for chunk_id in &chunk_ids[..] {
                 // Create matches, making sure to clear top bit.
@@ -55,9 +57,9 @@ impl RowEncodedChunkedIdxTable {
                     first_chunk_id.store(first_chunk_val | (1 << 63), Ordering::Release);
                 }
             }
-        } else if EMIT_UNMATCHED {
-            table_match.push(ChunkId::null());
-            probe_match.push(key_idx);
+            true
+        } else {
+            false
         }
     }
 
@@ -73,14 +75,21 @@ impl RowEncodedChunkedIdxTable {
 
         let mut keys_processed = 0;
         for (hash, key) in hash_keys {
-            if let Some(key) = key {
-                self.probe_one::<MARK_MATCHES, EMIT_UNMATCHED>(
+            let found_match = if let Some(key) = key {
+                self.probe_one::<MARK_MATCHES>(
                     hash,
                     key,
                     keys_processed,
                     table_match,
                     probe_match,
-                );
+                )
+            } else {
+                false
+            };
+
+            if EMIT_UNMATCHED && !found_match {
+                table_match.push(ChunkId::null());
+                probe_match.push(keys_processed);
             }
 
             keys_processed += 1;
@@ -130,7 +139,7 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable {
         self.idx_map.len()
     }
 
-    fn insert_key_chunk(&mut self, hash_keys: HashKeys) {
+    fn insert_key_chunk(&mut self, hash_keys: HashKeys, track_unmatchable: bool) {
         let HashKeys::RowEncoded(hash_keys) = hash_keys else {
             unreachable!()
         };
@@ -144,9 +153,9 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable {
             .zip(hash_keys.keys.iter())
             .enumerate_idx()
         {
+            let chunk_id = ChunkId::<32>::store(self.chunk_ctr as IdxSize, i);
             if let Some(key) = key {
-                let chunk_id =
-                    AtomicU64::new(ChunkId::<32>::store(self.chunk_ctr as IdxSize, i).into_inner());
+                let chunk_id = AtomicU64::new(chunk_id.into_inner());
                 match self.idx_map.entry(*hash, key) {
                     Entry::Occupied(o) => {
                         o.into_mut().push(chunk_id);
@@ -155,6 +164,8 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable {
                         v.insert(unitvec![chunk_id]);
                     },
                 }
+            } else if track_unmatchable {
+                self.null_keys.push(chunk_id);
             }
         }
 
@@ -252,9 +263,16 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable {
         }
     }
 
-    fn unmarked_keys(&self, out: &mut Vec<ChunkId<32>>, offset: IdxSize, limit: IdxSize) {
+    fn unmarked_keys(&self, out: &mut Vec<ChunkId<32>>, mut offset: IdxSize, limit: IdxSize) {
         out.clear();
 
+        if (offset as usize) < self.null_keys.len() {
+            out.extend(self.null_keys[offset as usize..].iter().copied().take(limit as usize));
+            return;
+        }
+
+        offset -= self.null_keys.len() as IdxSize;
+
         while let Some((_, _, chunk_ids)) = self.idx_map.get_index(offset) {
             let first_chunk_id = unsafe { chunk_ids.get_unchecked(0) };
             let first_chunk_val = first_chunk_id.load(Ordering::Acquire);
@@ -269,6 +287,8 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable {
             if out.len() >= limit as usize {
                 break;
             }
+
+            offset += 1;
         }
     }
 }
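
The struct comment above ("first chunk in each to mark keys during probing") together with the first_chunk_val | (1 << 63) store is a small bit trick: the top bit of the first chunk id stored for a key doubles as a "this key has been matched" flag, which is what unmarked_keys later tests. A standalone illustration of just that trick (not polars code):

use std::sync::atomic::{AtomicU64, Ordering};

// Set the top bit of the first chunk id to mark the key as matched.
// Like the diff above, this is a plain load + store rather than a fetch_or:
// concurrent markers may race, but re-setting an already-set bit is harmless.
fn mark_matched(first_chunk_id: &AtomicU64) {
    let val = first_chunk_id.load(Ordering::Acquire);
    if val >> 63 == 0 {
        first_chunk_id.store(val | (1 << 63), Ordering::Release);
    }
}

// A key is unmatched while its top bit is still clear.
fn is_unmatched(first_chunk_id: &AtomicU64) -> bool {
    first_chunk_id.load(Ordering::Acquire) >> 63 == 0
}

// The real chunk id is recovered by clearing the top bit again.
fn chunk_id_without_mark(first_chunk_id: &AtomicU64) -> u64 {
    first_chunk_id.load(Ordering::Acquire) & !(1 << 63)
}

fn main() {
    let id = AtomicU64::new(42);
    assert!(is_unmatched(&id));
    mark_matched(&id);
    assert!(!is_unmatched(&id));
    assert_eq!(chunk_id_without_mark(&id), 42);
}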
24 changes: 14 additions & 10 deletions crates/polars-expr/src/hash_keys.rs
@@ -1,5 +1,3 @@
-use std::sync::Arc;
-
 use arrow::array::{BinaryArray, PrimitiveArray, UInt64Array};
 use arrow::compute::take::binary::take_unchecked;
 use arrow::compute::utils::combine_validities_and_many;
@@ -63,23 +61,22 @@ impl HashKeys {
     /// After this call partition_idxs[p] will contain the indices of hashes
     /// that belong to partition p, and the cardinality sketches are updated
     /// accordingly.
-    ///
-    /// If null_is_valid is false rows with nulls do not get assigned a partition.
     pub fn gen_partition_idxs(
         &self,
         partitioner: &HashPartitioner,
         partition_idxs: &mut [Vec<IdxSize>],
         sketches: &mut [CardinalitySketch],
+        partition_nulls: bool,
     ) {
         if sketches.is_empty() {
             match self {
-                Self::RowEncoded(s) => s.gen_partition_idxs::<false>(partitioner, partition_idxs, sketches),
-                Self::Single(s) => s.gen_partition_idxs::<false>(partitioner, partition_idxs, sketches),
+                Self::RowEncoded(s) => s.gen_partition_idxs::<false>(partitioner, partition_idxs, sketches, partition_nulls),
+                Self::Single(s) => s.gen_partition_idxs::<false>(partitioner, partition_idxs, sketches, partition_nulls),
             }
         } else {
             match self {
-                Self::RowEncoded(s) => s.gen_partition_idxs::<true>(partitioner, partition_idxs, sketches),
-                Self::Single(s) => s.gen_partition_idxs::<true>(partitioner, partition_idxs, sketches),
+                Self::RowEncoded(s) => s.gen_partition_idxs::<true>(partitioner, partition_idxs, sketches, partition_nulls),
+                Self::Single(s) => s.gen_partition_idxs::<true>(partitioner, partition_idxs, sketches, partition_nulls),
             }
         }
     }
@@ -106,9 +103,10 @@ impl RowEncodedKeys {
         partitioner: &HashPartitioner,
         partition_idxs: &mut [Vec<IdxSize>],
         sketches: &mut [CardinalitySketch],
+        partition_nulls: bool,
     ) {
         assert!(partition_idxs.len() == partitioner.num_partitions());
-        assert!(BUILD_SKETCHES && sketches.len() == partitioner.num_partitions());
+        assert!(!BUILD_SKETCHES || sketches.len() == partitioner.num_partitions());
         for p in partition_idxs.iter_mut() {
             p.clear();
         }
@@ -124,6 +122,11 @@ impl RowEncodedKeys {
                             sketches.get_unchecked_mut(p).insert(*h);
                         }
                     }
+                } else if partition_nulls {
+                    // Arbitrarily put nulls in partition 0.
+                    unsafe {
+                        partition_idxs.get_unchecked_mut(0).push(i as IdxSize);
+                    }
                 }
             }
         } else {
@@ -167,7 +170,8 @@ impl SingleKeys {
         &self,
         partitioner: &HashPartitioner,
         partition_idxs: &mut [Vec<IdxSize>],
-        sketches: &mut [CardinalitySketch],
+        _sketches: &mut [CardinalitySketch],
+        _partition_nulls: bool,
     ) {
         assert!(partitioner.num_partitions() == partition_idxs.len());
         for p in partition_idxs.iter_mut() {
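
The new partition_nulls flag above decides whether rows with a null key are dropped or routed into a partition (arbitrarily, partition 0). A self-contained sketch of that policy, with a simple modulo partitioner standing in for HashPartitioner, whose internals are not shown in this diff:

// Stand-in for HashPartitioner::hash_to_partition; a plain modulo is enough
// for illustration, the real partitioner may reduce hashes differently.
fn hash_to_partition(hash: u64, num_partitions: usize) -> usize {
    (hash % num_partitions as u64) as usize
}

// Assign each row index to a partition based on its (optional) key hash.
// Rows with a null key either go to partition 0 or are skipped entirely,
// mirroring the partition_nulls flag in the diff above.
fn gen_partition_idxs_sketch(
    hashes: &[Option<u64>],
    partition_idxs: &mut [Vec<usize>],
    partition_nulls: bool,
) {
    for p in partition_idxs.iter_mut() {
        p.clear();
    }
    for (i, hash) in hashes.iter().enumerate() {
        match hash {
            Some(h) => partition_idxs[hash_to_partition(*h, partition_idxs.len())].push(i),
            // Arbitrarily put nulls in partition 0, as the diff does.
            None if partition_nulls => partition_idxs[0].push(i),
            None => {},
        }
    }
}

fn main() {
    let hashes = [Some(3), None, Some(7), Some(10)];
    let mut parts = vec![Vec::new(); 4];
    gen_partition_idxs_sketch(&hashes, &mut parts, true);
    assert_eq!(parts[0], vec![1]);    // the null row
    assert_eq!(parts[2], vec![3]);    // 10 % 4 == 2
    assert_eq!(parts[3], vec![0, 2]); // 3 % 4 == 3 and 7 % 4 == 3
}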
4 changes: 4 additions & 0 deletions crates/polars-ops/src/frame/join/args.rs
@@ -163,6 +163,10 @@ impl Debug for JoinType {
 }
 
 impl JoinType {
+    pub fn is_equi(&self) -> bool {
+        matches!(self, JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full)
+    }
+
     pub fn is_asof(&self) -> bool {
         #[cfg(feature = "asof_join")]
         {
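
A quick usage note on the new helper: is_equi groups the four equi-join variants so callers can decide in one place whether a join can run as a hash/equi join. A small hedged example; the import path and the Cross variant are assumed from current polars and may differ at this commit:

// Import path assumed; JoinType is defined in the args.rs shown above.
use polars_ops::prelude::JoinType;

fn main() {
    // The four equi-join flavours report true...
    assert!(JoinType::Inner.is_equi());
    assert!(JoinType::Left.is_equi());
    assert!(JoinType::Right.is_equi());
    assert!(JoinType::Full.is_equi());

    // ...while a non-equi join such as a cross join does not.
    assert!(!JoinType::Cross.is_equi());
}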
(The remaining 4 changed files are not shown.)

0 comments on commit 958df17
