diff --git a/crates/polars-arrow/src/compute/take/generic_binary.rs b/crates/polars-arrow/src/compute/take/generic_binary.rs index dc281bda129a..a1c220ddbd29 100644 --- a/crates/polars-arrow/src/compute/take/generic_binary.rs +++ b/crates/polars-arrow/src/compute/take/generic_binary.rs @@ -73,7 +73,7 @@ pub(super) unsafe fn take_values_validity<O: Offset, I: Index>( values: &[u8], indices: &PrimitiveArray<I>, ) -> (OffsetsBuffer<O>, Buffer<u8>, Option<Bitmap>) { - let mut length = O::default(); + let mut total_length = O::default(); let offsets = offsets.buffer(); let mut starts = Vec::<O>::with_capacity(indices.len()); let lengths = indices.values().iter().map(|index| { let index = index.to_usize(); + let length; match offsets.get(index + 1) { Some(&next) => { let start = *offsets.get_unchecked(index); - length += next - start; + length = next - start; + total_length += length; starts.push_unchecked(start); }, - None => starts.push_unchecked(O::default()), + None => { + length = O::zero(); + starts.push_unchecked(O::default()); + }, }; length.to_usize() }); let offsets = create_offsets(lengths, indices.len()); - let buffer = take_values(length, &starts, &offsets, values); + let buffer = take_values(total_length, &starts, &offsets, values); (offsets, buffer, indices.validity().cloned()) } @@ -127,7 +133,7 @@ pub(super) unsafe fn take_values_indices_validity<O: Offset, I: Index>( indices: &PrimitiveArray<I>, ) -> (OffsetsBuffer<O>, Buffer<u8>, Option<Bitmap>) { - let mut length = O::default(); + let mut total_length = O::default(); let mut validity = MutableBitmap::with_capacity(indices.len()); let values_validity = values.validity().unwrap(); @@ -136,28 +142,32 @@ pub(super) unsafe fn take_values_indices_validity<O: Offset, I: Index>( let mut starts = Vec::<O>::with_capacity(indices.len()); let lengths = indices.iter().map(|index| { + let length; match index { Some(index) => { let index = index.to_usize(); if values_validity.get_bit(index) { validity.push(true); - length += *offsets.get_unchecked(index + 1) - *offsets.get_unchecked(index); + length = *offsets.get_unchecked(index + 1) - *offsets.get_unchecked(index); starts.push_unchecked(*offsets.get_unchecked(index)); } else { validity.push(false); + length = O::zero(); starts.push_unchecked(O::default()); } }, None => { validity.push(false); + length = O::zero(); starts.push_unchecked(O::default()); }, }; + total_length += length; length.to_usize() }); let offsets = create_offsets(lengths, indices.len()); - let buffer = take_values(length, &starts, &offsets, values_values); + let buffer = take_values(total_length, &starts, &offsets, values_values); (offsets, buffer, validity.into()) } diff --git a/crates/polars-core/src/datatypes/field.rs b/crates/polars-core/src/datatypes/field.rs index b85caeec0a2e..0c30aacdd275 100644 --- a/crates/polars-core/src/datatypes/field.rs +++ b/crates/polars-core/src/datatypes/field.rs @@ -97,6 +97,12 @@ impl Field { self.name = name; } + /// Returns this `Field`, renamed. + pub fn with_name(mut self, name: PlSmallStr) -> Self { + self.name = name; + self + } + /// Converts the `Field` to an `arrow::datatypes::Field`. /// /// # Example diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index da29d8da070b..c86c6d0ca2d0 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -380,6 +380,15 @@ impl DataFrame { unsafe { DataFrame::new_no_checks(0, cols) } } + /// Create a new `DataFrame` with the given schema, only containing nulls.
+ pub fn full_null(schema: &Schema, height: usize) -> Self { + let columns = schema + .iter_fields() + .map(|f| Column::full_null(f.name.clone(), height, f.dtype())) + .collect(); + DataFrame { height, columns } + } + /// Removes the last `Series` from the `DataFrame` and returns it, or [`None`] if it is empty. /// /// # Example @@ -713,7 +722,7 @@ impl DataFrame { /// - The length of each appended column matches the height of the [`DataFrame`]. For /// [`DataFrame`]s with no columns (ZCDFs), it is important that the height is set afterwards /// with [`DataFrame::set_height`]. - pub unsafe fn column_extend_unchecked(&mut self, iter: impl Iterator<Item = Column>) { + pub unsafe fn column_extend_unchecked(&mut self, iter: impl IntoIterator<Item = Column>) { unsafe { self.get_columns_mut() }.extend(iter) } @@ -1894,11 +1903,15 @@ impl DataFrame { unsafe { DataFrame::new_no_checks(idx.len(), cols) } } - pub(crate) unsafe fn take_slice_unchecked(&self, idx: &[IdxSize]) -> Self { + /// # Safety + /// The indices must be in-bounds. + pub unsafe fn take_slice_unchecked(&self, idx: &[IdxSize]) -> Self { self.take_slice_unchecked_impl(idx, true) } - unsafe fn take_slice_unchecked_impl(&self, idx: &[IdxSize], allow_threads: bool) -> Self { + /// # Safety + /// The indices must be in-bounds. + pub unsafe fn take_slice_unchecked_impl(&self, idx: &[IdxSize], allow_threads: bool) -> Self { let cols = if allow_threads { POOL.install(|| self._apply_columns_par(&|s| s.take_slice_unchecked(idx))) } else { diff --git a/crates/polars-expr/src/chunked_idx_table/mod.rs b/crates/polars-expr/src/chunked_idx_table/mod.rs new file mode 100644 index 000000000000..948e34effad0 --- /dev/null +++ b/crates/polars-expr/src/chunked_idx_table/mod.rs @@ -0,0 +1,65 @@ +use std::any::Any; + +use polars_core::prelude::*; +use polars_utils::index::ChunkId; +use polars_utils::IdxSize; + +use crate::hash_keys::HashKeys; + +mod row_encoded; + +pub trait ChunkedIdxTable: Any + Send + Sync { + /// Creates a new empty ChunkedIdxTable similar to this one. + fn new_empty(&self) -> Box<dyn ChunkedIdxTable>; + + /// Reserves space for the given number of additional keys. + fn reserve(&mut self, additional: usize); + + /// Returns the number of unique keys in this ChunkedIdxTable. + fn num_keys(&self) -> IdxSize; + + /// Inserts the given key chunk into this ChunkedIdxTable. + fn insert_key_chunk(&mut self, keys: HashKeys, track_unmatchable: bool); + + /// Probe the table, updating table_match and probe_match with + /// (ChunkId, IdxSize) pairs for each match. Will stop processing new keys + /// once limit matches have been generated, returning the number of keys + /// processed. + /// + /// If mark_matches is true, matches are marked in the table as such. + /// + /// If emit_unmatched is true, for keys that do not have a match we emit a + /// match with ChunkId::null() on the table match. + fn probe( + &self, + hash_keys: &HashKeys, + table_match: &mut Vec<ChunkId<32>>, + probe_match: &mut Vec<IdxSize>, + mark_matches: bool, + emit_unmatched: bool, + limit: IdxSize, + ) -> IdxSize; + + /// The same as probe, except it will only apply to the specified subset of keys. + /// # Safety + /// The provided subset indices must be in-bounds. + #[allow(clippy::too_many_arguments)] + unsafe fn probe_subset( + &self, + hash_keys: &HashKeys, + subset: &[IdxSize], + table_match: &mut Vec<ChunkId<32>>, + probe_match: &mut Vec<IdxSize>, + mark_matches: bool, + emit_unmatched: bool, + limit: IdxSize, + ) -> IdxSize; + + /// Get the ChunkIds for each key which was never marked during probing.
fn unmarked_keys(&self, out: &mut Vec<ChunkId<32>>, offset: IdxSize, limit: IdxSize) + -> IdxSize; +} + +pub fn new_chunked_idx_table(_key_schema: Arc<Schema>) -> Box<dyn ChunkedIdxTable> { + Box::new(row_encoded::RowEncodedChunkedIdxTable::new()) +} diff --git a/crates/polars-expr/src/chunked_idx_table/row_encoded.rs b/crates/polars-expr/src/chunked_idx_table/row_encoded.rs new file mode 100644 index 000000000000..fc67aca159a9 --- /dev/null +++ b/crates/polars-expr/src/chunked_idx_table/row_encoded.rs @@ -0,0 +1,306 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +use arrow::array::Array; +use polars_utils::idx_map::bytes_idx_map::{BytesIndexMap, Entry}; +use polars_utils::idx_vec::UnitVec; +use polars_utils::itertools::Itertools; +use polars_utils::unitvec; + +use super::*; +use crate::hash_keys::HashKeys; + +#[derive(Default)] +pub struct RowEncodedChunkedIdxTable { + // These AtomicU64s actually are ChunkIds, but we use the top bit of the + // first chunk in each to mark keys during probing. + idx_map: BytesIndexMap<UnitVec<AtomicU64>>, + chunk_ctr: u32, + null_keys: Vec<ChunkId<32>>, +} + +impl RowEncodedChunkedIdxTable { + pub fn new() -> Self { + Self { + idx_map: BytesIndexMap::new(), + chunk_ctr: 0, + null_keys: Vec::new(), + } + } +} + +impl RowEncodedChunkedIdxTable { + #[inline(always)] + fn probe_one<const MARK_MATCHES: bool>( + &self, + key_idx: IdxSize, + hash: u64, + key: &[u8], + table_match: &mut Vec<ChunkId<32>>, + probe_match: &mut Vec<IdxSize>, + ) -> bool { + if let Some(chunk_ids) = self.idx_map.get(hash, key) { + for chunk_id in &chunk_ids[..] { + // Create matches, making sure to clear top bit. + let raw_chunk_id = chunk_id.load(Ordering::Relaxed); + let chunk_id = ChunkId::from_inner(raw_chunk_id & !(1 << 63)); + table_match.push(chunk_id); + probe_match.push(key_idx); + } + + // Mark if necessary. This action is idempotent so doesn't + // need any synchronization on the load, nor does it need a + // fetch_or to do it atomically.
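+ // (Two probing threads may both observe the bit as unset and both store it; the only transition is 0 -> 1 while the table is immutable during probing, so the redundant store is harmless.)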
+ if MARK_MATCHES { + let first_chunk_id = unsafe { chunk_ids.get_unchecked(0) }; + let first_chunk_val = first_chunk_id.load(Ordering::Relaxed); + if first_chunk_val >> 63 == 0 { + first_chunk_id.store(first_chunk_val | (1 << 63), Ordering::Release); + } + } + true + } else { + false + } + } + + fn probe_impl<'a, const MARK_MATCHES: bool, const EMIT_UNMATCHED: bool>( + &self, + hash_keys: impl Iterator<Item = (IdxSize, u64, Option<&'a [u8]>)>, + table_match: &mut Vec<ChunkId<32>>, + probe_match: &mut Vec<IdxSize>, + limit: IdxSize, + ) -> IdxSize { + table_match.clear(); + probe_match.clear(); + + let mut keys_processed = 0; + for (key_idx, hash, key) in hash_keys { + let found_match = if let Some(key) = key { + self.probe_one::<MARK_MATCHES>(key_idx, hash, key, table_match, probe_match) + } else { + false + }; + + if EMIT_UNMATCHED && !found_match { + table_match.push(ChunkId::null()); + probe_match.push(key_idx); + } + + keys_processed += 1; + if table_match.len() >= limit as usize { + break; + } + } + keys_processed + } + + fn probe_dispatch<'a>( + &self, + hash_keys: impl Iterator<Item = (IdxSize, u64, Option<&'a [u8]>)>, + table_match: &mut Vec<ChunkId<32>>, + probe_match: &mut Vec<IdxSize>, + mark_matches: bool, + emit_unmatched: bool, + limit: IdxSize, + ) -> IdxSize { + match (mark_matches, emit_unmatched) { + (false, false) => { + self.probe_impl::<false, false>(hash_keys, table_match, probe_match, limit) + }, + (false, true) => { + self.probe_impl::<false, true>(hash_keys, table_match, probe_match, limit) + }, + (true, false) => { + self.probe_impl::<true, false>(hash_keys, table_match, probe_match, limit) + }, + (true, true) => { + self.probe_impl::<true, true>(hash_keys, table_match, probe_match, limit) + }, + } + } +} + +impl ChunkedIdxTable for RowEncodedChunkedIdxTable { + fn new_empty(&self) -> Box<dyn ChunkedIdxTable> { + Box::new(Self::new()) + } + + fn reserve(&mut self, additional: usize) { + self.idx_map.reserve(additional); + } + + fn num_keys(&self) -> IdxSize { + self.idx_map.len() + } + + fn insert_key_chunk(&mut self, hash_keys: HashKeys, track_unmatchable: bool) { + let HashKeys::RowEncoded(hash_keys) = hash_keys else { + unreachable!() + }; + if hash_keys.keys.len() >= 1 << 31 { + panic!("overly large chunk in RowEncodedChunkedIdxTable"); + } + + for (i, (hash, key)) in hash_keys + .hashes + .values_iter() + .zip(hash_keys.keys.iter()) + .enumerate_idx() + { + let chunk_id = ChunkId::<32>::store(self.chunk_ctr as IdxSize, i); + if let Some(key) = key { + let chunk_id = AtomicU64::new(chunk_id.into_inner()); + match self.idx_map.entry(*hash, key) { + Entry::Occupied(o) => { + o.into_mut().push(chunk_id); + }, + Entry::Vacant(v) => { + v.insert(unitvec![chunk_id]); + }, + } + } else if track_unmatchable { + self.null_keys.push(chunk_id); + } + } + + self.chunk_ctr = self.chunk_ctr.checked_add(1).unwrap(); + } + + fn probe( + &self, + hash_keys: &HashKeys, + table_match: &mut Vec<ChunkId<32>>, + probe_match: &mut Vec<IdxSize>, + mark_matches: bool, + emit_unmatched: bool, + limit: IdxSize, + ) -> IdxSize { + let HashKeys::RowEncoded(hash_keys) = hash_keys else { + unreachable!() + }; + + if hash_keys.keys.has_nulls() { + let iter = hash_keys + .hashes + .values_iter() + .copied() + .zip(hash_keys.keys.iter()) + .enumerate_idx() + .map(|(i, (h, k))| (i, h, k)); + self.probe_dispatch( + iter, + table_match, + probe_match, + mark_matches, + emit_unmatched, + limit, + ) + } else { + let iter = hash_keys + .hashes + .values_iter() + .copied() + .zip(hash_keys.keys.values_iter().map(Some)) + .enumerate_idx() + .map(|(i, (h, k))| (i, h, k)); + self.probe_dispatch( + iter, + table_match, + probe_match, + mark_matches, + emit_unmatched, + limit, + ) + } + } + + unsafe fn probe_subset( + &self, +
hash_keys: &HashKeys, + subset: &[IdxSize], + table_match: &mut Vec<ChunkId<32>>, + probe_match: &mut Vec<IdxSize>, + mark_matches: bool, + emit_unmatched: bool, + limit: IdxSize, + ) -> IdxSize { + let HashKeys::RowEncoded(hash_keys) = hash_keys else { + unreachable!() + }; + + if hash_keys.keys.has_nulls() { + let iter = subset.iter().map(|i| { + ( + *i, + hash_keys.hashes.value_unchecked(*i as usize), + hash_keys.keys.get_unchecked(*i as usize), + ) + }); + self.probe_dispatch( + iter, + table_match, + probe_match, + mark_matches, + emit_unmatched, + limit, + ) + } else { + let iter = subset.iter().map(|i| { + ( + *i, + hash_keys.hashes.value_unchecked(*i as usize), + Some(hash_keys.keys.value_unchecked(*i as usize)), + ) + }); + self.probe_dispatch( + iter, + table_match, + probe_match, + mark_matches, + emit_unmatched, + limit, + ) + } + } + + fn unmarked_keys( + &self, + out: &mut Vec<ChunkId<32>>, + mut offset: IdxSize, + limit: IdxSize, + ) -> IdxSize { + out.clear(); + + if (offset as usize) < self.null_keys.len() { + out.extend( + self.null_keys[offset as usize..] + .iter() + .copied() + .take(limit as usize), + ); + return out.len() as IdxSize; + } + + offset -= self.null_keys.len() as IdxSize; + + let mut keys_processed = 0; + while let Some((_, _, chunk_ids)) = self.idx_map.get_index(offset + keys_processed) { + let first_chunk_id = unsafe { chunk_ids.get_unchecked(0) }; + let first_chunk_val = first_chunk_id.load(Ordering::Acquire); + if first_chunk_val >> 63 == 0 { + for chunk_id in &chunk_ids[..] { + let raw_chunk_id = chunk_id.load(Ordering::Relaxed); + let chunk_id = ChunkId::from_inner(raw_chunk_id & !(1 << 63)); + out.push(chunk_id); + } + } + + keys_processed += 1; + if out.len() >= limit as usize { + break; + } + } + + keys_processed + } +} diff --git a/crates/polars-expr/src/groups/mod.rs b/crates/polars-expr/src/groups/mod.rs index 2938536a729e..42d259de7fd8 100644 --- a/crates/polars-expr/src/groups/mod.rs +++ b/crates/polars-expr/src/groups/mod.rs @@ -15,7 +15,7 @@ pub trait Grouper: Any + Send + Sync { /// Creates a new empty Grouper similar to this one. fn new_empty(&self) -> Box<dyn Grouper>; - /// Reserves space for the given number additional of groups. + /// Reserves space for the given number of additional groups. fn reserve(&mut self, additional: usize); /// Returns the number of groups in this Grouper.
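Aside: the mark-bit convention used by RowEncodedChunkedIdxTable above can be sketched in isolation. A minimal, hypothetical model (not part of the patch) of how the top bit of the first AtomicU64 chunk id doubles as a "was matched" flag:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const MARK_BIT: u64 = 1 << 63;

// Idempotent mark: the only transition is 0 -> 1, so load + plain store
// suffices and racing probe threads are harmless.
fn mark_matched(first_chunk_id: &AtomicU64) {
    let v = first_chunk_id.load(Ordering::Relaxed);
    if v & MARK_BIT == 0 {
        first_chunk_id.store(v | MARK_BIT, Ordering::Release);
    }
}

// Reads always strip the mark bit to recover the real chunk id.
fn load_chunk_id(raw: &AtomicU64) -> u64 {
    raw.load(Ordering::Relaxed) & !MARK_BIT
}

fn main() {
    let id = AtomicU64::new(42);
    mark_matched(&id);
    assert_eq!(load_chunk_id(&id), 42); // payload unchanged
    assert_ne!(id.load(Ordering::Relaxed) & MARK_BIT, 0); // marked
}
```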
diff --git a/crates/polars-expr/src/groups/row_encoded.rs b/crates/polars-expr/src/groups/row_encoded.rs index 8c25e1ff08f1..05b28ed45548 100644 --- a/crates/polars-expr/src/groups/row_encoded.rs +++ b/crates/polars-expr/src/groups/row_encoded.rs @@ -1,3 +1,4 @@ +use arrow::array::Array; use polars_row::EncodingField; use polars_utils::cardinality_sketch::CardinalitySketch; use polars_utils::idx_map::bytes_idx_map::{BytesIndexMap, Entry}; @@ -73,9 +74,12 @@ impl Grouper for RowEncodedHashGrouper { let HashKeys::RowEncoded(keys) = keys else { unreachable!() }; + assert!(!keys.hashes.has_nulls()); + assert!(!keys.keys.has_nulls()); + group_idxs.clear(); group_idxs.reserve(keys.hashes.len()); - for (hash, key) in keys.hashes.iter().zip(keys.keys.values_iter()) { + for (hash, key) in keys.hashes.values_iter().zip(keys.keys.values_iter()) { unsafe { group_idxs.push_unchecked(self.insert_key(*hash, key)); } diff --git a/crates/polars-expr/src/hash_keys.rs b/crates/polars-expr/src/hash_keys.rs index d5f85ef49db7..602ca7c52216 100644 --- a/crates/polars-expr/src/hash_keys.rs +++ b/crates/polars-expr/src/hash_keys.rs @@ -1,10 +1,13 @@ -use arrow::array::BinaryArray; +use arrow::array::{BinaryArray, PrimitiveArray, UInt64Array}; use arrow::compute::take::binary::take_unchecked; +use arrow::compute::utils::combine_validities_and_many; use polars_core::frame::DataFrame; use polars_core::prelude::row_encode::_get_rows_encoded_unordered; use polars_core::prelude::PlRandomState; use polars_core::series::Series; +use polars_utils::cardinality_sketch::CardinalitySketch; use polars_utils::hashing::HashPartitioner; +use polars_utils::index::ChunkId; use polars_utils::itertools::Itertools; use polars_utils::vec::PushUnchecked; use polars_utils::IdxSize; @@ -12,21 +15,32 @@ use polars_utils::IdxSize; /// Represents a DataFrame plus a hash per row, intended for keys in grouping /// or joining. The hashes may or may not actually be physically pre-computed, /// this depends per type. +#[derive(Clone)] pub enum HashKeys { RowEncoded(RowEncodedKeys), Single(SingleKeys), } impl HashKeys { - pub fn from_df(df: &DataFrame, random_state: PlRandomState, force_row_encoding: bool) -> Self { + pub fn from_df( + df: &DataFrame, + random_state: PlRandomState, + null_is_valid: bool, + force_row_encoding: bool, + ) -> Self { if df.width() > 1 || force_row_encoding { let keys = df .get_columns() .iter() .map(|c| c.as_materialized_series().clone()) .collect_vec(); - let keys_encoded = _get_rows_encoded_unordered(&keys[..]).unwrap().into_array(); - assert!(keys_encoded.len() == df.height()); + let mut keys_encoded = _get_rows_encoded_unordered(&keys[..]).unwrap().into_array(); + + if !null_is_valid { + let validities = keys.iter().map(|c| c.rechunk_validity()).collect_vec(); + let combined = combine_validities_and_many(&validities); + keys_encoded.set_validity(combined); + } // TODO: use vechash? Not supported yet for lists. 
// let mut hashes = Vec::with_capacity(df.height()); @@ -37,7 +51,7 @@ impl HashKeys { .map(|k| random_state.hash_one(k)) .collect(); Self::RowEncoded(RowEncodedKeys { - hashes, + hashes: PrimitiveArray::from_vec(hashes), keys: keys_encoded, }) } else { @@ -50,14 +64,75 @@ impl HashKeys { } } + pub fn len(&self) -> usize { + match self { + HashKeys::RowEncoded(s) => s.keys.len(), + HashKeys::Single(s) => s.keys.len(), + } + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// After this call partition_idxs[p] will contain the indices of hashes + /// that belong to partition p, and the cardinality sketches are updated + /// accordingly. pub fn gen_partition_idxs( &self, partitioner: &HashPartitioner, partition_idxs: &mut [Vec], + sketches: &mut [CardinalitySketch], + partition_nulls: bool, + ) { + if sketches.is_empty() { + match self { + Self::RowEncoded(s) => s.gen_partition_idxs::( + partitioner, + partition_idxs, + sketches, + partition_nulls, + ), + Self::Single(s) => s.gen_partition_idxs::( + partitioner, + partition_idxs, + sketches, + partition_nulls, + ), + } + } else { + match self { + Self::RowEncoded(s) => s.gen_partition_idxs::( + partitioner, + partition_idxs, + sketches, + partition_nulls, + ), + Self::Single(s) => s.gen_partition_idxs::( + partitioner, + partition_idxs, + sketches, + partition_nulls, + ), + } + } + } + + /// Generates indices for a chunked gather such that the ith key gathers + /// the next gathers_per_key[i] elements from the partition[i]th chunk. + pub fn gen_partitioned_gather_idxs( + &self, + partitioner: &HashPartitioner, + gathers_per_key: &[IdxSize], + gather_idxs: &mut Vec>, ) { match self { - Self::RowEncoded(s) => s.gen_partition_idxs(partitioner, partition_idxs), - Self::Single(s) => s.gen_partition_idxs(partitioner, partition_idxs), + Self::RowEncoded(s) => { + s.gen_partitioned_gather_idxs(partitioner, gathers_per_key, gather_idxs) + }, + Self::Single(s) => { + s.gen_partitioned_gather_idxs(partitioner, gathers_per_key, gather_idxs) + }, } } @@ -71,23 +146,74 @@ impl HashKeys { } } +#[derive(Clone)] pub struct RowEncodedKeys { - pub hashes: Vec, + pub hashes: UInt64Array, pub keys: BinaryArray, } impl RowEncodedKeys { - pub fn gen_partition_idxs( + pub fn gen_partition_idxs( &self, partitioner: &HashPartitioner, partition_idxs: &mut [Vec], + sketches: &mut [CardinalitySketch], + partition_nulls: bool, ) { - assert!(partitioner.num_partitions() == partition_idxs.len()); - for (i, h) in self.hashes.iter().enumerate() { - unsafe { - // SAFETY: we assured the number of partitions matches. - let p = partitioner.hash_to_partition(*h); - partition_idxs.get_unchecked_mut(p).push(i as IdxSize); + assert!(partition_idxs.len() == partitioner.num_partitions()); + assert!(!BUILD_SKETCHES || sketches.len() == partitioner.num_partitions()); + for p in partition_idxs.iter_mut() { + p.clear(); + } + + if let Some(validity) = self.keys.validity() { + for (i, (h, is_v)) in self.hashes.values_iter().zip(validity).enumerate() { + if is_v { + unsafe { + // SAFETY: we assured the number of partitions matches. + let p = partitioner.hash_to_partition(*h); + partition_idxs.get_unchecked_mut(p).push(i as IdxSize); + if BUILD_SKETCHES { + sketches.get_unchecked_mut(p).insert(*h); + } + } + } else if partition_nulls { + // Arbitrarily put nulls in partition 0. 
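+ // (Null keys can never compare equal to anything, so their partition only matters when unmatched rows must be emitted; any fixed choice keeps them together.)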
+ unsafe { + partition_idxs.get_unchecked_mut(0).push(i as IdxSize); + } + } + } + } else { + for (i, h) in self.hashes.values_iter().enumerate() { + unsafe { + // SAFETY: we assured the number of partitions matches. + let p = partitioner.hash_to_partition(*h); + partition_idxs.get_unchecked_mut(p).push(i as IdxSize); + if BUILD_SKETCHES { + sketches.get_unchecked_mut(p).insert(*h); + } + } + } + } + } + + pub fn gen_partitioned_gather_idxs( + &self, + partitioner: &HashPartitioner, + gathers_per_key: &[IdxSize], + gather_idxs: &mut Vec>, + ) { + assert!(gathers_per_key.len() == self.keys.len()); + unsafe { + let mut offsets = vec![0; partitioner.num_partitions()]; + for (hash, &n) in self.hashes.values_iter().zip(gathers_per_key) { + let p = partitioner.hash_to_partition(*hash); + let offset = *offsets.get_unchecked(p); + for i in offset..offset + n { + gather_idxs.push(ChunkId::store(p as IdxSize, i)); + } + *offsets.get_unchecked_mut(p) += n; } } } @@ -97,16 +223,20 @@ impl RowEncodedKeys { pub unsafe fn gather(&self, idxs: &[IdxSize]) -> Self { let mut hashes = Vec::with_capacity(idxs.len()); for idx in idxs { - hashes.push_unchecked(*self.hashes.get_unchecked(*idx as usize)); + hashes.push_unchecked(*self.hashes.values().get_unchecked(*idx as usize)); } let idx_arr = arrow::ffi::mmap::slice(idxs); let keys = take_unchecked(&self.keys, &idx_arr); - Self { hashes, keys } + Self { + hashes: PrimitiveArray::from_vec(hashes), + keys, + } } } /// Single keys. Does not pre-hash for boolean & integer types, only for strings /// and nested types. +#[derive(Clone)] pub struct SingleKeys { pub random_state: PlRandomState, pub hashes: Option>, @@ -114,12 +244,28 @@ pub struct SingleKeys { } impl SingleKeys { - pub fn gen_partition_idxs( + pub fn gen_partition_idxs( &self, partitioner: &HashPartitioner, partition_idxs: &mut [Vec], + _sketches: &mut [CardinalitySketch], + _partition_nulls: bool, ) { assert!(partitioner.num_partitions() == partition_idxs.len()); + for p in partition_idxs.iter_mut() { + p.clear(); + } + + todo!() + } + + #[allow(clippy::ptr_arg)] // Remove when implemented. 
+ pub fn gen_partitioned_gather_idxs( + &self, + _partitioner: &HashPartitioner, + _gathers_per_key: &[IdxSize], + _gather_idxs: &mut Vec>, + ) { todo!() } diff --git a/crates/polars-expr/src/lib.rs b/crates/polars-expr/src/lib.rs index 0a7e7b20bfe2..138068e3c268 100644 --- a/crates/polars-expr/src/lib.rs +++ b/crates/polars-expr/src/lib.rs @@ -1,3 +1,4 @@ +pub mod chunked_idx_table; mod expressions; pub mod groups; pub mod hash_keys; diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index 3f8c64dd1970..be992164a4d9 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -161,7 +161,7 @@ dtype-time = [ dtype-u16 = ["polars-plan/dtype-u16", "polars-pipe?/dtype-u16", "polars-expr/dtype-u16", "polars-mem-engine/dtype-u16"] dtype-u8 = ["polars-plan/dtype-u8", "polars-pipe?/dtype-u8", "polars-expr/dtype-u8", "polars-mem-engine/dtype-u8"] -object = ["polars-plan/object", "polars-mem-engine/object"] +object = ["polars-plan/object", "polars-mem-engine/object", "polars-stream?/object"] month_start = ["polars-plan/month_start"] month_end = ["polars-plan/month_end"] offset_by = ["polars-plan/offset_by"] diff --git a/crates/polars-ops/src/frame/join/args.rs b/crates/polars-ops/src/frame/join/args.rs index def36b76a677..5d55367b1f4a 100644 --- a/crates/polars-ops/src/frame/join/args.rs +++ b/crates/polars-ops/src/frame/join/args.rs @@ -163,6 +163,13 @@ impl Debug for JoinType { } impl JoinType { + pub fn is_equi(&self) -> bool { + matches!( + self, + JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full + ) + } + pub fn is_asof(&self) -> bool { #[cfg(feature = "asof_join")] { diff --git a/crates/polars-stream/Cargo.toml b/crates/polars-stream/Cargo.toml index f0b3b1c30e35..d90047b70074 100644 --- a/crates/polars-stream/Cargo.toml +++ b/crates/polars-stream/Cargo.toml @@ -46,6 +46,7 @@ parquet = ["polars-mem-engine/parquet", "polars-plan/parquet"] csv = ["polars-mem-engine/csv", "polars-plan/csv"] json = ["polars-mem-engine/json", "polars-plan/json"] cloud = ["polars-mem-engine/cloud", "polars-plan/cloud", "polars-io/cloud"] +object = ["polars-ops/object"] # We need to specify default features here to match workspace defaults. # Otherwise we get warnings with cargo check/clippy. diff --git a/crates/polars-stream/src/nodes/group_by.rs b/crates/polars-stream/src/nodes/group_by.rs index 5784c060384f..c08284d9f009 100644 --- a/crates/polars-stream/src/nodes/group_by.rs +++ b/crates/polars-stream/src/nodes/group_by.rs @@ -77,7 +77,7 @@ impl GroupBySinkState { key_columns.push(s.into_column()); } let keys = DataFrame::new_with_broadcast_len(key_columns, df.height())?; - let hash_keys = HashKeys::from_df(&keys, random_state.clone(), true); + let hash_keys = HashKeys::from_df(&keys, random_state.clone(), true, true); local.grouper.insert_keys(hash_keys, &mut group_idxs); // Update reductions. 
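Aside: the contract of gen_partitioned_gather_idxs above ("the ith key gathers the next gathers_per_key[i] elements from the partition[i]th chunk") can be modeled standalone. A simplified sketch assuming a plain modulo partitioner over precomputed u64 hashes (all names here are illustrative):

```rust
/// Returns (chunk, row-in-chunk) pairs; each key consumes the next
/// `gathers_per_key[i]` rows of its partition's running offset.
fn gen_partitioned_gather_idxs(
    hashes: &[u64],
    gathers_per_key: &[u64],
    num_partitions: usize,
) -> Vec<(usize, u64)> {
    assert_eq!(hashes.len(), gathers_per_key.len());
    let mut offsets = vec![0u64; num_partitions];
    let mut out = Vec::new();
    for (&hash, &n) in hashes.iter().zip(gathers_per_key) {
        let p = (hash as usize) % num_partitions; // stand-in for hash_to_partition
        for row in offsets[p]..offsets[p] + n {
            out.push((p, row));
        }
        offsets[p] += n;
    }
    out
}

fn main() {
    // Keys hashing to partitions 0, 1, 0 gather 2, 1, 1 rows respectively.
    let idxs = gen_partitioned_gather_idxs(&[0, 1, 2], &[2, 1, 1], 2);
    assert_eq!(idxs, vec![(0, 0), (0, 1), (1, 0), (0, 2)]);
}
```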
@@ -221,7 +221,7 @@ impl GroupBySinkState { Self::combine_locals_parallel(num_partitions, output_schema, self.local) }; - let mut source_node = InMemorySourceNode::new(Arc::new(df?)); + let mut source_node = InMemorySourceNode::new(Arc::new(df?), MorselSeq::default()); source_node.initialize(num_pipelines); Ok(source_node) } diff --git a/crates/polars-stream/src/nodes/in_memory_map.rs b/crates/polars-stream/src/nodes/in_memory_map.rs index 27af6be9aa87..bce07359ec10 100644 --- a/crates/polars-stream/src/nodes/in_memory_map.rs +++ b/crates/polars-stream/src/nodes/in_memory_map.rs @@ -56,7 +56,10 @@ impl ComputeNode for InMemoryMapNode { { if recv[0] == PortState::Done { let df = sink_node.get_output()?; - let mut source_node = InMemorySourceNode::new(Arc::new(map.call_udf(df.unwrap())?)); + let mut source_node = InMemorySourceNode::new( + Arc::new(map.call_udf(df.unwrap())?), + MorselSeq::default(), + ); source_node.initialize(*num_pipelines); *self = Self::Source(source_node); } diff --git a/crates/polars-stream/src/nodes/in_memory_source.rs b/crates/polars-stream/src/nodes/in_memory_source.rs index ab3231b1c759..5c9f1e63fbce 100644 --- a/crates/polars-stream/src/nodes/in_memory_source.rs +++ b/crates/polars-stream/src/nodes/in_memory_source.rs @@ -9,14 +9,16 @@ pub struct InMemorySourceNode { source: Option>, morsel_size: usize, seq: AtomicU64, + seq_offset: MorselSeq, } impl InMemorySourceNode { - pub fn new(source: Arc) -> Self { + pub fn new(source: Arc, seq_offset: MorselSeq) -> Self { InMemorySourceNode { source: Some(source), morsel_size: 0, seq: AtomicU64::new(0), + seq_offset, } } } @@ -87,7 +89,8 @@ impl ComputeNode for InMemorySourceNode { break; } - let mut morsel = Morsel::new(df, MorselSeq::new(seq), source_token.clone()); + let morsel_seq = MorselSeq::new(seq).offset_by(slf.seq_offset); + let mut morsel = Morsel::new(df, morsel_seq, source_token.clone()); morsel.set_consume_token(wait_group.token()); if send.send(morsel).await.is_err() { break; diff --git a/crates/polars-stream/src/nodes/joins/equi_join.rs b/crates/polars-stream/src/nodes/joins/equi_join.rs new file mode 100644 index 000000000000..fc2d69ffb742 --- /dev/null +++ b/crates/polars-stream/src/nodes/joins/equi_join.rs @@ -0,0 +1,880 @@ +use std::sync::Arc; + +use polars_core::prelude::*; +use polars_core::schema::{Schema, SchemaExt}; +use polars_core::series::IsSorted; +use polars_core::utils::accumulate_dataframes_vertical_unchecked; +use polars_expr::chunked_idx_table::{new_chunked_idx_table, ChunkedIdxTable}; +use polars_expr::hash_keys::HashKeys; +use polars_ops::frame::{JoinArgs, JoinType}; +use polars_ops::prelude::TakeChunked; +use polars_ops::series::coalesce_columns; +use polars_utils::cardinality_sketch::CardinalitySketch; +use polars_utils::hashing::HashPartitioner; +use polars_utils::itertools::Itertools; +use polars_utils::pl_str::PlSmallStr; +use polars_utils::{format_pl_smallstr, IdxSize}; +use rayon::prelude::*; + +use crate::async_primitives::connector::{Receiver, Sender}; +use crate::async_primitives::wait_group::WaitGroup; +use crate::expression::StreamExpr; +use crate::morsel::{get_ideal_morsel_size, SourceToken}; +use crate::nodes::compute_node_prelude::*; +use crate::nodes::in_memory_source::InMemorySourceNode; + +/// A payload selector contains for each column whether that column should be +/// included in the payload, and if yes with what name. 
+fn compute_payload_selector( + this: &Schema, + other: &Schema, + this_key_schema: &Schema, + is_left: bool, + args: &JoinArgs, +) -> PolarsResult>> { + let should_coalesce = args.should_coalesce(); + + this.iter_names() + .enumerate() + .map(|(i, c)| { + let selector = if should_coalesce && this_key_schema.contains(c) { + if is_left != (args.how == JoinType::Right) { + Some(c.clone()) + } else if args.how == JoinType::Full { + // We must keep the right-hand side keycols around for + // coalescing. + Some(format_pl_smallstr!("__POLARS_COALESCE_KEYCOL{i}")) + } else { + None + } + } else if !other.contains(c) || is_left { + Some(c.clone()) + } else { + let suffixed = format_pl_smallstr!("{}{}", c, args.suffix()); + if other.contains(&suffixed) { + polars_bail!(Duplicate: "column with name '{suffixed}' already exists\n\n\ + You may want to try:\n\ + - renaming the column prior to joining\n\ + - using the `suffix` parameter to specify a suffix different to the default one ('_right')") + } + Some(suffixed) + }; + Ok(selector) + }) + .collect() +} + +fn postprocess_join(df: DataFrame, params: &EquiJoinParams) -> DataFrame { + if params.args.how == JoinType::Full && params.args.should_coalesce() { + // TODO: don't do string-based column lookups for each dataframe, pre-compute coalesce indices. + let mut key_idx = 0; + df.get_columns() + .iter() + .filter_map(|c| { + if let Some((key_name, _)) = params.left_key_schema.get_at_index(key_idx) { + if c.name() == key_name { + let other = df + .column(&format_pl_smallstr!("__POLARS_COALESCE_KEYCOL{key_idx}")) + .unwrap(); + key_idx += 1; + return Some(coalesce_columns(&[c.clone(), other.clone()]).unwrap()); + } + } + + if c.name().starts_with("__POLARS_COALESCE_KEYCOL") { + return None; + } + + Some(c.clone()) + }) + .collect() + } else { + df + } +} + +fn select_schema(schema: &Schema, selector: &[Option]) -> Schema { + schema + .iter_fields() + .zip(selector) + .filter_map(|(f, name)| Some(f.with_name(name.clone()?))) + .collect() +} + +async fn select_keys( + df: &DataFrame, + key_selectors: &[StreamExpr], + params: &EquiJoinParams, + state: &ExecutionState, +) -> PolarsResult { + let mut key_columns = Vec::new(); + for (i, selector) in key_selectors.iter().enumerate() { + // We use key columns entirely by position, and allow duplicate names, + // so just assign arbitrary unique names. + let unique_name = format_pl_smallstr!("__POLARS_KEYCOL_{i}"); + let s = selector.evaluate(df, state).await?; + key_columns.push(s.into_column().with_name(unique_name)); + } + let keys = DataFrame::new_with_broadcast_len(key_columns, df.height())?; + Ok(HashKeys::from_df( + &keys, + params.random_state.clone(), + params.args.join_nulls, + true, + )) +} + +fn select_payload(df: DataFrame, selector: &[Option]) -> DataFrame { + // Maintain height of zero-width dataframes. 
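+ // (A zero-width frame still carries a meaningful height; rebuilding it from an empty column collection would reset that height to zero.)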
+ if df.width() == 0 { + return df; + } + + df.take_columns() + .into_iter() + .zip(selector) + .filter_map(|(c, name)| Some(c.with_name(name.clone()?))) + .collect() +} + +#[derive(Default)] +struct BuildPartition { + hash_keys: Vec, + frames: Vec<(MorselSeq, DataFrame)>, + sketch: Option, +} + +struct BuildState { + partitions_per_worker: Vec>, +} + +impl BuildState { + async fn partition_and_sink( + mut recv: Receiver, + partitions: &mut Vec, + partitioner: HashPartitioner, + params: &EquiJoinParams, + state: &ExecutionState, + ) -> PolarsResult<()> { + let track_unmatchable = params.emit_unmatched_build(); + let mut partition_idxs = vec![Vec::new(); partitioner.num_partitions()]; + partitions.resize_with(partitioner.num_partitions(), BuildPartition::default); + let mut sketches = vec![CardinalitySketch::default(); partitioner.num_partitions()]; + + let (key_selectors, payload_selector); + if params.left_is_build { + payload_selector = ¶ms.left_payload_select; + key_selectors = ¶ms.left_key_selectors; + } else { + payload_selector = ¶ms.right_payload_select; + key_selectors = ¶ms.right_key_selectors; + }; + + while let Ok(morsel) = recv.recv().await { + // Compute hashed keys and payload. We must rechunk the payload for + // later chunked gathers. + let hash_keys = select_keys(morsel.df(), key_selectors, params, state).await?; + let mut payload = select_payload(morsel.df().clone(), payload_selector); + payload.rechunk_mut(); + + unsafe { + hash_keys.gen_partition_idxs( + &partitioner, + &mut partition_idxs, + &mut sketches, + track_unmatchable, + ); + for (p, idxs_in_p) in partitions.iter_mut().zip(&partition_idxs) { + p.hash_keys.push(hash_keys.gather(idxs_in_p)); + p.frames.push(( + morsel.seq(), + payload.take_slice_unchecked_impl(idxs_in_p, false), + )); + } + } + } + + for (p, sketch) in sketches.into_iter().enumerate() { + partitions[p].sketch = Some(sketch); + } + + Ok(()) + } + + fn finalize(&mut self, params: &EquiJoinParams, table: &dyn ChunkedIdxTable) -> ProbeState { + let num_partitions = self.partitions_per_worker.len(); + let track_unmatchable = params.emit_unmatched_build(); + let table_per_partition: Vec<_> = (0..num_partitions) + .into_par_iter() + .with_max_len(1) + .map(|p| { + // Estimate sizes and cardinality. + let mut sketch = CardinalitySketch::new(); + let mut num_frames = 0; + for worker in &self.partitions_per_worker { + sketch.combine(worker[p].sketch.as_ref().unwrap()); + num_frames += worker[p].frames.len(); + } + + // Build table for this partition. + let mut combined_frames = Vec::with_capacity(num_frames); + let mut chunk_seq_ids = Vec::with_capacity(num_frames); + let mut table = table.new_empty(); + table.reserve(sketch.estimate() * 5 / 4); + if params.preserve_order_build { + let mut combined = Vec::with_capacity(num_frames); + for worker in &self.partitions_per_worker { + for (hash_keys, (seq, frame)) in + worker[p].hash_keys.iter().zip(&worker[p].frames) + { + combined.push((seq, hash_keys, frame)); + } + } + + combined.sort_unstable_by_key(|c| c.0); + for (seq, hash_keys, frame) in combined { + // Zero-sized chunks can get deleted, so skip entirely to avoid messing + // up the chunk counter. 
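+ // (ChunkIds produced during probing index into the combined frames by chunk number, so every insert_key_chunk call must pair with exactly one surviving frame.)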
+ if frame.height() == 0 { + continue; + } + + table.insert_key_chunk(hash_keys.clone(), track_unmatchable); + combined_frames.push(frame.clone()); + chunk_seq_ids.push(*seq); + } + } else { + for worker in &self.partitions_per_worker { + for (hash_keys, (_, frame)) in + worker[p].hash_keys.iter().zip(&worker[p].frames) + { + // Zero-sized chunks can get deleted, so skip entirely to avoid messing + // up the chunk counter. + if frame.height() == 0 { + continue; + } + + table.insert_key_chunk(hash_keys.clone(), track_unmatchable); + combined_frames.push(frame.clone()); + } + } + } + + let df = if combined_frames.is_empty() { + if params.left_is_build { + DataFrame::empty_with_schema(¶ms.left_payload_schema) + } else { + DataFrame::empty_with_schema(¶ms.right_payload_schema) + } + } else { + accumulate_dataframes_vertical_unchecked(combined_frames) + }; + ProbeTable { + table, + df, + chunk_seq_ids, + } + }) + .collect(); + + ProbeState { + table_per_partition, + max_seq_sent: MorselSeq::default(), + } + } +} + +struct ProbeTable { + // Important that df is not rechunked, the chunks it was inserted with + // into the table must be preserved for chunked gathers. + table: Box, + df: DataFrame, + chunk_seq_ids: Vec, +} + +struct ProbeState { + table_per_partition: Vec, + max_seq_sent: MorselSeq, +} + +impl ProbeState { + /// Returns the max morsel sequence sent. + async fn partition_and_probe( + mut recv: Receiver, + mut send: Sender, + partitions: &[ProbeTable], + partitioner: HashPartitioner, + params: &EquiJoinParams, + state: &ExecutionState, + ) -> PolarsResult { + // TODO: shuffle after partitioning and keep probe tables thread-local. + let mut partition_idxs = vec![Vec::new(); partitioner.num_partitions()]; + let mut table_match = Vec::new(); + let mut probe_match = Vec::new(); + let mut max_seq = MorselSeq::default(); + + let probe_limit = get_ideal_morsel_size() as IdxSize; + let mark_matches = params.emit_unmatched_build(); + let emit_unmatched = params.emit_unmatched_probe(); + + let (key_selectors, payload_selector); + if params.left_is_build { + payload_selector = ¶ms.right_payload_select; + key_selectors = ¶ms.right_key_selectors; + } else { + payload_selector = ¶ms.left_payload_select; + key_selectors = ¶ms.left_key_selectors; + }; + + while let Ok(morsel) = recv.recv().await { + // Compute hashed keys and payload. + let (df, seq, src_token, wait_token) = morsel.into_inner(); + let hash_keys = select_keys(&df, key_selectors, params, state).await?; + let payload = select_payload(df, payload_selector); + max_seq = seq; + + unsafe { + // Partition and probe the tables. + hash_keys.gen_partition_idxs( + &partitioner, + &mut partition_idxs, + &mut [], + emit_unmatched, + ); + if params.preserve_order_probe { + // TODO: non-sort based implementation, can directly scatter + // after finding matches for each partition. 
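+ // For now each output row is tagged with its probe-side index below, the per-partition results are concatenated, and a maintain-order sort on that index restores the input order.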
+ let mut out_per_partition = Vec::with_capacity(partitioner.num_partitions()); + let name = PlSmallStr::from_static("__POLARS_PROBE_PRESERVE_ORDER_IDX"); + for (p, idxs_in_p) in partitions.iter().zip(&partition_idxs) { + p.table.probe_subset( + &hash_keys, + idxs_in_p, + &mut table_match, + &mut probe_match, + mark_matches, + emit_unmatched, + IdxSize::MAX, + ); + + let mut build_df = if emit_unmatched { + p.df.take_opt_chunked_unchecked(&table_match) + } else { + p.df.take_chunked_unchecked(&table_match, IsSorted::Not) + }; + let mut probe_df = payload.take_slice_unchecked_impl(&probe_match, false); + + let mut out_df = if params.left_is_build { + build_df.hstack_mut_unchecked(probe_df.get_columns()); + build_df + } else { + probe_df.hstack_mut_unchecked(build_df.get_columns()); + probe_df + }; + + let idxs_ca = + IdxCa::from_vec(name.clone(), core::mem::take(&mut probe_match)); + out_df.with_column_unchecked(idxs_ca.into_column()); + out_per_partition.push(out_df); + } + + let sort_options = SortMultipleOptions { + descending: vec![false], + nulls_last: vec![false], + multithreaded: false, + maintain_order: true, + limit: None, + }; + let mut out_df = accumulate_dataframes_vertical_unchecked(out_per_partition); + out_df.sort_in_place([name.clone()], sort_options).unwrap(); + out_df.drop_in_place(&name).unwrap(); + out_df = postprocess_join(out_df, params); + + // TODO: break in smaller morsels. + let out_morsel = Morsel::new(out_df, seq, src_token.clone()); + if send.send(out_morsel).await.is_err() { + break; + } + } else { + for (p, idxs_in_p) in partitions.iter().zip(&partition_idxs) { + let mut offset = 0; + while offset < idxs_in_p.len() { + offset += p.table.probe_subset( + &hash_keys, + &idxs_in_p[offset..], + &mut table_match, + &mut probe_match, + mark_matches, + emit_unmatched, + probe_limit, + ) as usize; + + // Gather output and send. + let mut build_df = if emit_unmatched { + p.df.take_opt_chunked_unchecked(&table_match) + } else { + p.df.take_chunked_unchecked(&table_match, IsSorted::Not) + }; + let mut probe_df = + payload.take_slice_unchecked_impl(&probe_match, false); + + let out_df = if params.left_is_build { + build_df.hstack_mut_unchecked(probe_df.get_columns()); + build_df + } else { + probe_df.hstack_mut_unchecked(build_df.get_columns()); + probe_df + }; + let out_df = postprocess_join(out_df, params); + + let out_morsel = Morsel::new(out_df, seq, src_token.clone()); + if send.send(out_morsel).await.is_err() { + break; + } + } + } + } + } + + drop(wait_token); + } + + Ok(max_seq) + } + + fn ordered_unmatched( + &mut self, + partitioner: &HashPartitioner, + params: &EquiJoinParams, + ) -> DataFrame { + let mut out_per_partition = Vec::with_capacity(partitioner.num_partitions()); + let seq_name = PlSmallStr::from_static("__POLARS_PROBE_PRESERVE_ORDER_SEQ"); + let idx_name = PlSmallStr::from_static("__POLARS_PROBE_PRESERVE_ORDER_IDX"); + let mut unmarked_idxs = Vec::new(); + unsafe { + for p in self.table_per_partition.iter() { + p.table.unmarked_keys(&mut unmarked_idxs, 0, IdxSize::MAX); + + // Gather and create full-null counterpart. 
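+ // (Unmatched build rows are paired with an all-null frame for the probe-side payload so the output keeps the full join schema.)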
+ let mut build_df = p.df.take_chunked_unchecked(&unmarked_idxs, IsSorted::Not); + let len = build_df.height(); + let mut out_df = if params.left_is_build { + let probe_df = DataFrame::full_null(¶ms.right_payload_schema, len); + build_df.hstack_mut_unchecked(probe_df.get_columns()); + build_df + } else { + let mut probe_df = DataFrame::full_null(¶ms.left_payload_schema, len); + probe_df.hstack_mut_unchecked(build_df.get_columns()); + probe_df + }; + + // The indices are not ordered globally, but within each chunk they are, so sorting + // by chunk sequence id, breaking ties by inner chunk idx works. + let (chunk_seqs, idx_in_chunk) = unmarked_idxs + .iter() + .map(|chunk_id| { + let (chunk, idx_in_chunk) = chunk_id.extract(); + (p.chunk_seq_ids[chunk as usize].to_u64(), idx_in_chunk) + }) + .unzip(); + + let chunk_seqs_ca = UInt64Chunked::from_vec(seq_name.clone(), chunk_seqs); + let idxs_ca = IdxCa::from_vec(idx_name.clone(), idx_in_chunk); + out_df.with_column_unchecked(chunk_seqs_ca.into_column()); + out_df.with_column_unchecked(idxs_ca.into_column()); + out_per_partition.push(out_df); + } + + // Sort by chunk sequence id, then by inner chunk idx. + let sort_options = SortMultipleOptions { + descending: vec![false], + nulls_last: vec![false], + multithreaded: true, + maintain_order: false, + limit: None, + }; + let mut out_df = accumulate_dataframes_vertical_unchecked(out_per_partition); + out_df + .sort_in_place([seq_name.clone(), idx_name.clone()], sort_options) + .unwrap(); + out_df.drop_in_place(&seq_name).unwrap(); + out_df.drop_in_place(&idx_name).unwrap(); + out_df = postprocess_join(out_df, params); + out_df + } + } +} + +struct EmitUnmatchedState { + partitions: Vec, + active_partition_idx: usize, + offset_in_active_p: usize, + morsel_seq: MorselSeq, +} + +impl EmitUnmatchedState { + async fn emit_unmatched( + &mut self, + mut send: Sender, + params: &EquiJoinParams, + num_pipelines: usize, + ) -> PolarsResult<()> { + let total_len: usize = self + .partitions + .iter() + .map(|p| p.table.num_keys() as usize) + .sum(); + let ideal_morsel_count = (total_len / get_ideal_morsel_size()).max(1); + let morsel_count = ideal_morsel_count.next_multiple_of(num_pipelines); + let morsel_size = total_len.div_ceil(morsel_count).max(1); + + let wait_group = WaitGroup::default(); + let source_token = SourceToken::new(); + let mut unmarked_idxs = Vec::new(); + while let Some(p) = self.partitions.get(self.active_partition_idx) { + loop { + // Generate a chunk of unmarked key indices. + self.offset_in_active_p += p.table.unmarked_keys( + &mut unmarked_idxs, + self.offset_in_active_p as IdxSize, + morsel_size as IdxSize, + ) as usize; + if unmarked_idxs.is_empty() { + break; + } + + // Gather and create full-null counterpart. + let out_df = unsafe { + let mut build_df = p.df.take_chunked_unchecked(&unmarked_idxs, IsSorted::Not); + let len = build_df.height(); + if params.left_is_build { + let probe_df = DataFrame::full_null(¶ms.right_payload_schema, len); + build_df.hstack_mut_unchecked(probe_df.get_columns()); + build_df + } else { + let mut probe_df = DataFrame::full_null(¶ms.left_payload_schema, len); + probe_df.hstack_mut_unchecked(build_df.get_columns()); + probe_df + } + }; + let out_df = postprocess_join(out_df, params); + + // Send and wait until consume token is consumed. 
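+ // (Waiting on the consume token provides backpressure: the next morsel is only produced once downstream has taken this one.)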
+ let mut morsel = Morsel::new(out_df, self.morsel_seq, source_token.clone()); + self.morsel_seq = self.morsel_seq.successor(); + morsel.set_consume_token(wait_group.token()); + if send.send(morsel).await.is_err() { + return Ok(()); + } + + wait_group.wait().await; + if source_token.stop_requested() { + return Ok(()); + } + } + + self.active_partition_idx += 1; + self.offset_in_active_p = 0; + } + + Ok(()) + } +} + +enum EquiJoinState { + Build(BuildState), + Probe(ProbeState), + EmitUnmatchedBuild(EmitUnmatchedState), + EmitUnmatchedBuildInOrder(InMemorySourceNode), + Done, +} + +struct EquiJoinParams { + left_is_build: bool, + preserve_order_build: bool, + preserve_order_probe: bool, + left_key_schema: Arc, + left_key_selectors: Vec, + right_key_selectors: Vec, + left_payload_select: Vec>, + right_payload_select: Vec>, + left_payload_schema: Schema, + right_payload_schema: Schema, + args: JoinArgs, + random_state: PlRandomState, +} + +impl EquiJoinParams { + /// Should we emit unmatched rows from the build side? + fn emit_unmatched_build(&self) -> bool { + if self.left_is_build { + self.args.how == JoinType::Left || self.args.how == JoinType::Full + } else { + self.args.how == JoinType::Right || self.args.how == JoinType::Full + } + } + + /// Should we emit unmatched rows from the probe side? + fn emit_unmatched_probe(&self) -> bool { + if self.left_is_build { + self.args.how == JoinType::Right || self.args.how == JoinType::Full + } else { + self.args.how == JoinType::Left || self.args.how == JoinType::Full + } + } +} + +pub struct EquiJoinNode { + state: EquiJoinState, + params: EquiJoinParams, + num_pipelines: usize, + table: Box, +} + +impl EquiJoinNode { + pub fn new( + left_input_schema: Arc, + right_input_schema: Arc, + left_key_schema: Arc, + right_key_schema: Arc, + left_key_selectors: Vec, + right_key_selectors: Vec, + args: JoinArgs, + ) -> PolarsResult { + // TODO: expose as a parameter, and let you choose the primary order to preserve. + let preserve_order = std::env::var("POLARS_JOIN_IGNORE_ORDER").as_deref() != Ok("1"); + + // TODO: use cardinality estimation to determine this when not order-preserving. 
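+ // For now: build on the left unless this is a left join, so that a left join probes with (and streams through) the left side.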
+ let left_is_build = args.how != JoinType::Left; + + let left_payload_select = compute_payload_selector( + &left_input_schema, + &right_input_schema, + &left_key_schema, + true, + &args, + )?; + let right_payload_select = compute_payload_selector( + &right_input_schema, + &left_input_schema, + &right_key_schema, + false, + &args, + )?; + + let table = if left_is_build { + new_chunked_idx_table(left_key_schema.clone()) + } else { + new_chunked_idx_table(right_key_schema) + }; + + let left_payload_schema = select_schema(&left_input_schema, &left_payload_select); + let right_payload_schema = select_schema(&right_input_schema, &right_payload_select); + Ok(Self { + state: EquiJoinState::Build(BuildState { + partitions_per_worker: Vec::new(), + }), + num_pipelines: 0, + params: EquiJoinParams { + left_is_build, + preserve_order_build: preserve_order, + preserve_order_probe: preserve_order, + left_key_schema, + left_key_selectors, + right_key_selectors, + left_payload_select, + right_payload_select, + left_payload_schema, + right_payload_schema, + args, + random_state: PlRandomState::new(), + }, + table, + }) + } +} + +impl ComputeNode for EquiJoinNode { + fn name(&self) -> &str { + "equi_join" + } + + fn initialize(&mut self, num_pipelines: usize) { + self.num_pipelines = num_pipelines; + } + + fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> { + assert!(recv.len() == 2 && send.len() == 1); + + let build_idx = if self.params.left_is_build { 0 } else { 1 }; + let probe_idx = 1 - build_idx; + + // If the output doesn't want any more data, transition to being done. + if send[0] == PortState::Done { + self.state = EquiJoinState::Done; + } + + // If we are building and the build input is done, transition to probing. + if let EquiJoinState::Build(build_state) = &mut self.state { + if recv[build_idx] == PortState::Done { + self.state = EquiJoinState::Probe(build_state.finalize(&self.params, &*self.table)); + } + } + + // If we are probing and the probe input is done, emit unmatched if + // necessary, otherwise we're done. + if let EquiJoinState::Probe(probe_state) = &mut self.state { + if recv[probe_idx] == PortState::Done { + if self.params.emit_unmatched_build() { + if self.params.preserve_order_build { + let partitioner = HashPartitioner::new(self.num_pipelines, 0); + let unmatched = probe_state.ordered_unmatched(&partitioner, &self.params); + let mut src = InMemorySourceNode::new( + Arc::new(unmatched), + probe_state.max_seq_sent.successor(), + ); + src.initialize(self.num_pipelines); + self.state = EquiJoinState::EmitUnmatchedBuildInOrder(src); + } else { + self.state = EquiJoinState::EmitUnmatchedBuild(EmitUnmatchedState { + partitions: core::mem::take(&mut probe_state.table_per_partition), + active_partition_idx: 0, + offset_in_active_p: 0, + morsel_seq: probe_state.max_seq_sent.successor(), + }); + } + } else { + self.state = EquiJoinState::Done; + } + } + } + + // Finally, check if we are done emitting unmatched keys.
+ if let EquiJoinState::EmitUnmatchedBuild(emit_state) = &mut self.state { + if emit_state.active_partition_idx >= emit_state.partitions.len() { + self.state = EquiJoinState::Done; + } + } + + match &mut self.state { + EquiJoinState::Build(_) => { + send[0] = PortState::Blocked; + recv[build_idx] = PortState::Ready; + recv[probe_idx] = PortState::Blocked; + }, + EquiJoinState::Probe(_) => { + core::mem::swap(&mut send[0], &mut recv[probe_idx]); + recv[build_idx] = PortState::Done; + }, + EquiJoinState::EmitUnmatchedBuild(_) => { + send[0] = PortState::Ready; + recv[build_idx] = PortState::Done; + recv[probe_idx] = PortState::Done; + }, + EquiJoinState::EmitUnmatchedBuildInOrder(src_node) => { + recv[build_idx] = PortState::Done; + recv[probe_idx] = PortState::Done; + src_node.update_state(&mut [], &mut send[0..1])?; + if send[0] == PortState::Done { + self.state = EquiJoinState::Done; + } + }, + EquiJoinState::Done => { + send[0] = PortState::Done; + recv[0] = PortState::Done; + recv[1] = PortState::Done; + }, + } + Ok(()) + } + + fn is_memory_intensive_pipeline_blocker(&self) -> bool { + matches!(self.state, EquiJoinState::Build { .. }) + } + + fn spawn<'env, 's>( + &'env mut self, + scope: &'s TaskScope<'s, 'env>, + recv_ports: &mut [Option>], + send_ports: &mut [Option>], + state: &'s ExecutionState, + join_handles: &mut Vec>>, + ) { + assert!(recv_ports.len() == 2); + assert!(send_ports.len() == 1); + + let build_idx = if self.params.left_is_build { 0 } else { 1 }; + let probe_idx = 1 - build_idx; + + match &mut self.state { + EquiJoinState::Build(build_state) => { + assert!(send_ports[0].is_none()); + assert!(recv_ports[probe_idx].is_none()); + let receivers = recv_ports[build_idx].take().unwrap().parallel(); + + build_state + .partitions_per_worker + .resize_with(self.num_pipelines, Vec::new); + let partitioner = HashPartitioner::new(self.num_pipelines, 0); + for (worker_ps, recv) in build_state.partitions_per_worker.iter_mut().zip(receivers) + { + join_handles.push(scope.spawn_task( + TaskPriority::High, + BuildState::partition_and_sink( + recv, + worker_ps, + partitioner.clone(), + &self.params, + state, + ), + )); + } + }, + EquiJoinState::Probe(probe_state) => { + assert!(recv_ports[build_idx].is_none()); + let receivers = recv_ports[probe_idx].take().unwrap().parallel(); + let senders = send_ports[0].take().unwrap().parallel(); + + let partitioner = HashPartitioner::new(self.num_pipelines, 0); + let probe_tasks = receivers + .into_iter() + .zip(senders) + .map(|(recv, send)| { + scope.spawn_task( + TaskPriority::High, + ProbeState::partition_and_probe( + recv, + send, + &probe_state.table_per_partition, + partitioner.clone(), + &self.params, + state, + ), + ) + }) + .collect_vec(); + + let max_seq_sent = &mut probe_state.max_seq_sent; + join_handles.push(scope.spawn_task(TaskPriority::High, async move { + for probe_task in probe_tasks { + *max_seq_sent = (*max_seq_sent).max(probe_task.await?); + } + Ok(()) + })); + }, + EquiJoinState::EmitUnmatchedBuild(emit_state) => { + assert!(recv_ports[build_idx].is_none()); + assert!(recv_ports[probe_idx].is_none()); + let send = send_ports[0].take().unwrap().serial(); + join_handles.push(scope.spawn_task( + TaskPriority::Low, + emit_state.emit_unmatched(send, &self.params, self.num_pipelines), + )); + }, + EquiJoinState::EmitUnmatchedBuildInOrder(src_node) => { + src_node.spawn(scope, recv_ports, send_ports, state, join_handles); + }, + EquiJoinState::Done => unreachable!(), + } + } +} diff --git 
a/crates/polars-stream/src/nodes/joins/in_memory.rs b/crates/polars-stream/src/nodes/joins/in_memory.rs index a98c23a435b0..3fb981c25d20 100644 --- a/crates/polars-stream/src/nodes/joins/in_memory.rs +++ b/crates/polars-stream/src/nodes/joins/in_memory.rs @@ -60,8 +60,10 @@ impl ComputeNode for InMemoryJoinNode { if recv[0] == PortState::Done && recv[1] == PortState::Done { let left_df = left.get_output()?.unwrap(); let right_df = right.get_output()?.unwrap(); - let mut source_node = - InMemorySourceNode::new(Arc::new((self.joiner)(left_df, right_df)?)); + let mut source_node = InMemorySourceNode::new( + Arc::new((self.joiner)(left_df, right_df)?), + MorselSeq::default(), + ); source_node.initialize(self.num_pipelines); self.state = InMemoryJoinState::Source(source_node); } diff --git a/crates/polars-stream/src/nodes/joins/mod.rs b/crates/polars-stream/src/nodes/joins/mod.rs index fa2e12699f5e..f5304162d56a 100644 --- a/crates/polars-stream/src/nodes/joins/mod.rs +++ b/crates/polars-stream/src/nodes/joins/mod.rs @@ -1 +1,2 @@ +pub mod equi_join; pub mod in_memory; diff --git a/crates/polars-stream/src/physical_plan/fmt.rs b/crates/polars-stream/src/physical_plan/fmt.rs index 7ef74d5b0ad9..6da72a3bcb4c 100644 --- a/crates/polars-stream/src/physical_plan/fmt.rs +++ b/crates/polars-stream/src/physical_plan/fmt.rs @@ -214,8 +214,19 @@ fn visualize_plan_rec( left_on, right_on, args, + } + | PhysNodeKind::EquiJoin { + input_left, + input_right, + left_on, + right_on, + args, } => { - let mut label = "in-memory-join".to_string(); + let mut label = if matches!(phys_sm[node_key].kind, PhysNodeKind::EquiJoin { .. }) { + "equi-join".to_string() + } else { + "in-memory-join".to_string() + }; write!(label, r"\nleft_on:\n{}", fmt_exprs(left_on, expr_arena)).unwrap(); write!(label, r"\nright_on:\n{}", fmt_exprs(right_on, expr_arena)).unwrap(); write!( diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index db999481bf97..e7aa33b520fc 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -519,12 +519,50 @@ pub fn lower_ir( let args = options.args.clone(); let phys_left = lower_ir!(input_left)?; let phys_right = lower_ir!(input_right)?; - PhysNodeKind::InMemoryJoin { - input_left: phys_left, - input_right: phys_right, - left_on, - right_on, - args, + if args.how.is_equi() && !args.validation.needs_checks() { + // When lowering the expressions for the keys we need to ensure we keep around the + // payload columns, otherwise the input nodes can get replaced by input-independent + // nodes since the lowering code does not see we access any non-literal expressions. + // So we add dummy expressions before lowering and remove them afterwards. 
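+ // (The dummies are plain column references, so they keep the inputs alive through lowering; the drain calls below strip them off again.)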
+ let mut aug_left_on = left_on.clone(); + for name in phys_sm[phys_left].output_schema.iter_names() { + let col_expr = expr_arena.add(AExpr::Column(name.clone())); + aug_left_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))); + } + let mut aug_right_on = right_on.clone(); + for name in phys_sm[phys_right].output_schema.iter_names() { + let col_expr = expr_arena.add(AExpr::Column(name.clone())); + aug_right_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))); + } + let (trans_input_left, mut trans_left_on) = + lower_exprs(phys_left, &aug_left_on, expr_arena, phys_sm, expr_cache)?; + let (trans_input_right, mut trans_right_on) = + lower_exprs(phys_right, &aug_right_on, expr_arena, phys_sm, expr_cache)?; + trans_left_on.drain(left_on.len()..); + trans_right_on.drain(right_on.len()..); + + let mut node = phys_sm.insert(PhysNode::new( + output_schema, + PhysNodeKind::EquiJoin { + input_left: trans_input_left, + input_right: trans_input_right, + left_on: trans_left_on, + right_on: trans_right_on, + args: args.clone(), + }, + )); + if let Some((offset, len)) = args.slice { + node = build_slice_node(node, offset, len, phys_sm); + } + return Ok(node); + } else { + PhysNodeKind::InMemoryJoin { + input_left: phys_left, + input_right: phys_right, + left_on, + right_on, + args, + } } }, IR::Distinct { .. } => todo!(), diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs index 707c2a53dec2..c5f679c7fe56 100644 --- a/crates/polars-stream/src/physical_plan/mod.rs +++ b/crates/polars-stream/src/physical_plan/mod.rs @@ -154,6 +154,14 @@ pub enum PhysNodeKind { aggs: Vec, }, + EquiJoin { + input_left: PhysNodeKey, + input_right: PhysNodeKey, + left_on: Vec, + right_on: Vec, + args: JoinArgs, + }, + /// Generic fallback for (as-of-yet) unsupported streaming joins. /// Fully sinks all data to in-memory data frames and uses the in-memory /// engine to perform the join. @@ -217,6 +225,11 @@ fn insert_multiplexers( input_left, input_right, .. + } + | PhysNodeKind::EquiJoin { + input_left, + input_right, + .. 
        } => {
             let input_right = *input_right;
             insert_multiplexers(*input_left, phys_sm, referenced);
diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs
index 66bb1f4180a8..4bf706c00aa2 100644
--- a/crates/polars-stream/src/physical_plan/to_graph.rs
+++ b/crates/polars-stream/src/physical_plan/to_graph.rs
@@ -22,6 +22,7 @@ use slotmap::{SecondaryMap, SlotMap};
 use super::{PhysNode, PhysNodeKey, PhysNodeKind};
 use crate::expression::StreamExpr;
 use crate::graph::{Graph, GraphNodeKey};
+use crate::morsel::MorselSeq;
 use crate::nodes;
 use crate::physical_plan::lower_expr::compute_output_schema;
 use crate::utils::late_materialized_df::LateMaterializedDataFrame;
@@ -92,7 +93,7 @@ fn to_graph_rec<'a>(
     let node = &ctx.phys_sm[phys_node_key];
     let graph_key = match &node.kind {
         InMemorySource { df } => ctx.graph.add_node(
-            nodes::in_memory_source::InMemorySourceNode::new(df.clone()),
+            nodes::in_memory_source::InMemorySourceNode::new(df.clone(), MorselSeq::default()),
             [],
         ),
 
@@ -503,6 +504,49 @@ fn to_graph_rec<'a>(
                 [left_input_key, right_input_key],
             )
         },
+
+        EquiJoin {
+            input_left,
+            input_right,
+            left_on,
+            right_on,
+            args,
+        } => {
+            let args = args.clone();
+            let left_input_key = to_graph_rec(*input_left, ctx)?;
+            let right_input_key = to_graph_rec(*input_right, ctx)?;
+            let left_input_schema = ctx.phys_sm[*input_left].output_schema.clone();
+            let right_input_schema = ctx.phys_sm[*input_right].output_schema.clone();
+
+            let left_key_schema =
+                compute_output_schema(&left_input_schema, left_on, ctx.expr_arena)?
+                    .materialize_unknown_dtypes()?;
+            let right_key_schema =
+                compute_output_schema(&right_input_schema, right_on, ctx.expr_arena)?
+                    .materialize_unknown_dtypes()?;
+
+            let left_key_selectors = left_on
+                .iter()
+                .map(|e| create_stream_expr(e, ctx, &left_input_schema))
+                .try_collect_vec()?;
+            let right_key_selectors = right_on
+                .iter()
+                .map(|e| create_stream_expr(e, ctx, &right_input_schema))
+                .try_collect_vec()?;
+
+            ctx.graph.add_node(
+                nodes::joins::equi_join::EquiJoinNode::new(
+                    left_input_schema,
+                    right_input_schema,
+                    Arc::new(left_key_schema),
+                    Arc::new(right_key_schema),
+                    left_key_selectors,
+                    right_key_selectors,
+                    args,
+                )?,
+                [left_input_key, right_input_key],
+            )
+        },
     };
 
     ctx.phys_to_graph.insert(phys_node_key, graph_key);
diff --git a/crates/polars-utils/src/idx_map/bytes_idx_map.rs b/crates/polars-utils/src/idx_map/bytes_idx_map.rs
index 0df1bbcd9c0c..90e7a5c33ee4 100644
--- a/crates/polars-utils/src/idx_map/bytes_idx_map.rs
+++ b/crates/polars-utils/src/idx_map/bytes_idx_map.rs
@@ -63,6 +63,14 @@ impl<V> BytesIndexMap<V> {
         self.table.is_empty()
     }
 
+    pub fn get(&self, hash: u64, key: &[u8]) -> Option<&V> {
+        let idx = self.table.find(hash.wrapping_mul(self.seed), |i| unsafe {
+            let t = self.tuples.get_unchecked(*i as usize);
+            hash == t.0.key_hash && key == t.0.get(&self.key_data)
+        })?;
+        unsafe { Some(&self.tuples.get_unchecked(*idx as usize).1) }
+    }
+
     pub fn entry<'k>(&mut self, hash: u64, key: &'k [u8]) -> Entry<'_, 'k, V> {
         let entry = self.table.entry(
             hash.wrapping_mul(self.seed),
@@ -91,10 +99,18 @@ impl<V> BytesIndexMap<V> {
         }
     }
 
+    /// Gets the hash, key and value at the given index by insertion order.
+    #[inline(always)]
+    pub fn get_index(&self, idx: IdxSize) -> Option<(u64, &[u8], &V)> {
+        let t = self.tuples.get(idx as usize)?;
+        Some((t.0.key_hash, unsafe { t.0.get(&self.key_data) }, &t.1))
+    }
+
     /// Gets the hash, key and value at the given index by insertion order.
    ///
    /// # Safety
    /// The index must be less than len().
+    #[inline(always)]
     pub unsafe fn get_index_unchecked(&self, idx: IdxSize) -> (u64, &[u8], &V) {
         let t = self.tuples.get_unchecked(idx as usize);
         (t.0.key_hash, t.0.get(&self.key_data), &t.1)
@@ -106,6 +122,11 @@ impl<V> BytesIndexMap<V> {
             .iter()
             .map(|t| unsafe { (t.0.key_hash, t.0.get(&self.key_data)) })
     }
+
+    /// Iterates over the values in insertion order.
+    pub fn iter_values(&self) -> impl Iterator<Item = &V> {
+        self.tuples.iter().map(|t| &t.1)
+    }
 }
 
 pub enum Entry<'a, 'k, V> {
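
The new `BytesIndexMap::get` above follows the usual hash-table probe discipline: the table is addressed by `hash.wrapping_mul(self.seed)`, and each candidate entry is rejected first on its cached 64-bit hash and only then on the actual key bytes, so full byte comparisons stay rare. A simplified, self-contained model of that comparison order (a linear scan standing in for the real table probe; all names hypothetical):

```rust
/// One stored entry: the full 64-bit hash is cached next to the key so a
/// probe can reject non-matches with a cheap integer compare before
/// touching the variable-length key bytes.
struct Entry {
    key_hash: u64,
    key: Vec<u8>,
    value: u32,
}

fn get<'a>(entries: &'a [Entry], hash: u64, key: &[u8]) -> Option<&'a u32> {
    entries
        .iter()
        // Hash first, bytes second: `&&` short-circuits on a hash mismatch.
        .find(|e| e.key_hash == hash && e.key.as_slice() == key)
        .map(|e| &e.value)
}

fn main() {
    let entries = vec![
        Entry { key_hash: 0xDEAD, key: b"foo".to_vec(), value: 0 },
        Entry { key_hash: 0xBEEF, key: b"bar".to_vec(), value: 1 },
    ];
    assert_eq!(get(&entries, 0xBEEF, b"bar"), Some(&1));
    assert_eq!(get(&entries, 0xBEEF, b"baz"), None);
}
```

Seeding the table address while comparing against the raw hash also means two maps with different seeds place the same keys in different buckets, which helps when several such tables are probed with the same pre-computed hashes.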
diff --git a/crates/polars-utils/src/index.rs b/crates/polars-utils/src/index.rs
index fb43a1958cd6..2ecad3cbf92b 100644
--- a/crates/polars-utils/src/index.rs
+++ b/crates/polars-utils/src/index.rs
@@ -226,9 +226,8 @@ impl<const CHUNK_BITS: u64> ChunkId<CHUNK_BITS> {
     #[allow(clippy::unnecessary_cast)]
     pub fn extract(self) -> (IdxSize, IdxSize) {
         let row = (self.swizzled >> CHUNK_BITS) as IdxSize;
-
-        let mask: IdxSize = IdxSize::MAX << CHUNK_BITS;
-        let chunk = (self.swizzled as IdxSize) & !mask;
+        let mask = (1u64 << CHUNK_BITS) - 1;
+        let chunk = (self.swizzled & mask) as IdxSize;
         (chunk, row)
     }
 
@@ -236,6 +235,14 @@
     pub fn inner_mut(&mut self) -> &mut u64 {
         &mut self.swizzled
     }
+
+    pub fn from_inner(inner: u64) -> Self {
+        Self { swizzled: inner }
+    }
+
+    pub fn into_inner(self) -> u64 {
+        self.swizzled
+    }
 }
 
 #[cfg(test)]
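
The `extract` fix above replaces a mask built in `IdxSize` (which truncated `swizzled` to `IdxSize` before masking, and whose `IdxSize::MAX << CHUNK_BITS` shift overflows once `CHUNK_BITS` reaches the bit width of `IdxSize`) with a mask computed in `u64`, cast only after masking. A small sketch of the packing scheme under the same layout assumptions (row in the high bits, chunk in the low `CHUNK_BITS` bits; hypothetical free functions rather than the real `ChunkId` methods, and 24 is only an illustrative choice here):

```rust
/// Number of low bits reserved for the chunk index in this sketch.
const CHUNK_BITS: u64 = 24;

/// Pack a (chunk, row) pair: row in the high bits, chunk in the low bits.
fn pack(chunk: u64, row: u64) -> u64 {
    debug_assert!(chunk < (1u64 << CHUNK_BITS));
    (row << CHUNK_BITS) | chunk
}

/// Inverse of `pack`, mirroring the fixed `extract`: the mask is computed
/// in u64, so it is correct for any CHUNK_BITS up to 63; any narrowing
/// cast would only happen after masking.
fn extract(swizzled: u64) -> (u64, u64) {
    let row = swizzled >> CHUNK_BITS;
    let mask = (1u64 << CHUNK_BITS) - 1; // low CHUNK_BITS bits set
    let chunk = swizzled & mask;
    (chunk, row)
}

fn main() {
    let id = pack(3, 417);
    assert_eq!(extract(id), (3, 417));
}
```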
diff --git a/py-polars/tests/unit/operations/test_is_sorted.py b/py-polars/tests/unit/operations/test_is_sorted.py
index 093dae47bfbf..b14062a988ab 100644
--- a/py-polars/tests/unit/operations/test_is_sorted.py
+++ b/py-polars/tests/unit/operations/test_is_sorted.py
@@ -331,6 +331,7 @@ def test_sorted_flag() -> None:
     pl.Series([{"a": 1}], dtype=pl.Object).set_sorted(descending=True)
 
 
+@pytest.mark.may_fail_auto_streaming
 def test_sorted_flag_after_joins() -> None:
     np.random.seed(1)
     dfa = pl.DataFrame(
diff --git a/py-polars/tests/unit/operations/test_join.py b/py-polars/tests/unit/operations/test_join.py
index 27cba18e18d5..84433dcac3f2 100644
--- a/py-polars/tests/unit/operations/test_join.py
+++ b/py-polars/tests/unit/operations/test_join.py
@@ -537,10 +537,10 @@ def test_update() -> None:
     assert result.collect().to_series().to_list() == [1, 2, 3]
 
     result = a.update(b, how="inner", left_on="a", right_on="c")
-    assert result.collect().to_series().to_list() == [1, 3]
+    assert sorted(result.collect().to_series().to_list()) == [1, 3]
 
     result = a.update(b.rename({"b": "a"}), how="full", on="a")
-    assert result.collect().to_series().sort().to_list() == [1, 2, 3, 4, 5]
+    assert sorted(result.collect().to_series().sort().to_list()) == [1, 2, 3, 4, 5]
 
     # check behavior of include_nulls=True
     df = pl.DataFrame(
@@ -562,7 +562,7 @@
             "B": [-99, 500, None, 700, -66],
         }
     )
-    assert_frame_equal(out, expected)
+    assert_frame_equal(out, expected, check_row_order=False)
 
     # edge-case #11684
     x = pl.DataFrame({"a": [0, 1]})
@@ -604,6 +604,7 @@ def test_join_concat_projection_pd_case_7071() -> None:
     assert_frame_equal(result, expected)
 
 
+@pytest.mark.may_fail_auto_streaming  # legacy full join is not order-preserving whereas new-streaming is
 def test_join_sorted_fast_paths_null() -> None:
     df1 = pl.DataFrame({"x": [0, 1, 0]}).sort("x")
     df2 = pl.DataFrame({"x": [0, None], "y": [0, 1]})
diff --git a/py-polars/tests/unit/test_string_cache.py b/py-polars/tests/unit/test_string_cache.py
index def1f15db07a..084aebb065f1 100644
--- a/py-polars/tests/unit/test_string_cache.py
+++ b/py-polars/tests/unit/test_string_cache.py
@@ -107,6 +107,7 @@ def my_function() -> None:
         sc(True)
 
 
+@pytest.mark.may_fail_auto_streaming
 def test_string_cache_join() -> None:
     df1 = pl.DataFrame({"a": ["foo", "bar", "ham"], "b": [1, 2, 3]})
     df2 = pl.DataFrame({"a": ["eggs", "spam", "foo"], "c": [2, 2, 3]})