diff --git a/arrow-flight/src/sql/metadata/mod.rs b/arrow-flight/src/sql/metadata/mod.rs index 71551f1849ae..1e9881ffa70e 100644 --- a/arrow-flight/src/sql/metadata/mod.rs +++ b/arrow-flight/src/sql/metadata/mod.rs @@ -53,7 +53,7 @@ fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array { .iter() .map(|a| SortField::new(a.data_type().clone())) .collect(); - let mut converter = RowConverter::new(fields).unwrap(); + let converter = RowConverter::new(fields).unwrap(); let rows = converter.convert_columns(arrays).unwrap(); let mut sort: Vec<_> = rows.iter().enumerate().collect(); sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); diff --git a/arrow-row/src/dictionary.rs b/arrow-row/src/dictionary.rs deleted file mode 100644 index 740b2e205c04..000000000000 --- a/arrow-row/src/dictionary.rs +++ /dev/null @@ -1,296 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::fixed::{FixedLengthEncoding, FromSlice}; -use crate::interner::{Interned, OrderPreservingInterner}; -use crate::{null_sentinel, Row, Rows}; -use arrow_array::builder::*; -use arrow_array::cast::*; -use arrow_array::types::*; -use arrow_array::*; -use arrow_buffer::{ArrowNativeType, MutableBuffer, ToByteSlice}; -use arrow_data::{ArrayData, ArrayDataBuilder}; -use arrow_schema::{ArrowError, DataType, SortOptions}; -use std::collections::hash_map::Entry; -use std::collections::HashMap; - -/// Computes the dictionary mapping for the given dictionary values -pub fn compute_dictionary_mapping( - interner: &mut OrderPreservingInterner, - values: &ArrayRef, -) -> Vec> { - downcast_primitive_array! { - values => interner - .intern(values.iter().map(|x| x.map(|x| x.encode()))), - DataType::Binary => { - let iter = as_generic_binary_array::(values).iter(); - interner.intern(iter) - } - DataType::LargeBinary => { - let iter = as_generic_binary_array::(values).iter(); - interner.intern(iter) - } - DataType::Utf8 => { - let iter = values.as_string::().iter().map(|x| x.map(|x| x.as_bytes())); - interner.intern(iter) - } - DataType::LargeUtf8 => { - let iter = values.as_string::().iter().map(|x| x.map(|x| x.as_bytes())); - interner.intern(iter) - } - _ => unreachable!(), - } -} - -/// Encode dictionary values not preserving the dictionary encoding -pub fn encode_dictionary_values( - data: &mut [u8], - offsets: &mut [usize], - column: &DictionaryArray, - values: &Rows, - null: &Row<'_>, -) { - for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) { - let row = match k { - Some(k) => values.row(k.as_usize()).data, - None => null.data, - }; - let end_offset = *offset + row.len(); - data[*offset..end_offset].copy_from_slice(row); - *offset = end_offset; - } -} - -/// Dictionary types are encoded as -/// -/// - single `0_u8` if null -/// - the bytes of the corresponding normalized key including the null terminator -pub fn encode_dictionary( - data: &mut [u8], - offsets: &mut [usize], - column: &DictionaryArray, - normalized_keys: &[Option<&[u8]>], - opts: SortOptions, -) { - for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) { - match k.and_then(|k| normalized_keys[k.as_usize()]) { - Some(normalized_key) => { - let end_offset = *offset + 1 + normalized_key.len(); - data[*offset] = 1; - data[*offset + 1..end_offset].copy_from_slice(normalized_key); - // Negate if descending - if opts.descending { - data[*offset..end_offset].iter_mut().for_each(|v| *v = !*v) - } - *offset = end_offset; - } - None => { - data[*offset] = null_sentinel(opts); - *offset += 1; - } - } - } -} - -macro_rules! decode_primitive_helper { - ($t:ty, $values: ident, $data_type:ident) => { - decode_primitive::<$t>(&$values, $data_type.clone()) - }; -} - -/// Decodes a string array from `rows` with the provided `options` -/// -/// # Safety -/// -/// `interner` must contain valid data for the provided `value_type` -pub unsafe fn decode_dictionary( - interner: &OrderPreservingInterner, - value_type: &DataType, - options: SortOptions, - rows: &mut [&[u8]], -) -> Result, ArrowError> { - let len = rows.len(); - let mut dictionary: HashMap = HashMap::with_capacity(len); - - let null_sentinel = null_sentinel(options); - - // If descending, the null terminator will have been negated - let null_terminator = match options.descending { - true => 0xFF, - false => 0_u8, - }; - - let mut null_builder = BooleanBufferBuilder::new(len); - let mut keys = BufferBuilder::::new(len); - let mut values = Vec::with_capacity(len); - let mut null_count = 0; - let mut key_scratch = Vec::new(); - - for row in rows { - if row[0] == null_sentinel { - null_builder.append(false); - null_count += 1; - *row = &row[1..]; - keys.append(K::Native::default()); - continue; - } - - let key_offset = row - .iter() - .skip(1) - .position(|x| *x == null_terminator) - .unwrap(); - - // Extract the normalized key including the null terminator - let key = &row[1..key_offset + 2]; - *row = &row[key_offset + 2..]; - - let interned = match options.descending { - true => { - // If options.descending the normalized key will have been - // negated we must first reverse this - key_scratch.clear(); - key_scratch.extend_from_slice(key); - key_scratch.iter_mut().for_each(|o| *o = !*o); - interner.lookup(&key_scratch).unwrap() - } - false => interner.lookup(key).unwrap(), - }; - - let k = match dictionary.entry(interned) { - Entry::Vacant(v) => { - let k = values.len(); - values.push(interner.value(interned)); - let key = K::Native::from_usize(k) - .ok_or(ArrowError::DictionaryKeyOverflowError)?; - *v.insert(key) - } - Entry::Occupied(o) => *o.get(), - }; - - keys.append(k); - null_builder.append(true); - } - - let child = downcast_primitive! { - value_type => (decode_primitive_helper, values, value_type), - DataType::Null => NullArray::new(values.len()).into_data(), - DataType::Boolean => decode_bool(&values), - DataType::Utf8 => decode_string::(&values), - DataType::LargeUtf8 => decode_string::(&values), - DataType::Binary => decode_binary::(&values), - DataType::LargeBinary => decode_binary::(&values), - _ => unreachable!(), - }; - - let data_type = - DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(value_type.clone())); - - let builder = ArrayDataBuilder::new(data_type) - .len(len) - .null_bit_buffer(Some(null_builder.into())) - .null_count(null_count) - .add_buffer(keys.finish()) - .add_child_data(child); - - Ok(DictionaryArray::from(builder.build_unchecked())) -} - -/// Decodes a binary array from dictionary values -/// -/// # Safety -/// -/// Values must be valid UTF-8 -fn decode_binary(values: &[&[u8]]) -> ArrayData { - let capacity = values.iter().map(|x| x.len()).sum(); - let mut builder = GenericBinaryBuilder::::with_capacity(values.len(), capacity); - for v in values { - builder.append_value(v) - } - builder.finish().into_data() -} - -/// Decodes a string array from dictionary values -/// -/// # Safety -/// -/// Values must be valid UTF-8 -unsafe fn decode_string(values: &[&[u8]]) -> ArrayData { - let d = match O::IS_LARGE { - true => DataType::LargeUtf8, - false => DataType::Utf8, - }; - - decode_binary::(values) - .into_builder() - .data_type(d) - .build_unchecked() -} - -/// Decodes a boolean array from dictionary values -fn decode_bool(values: &[&[u8]]) -> ArrayData { - let mut builder = BooleanBufferBuilder::new(values.len()); - for value in values { - builder.append(bool::decode([value[0]])) - } - - let builder = ArrayDataBuilder::new(DataType::Boolean) - .len(values.len()) - .add_buffer(builder.into()); - - // SAFETY: Buffers correct length - unsafe { builder.build_unchecked() } -} - -/// Decodes a fixed length type array from dictionary values -/// -/// # Safety -/// -/// `data_type` must be appropriate native type for `T` -unsafe fn decode_fixed( - values: &[&[u8]], - data_type: DataType, -) -> ArrayData { - let mut buffer = MutableBuffer::new(std::mem::size_of::() * values.len()); - - for value in values { - let value = T::Encoded::from_slice(value, false); - buffer.push(T::decode(value)) - } - - let builder = ArrayDataBuilder::new(data_type) - .len(values.len()) - .add_buffer(buffer.into()); - - // SAFETY: Buffers correct length - builder.build_unchecked() -} - -/// Decodes a `PrimitiveArray` from dictionary values -fn decode_primitive( - values: &[&[u8]], - data_type: DataType, -) -> ArrayData -where - T::Native: FixedLengthEncoding, -{ - assert!(PrimitiveArray::::is_compatible(&data_type)); - - // SAFETY: - // Validated data type above - unsafe { decode_fixed::(values, data_type) } -} diff --git a/arrow-row/src/interner.rs b/arrow-row/src/interner.rs deleted file mode 100644 index 9f5f0b3d33d2..000000000000 --- a/arrow-row/src/interner.rs +++ /dev/null @@ -1,523 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use hashbrown::hash_map::RawEntryMut; -use hashbrown::HashMap; -use std::num::NonZeroU32; -use std::ops::Index; - -/// An interned value -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct Interned(NonZeroU32); // We use NonZeroU32 so that `Option` is 32 bits - -/// A byte array interner that generates normalized keys that are sorted with respect -/// to the interned values, e.g. `inter(a) < intern(b) => a < b` -#[derive(Debug, Default)] -pub struct OrderPreservingInterner { - /// Provides a lookup from [`Interned`] to the normalized key - keys: InternBuffer, - /// Provides a lookup from [`Interned`] to the normalized value - values: InternBuffer, - /// Key allocation data structure - bucket: Box, - - // A hash table used to perform faster re-keying, and detect duplicates - hasher: ahash::RandomState, - lookup: HashMap, -} - -impl OrderPreservingInterner { - /// Interns an iterator of values returning a list of [`Interned`] which can be - /// used with [`Self::normalized_key`] to retrieve the normalized keys with a - /// lifetime not tied to the mutable borrow passed to this method - pub fn intern(&mut self, input: I) -> Vec> - where - I: IntoIterator>, - V: AsRef<[u8]>, - { - let iter = input.into_iter(); - let capacity = iter.size_hint().0; - let mut out = Vec::with_capacity(capacity); - - // (index in output, hash value, value) - let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity); - let mut to_intern_len = 0; - - for (idx, item) in iter.enumerate() { - let value: V = match item { - Some(value) => value, - None => { - out.push(None); - continue; - } - }; - - let v = value.as_ref(); - let hash = self.hasher.hash_one(v); - let entry = self - .lookup - .raw_entry_mut() - .from_hash(hash, |a| &self.values[*a] == v); - - match entry { - RawEntryMut::Occupied(o) => out.push(Some(*o.key())), - RawEntryMut::Vacant(_) => { - // Push placeholder - out.push(None); - to_intern_len += v.len(); - to_intern.push((idx, hash, value)); - } - }; - } - - to_intern.sort_unstable_by(|(_, _, a), (_, _, b)| a.as_ref().cmp(b.as_ref())); - - self.keys.offsets.reserve(to_intern.len()); - self.keys.values.reserve(to_intern.len()); // Approximation - self.values.offsets.reserve(to_intern.len()); - self.values.values.reserve(to_intern_len); - - for (idx, hash, value) in to_intern { - let val = value.as_ref(); - - let entry = self - .lookup - .raw_entry_mut() - .from_hash(hash, |a| &self.values[*a] == val); - - match entry { - RawEntryMut::Occupied(o) => { - out[idx] = Some(*o.key()); - } - RawEntryMut::Vacant(v) => { - let val = value.as_ref(); - self.bucket - .insert(&mut self.values, val, &mut self.keys.values); - self.keys.values.push(0); - let interned = self.keys.append(); - - let hasher = &mut self.hasher; - let values = &self.values; - v.insert_with_hasher(hash, interned, (), |key| { - hasher.hash_one(&values[*key]) - }); - out[idx] = Some(interned); - } - } - } - - out - } - - /// Returns a null-terminated byte array that can be compared against other normalized_key - /// returned by this instance, to establish ordering of the interned values - pub fn normalized_key(&self, key: Interned) -> &[u8] { - &self.keys[key] - } - - /// Converts a normalized key returned by [`Self::normalized_key`] to [`Interned`] - /// returning `None` if it cannot be found - pub fn lookup(&self, normalized_key: &[u8]) -> Option { - let len = normalized_key.len(); - if len <= 1 { - return None; - } - - let mut bucket = self.bucket.as_ref(); - if len > 2 { - for v in normalized_key.iter().take(len - 2) { - if *v == 255 { - bucket = bucket.next.as_ref()?; - } else { - let bucket_idx = v.checked_sub(1)?; - bucket = bucket.slots.get(bucket_idx as usize)?.child.as_ref()?; - } - } - } - - let slot_idx = normalized_key[len - 2].checked_sub(2)?; - Some(bucket.slots.get(slot_idx as usize)?.value) - } - - /// Returns the interned value for a given [`Interned`] - pub fn value(&self, key: Interned) -> &[u8] { - self.values.index(key) - } - - /// Returns the size of this instance in bytes including self - pub fn size(&self) -> usize { - std::mem::size_of::() - + self.keys.buffer_size() - + self.values.buffer_size() - + self.bucket.size() - + self.lookup.capacity() * std::mem::size_of::() - } -} - -/// A buffer of `[u8]` indexed by `[Interned]` -#[derive(Debug)] -struct InternBuffer { - /// Raw values - values: Vec, - /// The ith value is `&values[offsets[i]..offsets[i+1]]` - offsets: Vec, -} - -impl Default for InternBuffer { - fn default() -> Self { - Self { - values: Default::default(), - offsets: vec![0], - } - } -} - -impl InternBuffer { - /// Insert `data` returning the corresponding [`Interned`] - fn insert(&mut self, data: &[u8]) -> Interned { - self.values.extend_from_slice(data); - self.append() - } - - /// Appends the next value based on data written to `self.values` - /// returning the corresponding [`Interned`] - fn append(&mut self) -> Interned { - let idx: u32 = self.offsets.len().try_into().unwrap(); - let key = Interned(NonZeroU32::new(idx).unwrap()); - self.offsets.push(self.values.len()); - key - } - - /// Returns the byte size of the associated buffers - fn buffer_size(&self) -> usize { - self.values.capacity() + self.offsets.capacity() * std::mem::size_of::() - } -} - -impl Index for InternBuffer { - type Output = [u8]; - - fn index(&self, key: Interned) -> &Self::Output { - let index = key.0.get() as usize; - let end = self.offsets[index]; - let start = self.offsets[index - 1]; - // SAFETY: - // self.values is never reduced in size and values appended - // to self.offsets are always less than self.values at the time - unsafe { self.values.get_unchecked(start..end) } - } -} - -/// A slot corresponds to a single byte-value in the generated normalized key -/// -/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing -/// the next byte in the generated normalized key -#[derive(Debug, Clone)] -struct Slot { - value: Interned, - /// Child values less than `self.value` if any - child: Option>, -} - -/// Bucket is the root of the data-structure used to allocate normalized keys -/// -/// In particular it needs to generate keys that -/// -/// * Contain no `0` bytes other than the null terminator -/// * Compare lexicographically in the same manner as the encoded `data` -/// -/// The data structure consists of 254 slots, each of which can store a value. -/// Additionally each slot may contain a child bucket, containing values smaller -/// than the value within the slot. -/// -/// Each bucket also may contain a child bucket, containing values greater than -/// all values in the current bucket -/// -/// # Allocation Strategy -/// -/// The contiguous slice of slots containing values is searched to find the insertion -/// point for the new value, according to the sort order. -/// -/// If the insertion position exceeds 254, the number of slots, the value is inserted -/// into the child bucket of the current bucket. -/// -/// If the insertion position already contains a value, the value is inserted into the -/// child bucket of that slot. -/// -/// If the slot is not occupied, the value is inserted into that slot. -/// -/// The final key consists of the slot indexes visited incremented by 1, -/// with the final value incremented by 2, followed by a null terminator. -/// -/// Consider the case of the integers `[8, 6, 5, 7]` inserted in that order -/// -/// ```ignore -/// 8: &[2, 0] -/// 6: &[1, 2, 0] -/// 5: &[1, 1, 2, 0] -/// 7: &[1, 3, 0] -/// ``` -/// -/// Note: this allocation strategy is optimised for interning values in sorted order -/// -#[derive(Debug, Clone)] -struct Bucket { - slots: Vec, - /// Bucket containing values larger than all of `slots` - next: Option>, -} - -impl Default for Bucket { - fn default() -> Self { - Self { - slots: Vec::with_capacity(254), - next: None, - } - } -} - -impl Bucket { - /// Insert `data` into this bucket or one of its children, appending the - /// normalized key to `out` as it is constructed - /// - /// # Panics - /// - /// Panics if the value already exists - fn insert(&mut self, values_buf: &mut InternBuffer, data: &[u8], out: &mut Vec) { - let slots_len = self.slots.len() as u8; - // We optimise the case of inserting a value directly after those already inserted - // as [`OrderPreservingInterner::intern`] sorts values prior to interning them - match self.slots.last() { - Some(slot) => { - if &values_buf[slot.value] < data { - if slots_len == 254 { - out.push(255); - self.next - .get_or_insert_with(Default::default) - .insert(values_buf, data, out) - } else { - out.push(slots_len + 2); - let value = values_buf.insert(data); - self.slots.push(Slot { value, child: None }); - } - } else { - // Find insertion point - match self - .slots - .binary_search_by(|slot| values_buf[slot.value].cmp(data)) - { - Ok(_) => unreachable!("value already exists"), - Err(idx) => { - out.push(idx as u8 + 1); - self.slots[idx] - .child - .get_or_insert_with(Default::default) - .insert(values_buf, data, out) - } - } - } - } - None => { - out.push(2); - let value = values_buf.insert(data); - self.slots.push(Slot { value, child: None }) - } - } - } - - /// Returns the size of this instance in bytes - fn size(&self) -> usize { - std::mem::size_of::() - + self.slots.capacity() * std::mem::size_of::() - // and account for the size of any embedded buckets in the slots - + self.slot_child_bucket_size() - + self.next.as_ref().map(|x| x.size()).unwrap_or_default() - } - - /// returns the total size of any recursively allocated `Bucket`s - /// in self.slots. This does not include the size of the child Slot itself - fn slot_child_bucket_size(&self) -> usize { - self.slots - .iter() - .map(|slot| slot.child.as_ref().map(|x| x.size()).unwrap_or_default()) - .sum() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use rand::prelude::*; - - // Clippy isn't smart enough to understand dropping mutability - #[allow(clippy::needless_collect)] - fn test_intern_values(values: &[u64]) { - let mut interner = OrderPreservingInterner::default(); - - // Intern a single value at a time to check ordering - let interned: Vec<_> = values - .iter() - .flat_map(|v| interner.intern([Some(&v.to_be_bytes())])) - .map(Option::unwrap) - .collect(); - - for (value, interned) in values.iter().zip(&interned) { - assert_eq!(interner.value(*interned), &value.to_be_bytes()); - } - - let normalized_keys: Vec<_> = interned - .iter() - .map(|x| interner.normalized_key(*x)) - .collect(); - - for (interned, normalized) in interned.iter().zip(&normalized_keys) { - assert_eq!(*interned, interner.lookup(normalized).unwrap()); - } - - for (i, a) in normalized_keys.iter().enumerate() { - for (j, b) in normalized_keys.iter().enumerate() { - let interned_cmp = a.cmp(b); - let values_cmp = values[i].cmp(&values[j]); - assert_eq!( - interned_cmp, values_cmp, - "({:?} vs {:?}) vs ({} vs {})", - a, b, values[i], values[j] - ) - } - } - } - - #[test] - #[cfg_attr(miri, ignore)] - fn test_interner() { - test_intern_values(&[8, 6, 5, 7]); - - let mut values: Vec<_> = (0_u64..2000).collect(); - test_intern_values(&values); - - let mut rng = thread_rng(); - values.shuffle(&mut rng); - test_intern_values(&values); - } - - #[test] - fn test_intern_duplicates() { - // Unsorted with duplicates - let values = [0_u8, 1, 8, 4, 1, 0]; - let mut interner = OrderPreservingInterner::default(); - - let interned = interner.intern(values.iter().map(std::slice::from_ref).map(Some)); - let interned: Vec<_> = interned.into_iter().map(Option::unwrap).collect(); - - assert_eq!(interned[0], interned[5]); - assert_eq!(interned[1], interned[4]); - assert!( - interner.normalized_key(interned[0]) < interner.normalized_key(interned[1]) - ); - assert!( - interner.normalized_key(interned[1]) < interner.normalized_key(interned[2]) - ); - assert!( - interner.normalized_key(interned[1]) < interner.normalized_key(interned[3]) - ); - assert!( - interner.normalized_key(interned[3]) < interner.normalized_key(interned[2]) - ); - } - - #[test] - fn test_intern_sizes() { - let mut interner = OrderPreservingInterner::default(); - - // Intern a 1K values each 8 bytes large - let num_items = 1000; - let mut values: Vec = (0..num_items).collect(); - values.reverse(); - - // intern these values 1 at a time (otherwise the interner - // will sort them first); - for v in values { - interner.intern([Some(v.to_be_bytes())]); - } - - let reported = interner.size(); - - // Figure out the expected size (this is a second - // implementation of size()) as a double check - let min_expected = BucketWalker::new() - .visit_bucket(interner.bucket.as_ref()) - .memory_estimate() - // hash table size - + interner.lookup.capacity() * std::mem::size_of::() - // key/value storage - + interner.keys.buffer_size() - + interner.values.buffer_size(); - - assert!( - reported > min_expected, - "reported size {reported} not larger than min expected size: {min_expected}" - ) - } - - // Walks over the buckets / slots counting counting them all - struct BucketWalker { - num_buckets: usize, - num_slots: usize, - } - - impl BucketWalker { - fn new() -> Self { - Self { - num_buckets: 0, - num_slots: 0, - } - } - - // recursively visit the bucket and any slots/buckets contained - fn visit_bucket(mut self, bucket: &Bucket) -> Self { - self.num_buckets += 1; - let acc = bucket - .slots - .iter() - .fold(self, |acc, slot| acc.visit_slot(slot)); - - if let Some(next) = bucket.next.as_ref() { - acc.visit_bucket(next.as_ref()) - } else { - acc - } - } - - // recursively visit slot and any slots/buckets - fn visit_slot(mut self, slot: &Slot) -> Self { - self.num_slots += 1; - if let Some(child) = slot.child.as_ref() { - self.visit_bucket(child.as_ref()) - } else { - self - } - } - - // estimate how much memory is used just for Buckets / Slots - // (an underestimate of the total memory used for the - // interner as it doesn't contain any actual values) - fn memory_estimate(self) -> usize { - self.num_buckets * std::mem::size_of::() - + self.num_slots * std::mem::size_of::() - } - } -} diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index bd1dd7256240..58dc42a4cacb 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -61,7 +61,7 @@ //! let arrays = vec![a1, a2]; //! //! // Convert arrays to rows -//! let mut converter = RowConverter::new(vec![ +//! let converter = RowConverter::new(vec![ //! SortField::new(DataType::Int32), //! SortField::new(DataType::Utf8), //! ]).unwrap(); @@ -109,7 +109,7 @@ //! .iter() //! .map(|a| SortField::new(a.data_type().clone())) //! .collect(); -//! let mut converter = RowConverter::new(fields).unwrap(); +//! let converter = RowConverter::new(fields).unwrap(); //! let rows = converter.convert_columns(arrays).unwrap(); //! let mut sort: Vec<_> = rows.iter().enumerate().collect(); //! sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); @@ -130,22 +130,16 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use arrow_array::cast::*; +use arrow_array::types::ArrowDictionaryKeyType; use arrow_array::*; use arrow_buffer::ArrowNativeType; use arrow_data::ArrayDataBuilder; use arrow_schema::*; -use crate::dictionary::{ - compute_dictionary_mapping, decode_dictionary, encode_dictionary, - encode_dictionary_values, -}; use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive}; -use crate::interner::OrderPreservingInterner; use crate::variable::{decode_binary, decode_string}; -mod dictionary; mod fixed; -mod interner; mod list; mod variable; @@ -271,53 +265,7 @@ mod variable; /// /// ## Dictionary Encoding /// -/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted, and -/// potentially distinct dictionaries. One simple mechanism to avoid this would be to reverse -/// the dictionary encoding, and encode the array values directly, however, this would lose -/// the benefits of dictionary encoding to reduce memory and CPU consumption. -/// -/// As such the [`RowConverter`] creates an order-preserving mapping -/// for each dictionary encoded column, which allows new dictionary -/// values to be added whilst preserving the sort order. -/// -/// A null dictionary value is encoded as `0_u8`. -/// -/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array -/// key determined by the order-preserving dictionary encoding -/// -/// ```text -/// ┌──────────┐ ┌─────┐ -/// │ "Bar" │ ───────────────▶│ 01 │ -/// └──────────┘ └─────┘ -/// ┌──────────┐ ┌─────┬─────┐ -/// │"Fabulous"│ ───────────────▶│ 01 │ 02 │ -/// └──────────┘ └─────┴─────┘ -/// ┌──────────┐ ┌─────┐ -/// │ "Soup" │ ───────────────▶│ 05 │ -/// └──────────┘ └─────┘ -/// ┌──────────┐ ┌─────┐ -/// │ "ZZ" │ ───────────────▶│ 07 │ -/// └──────────┘ └─────┘ -/// -/// Example Order Preserving Mapping -/// ``` -/// Using the map above, the corresponding row format will be -/// -/// ```text -/// ┌─────┬─────┬─────┬─────┐ -/// "Fabulous" │ 01 │ 01 │ 02 │ 00 │ -/// └─────┴─────┴─────┴─────┘ -/// -/// ┌─────┬─────┬─────┐ -/// "ZZ" │ 01 │ 07 │ 00 │ -/// └─────┴─────┴─────┘ -/// -/// ┌─────┐ -/// NULL │ 00 │ -/// └─────┘ -/// -/// Input Row Format -/// ``` +/// Dictionaries are hydrated to their underlying values /// /// ## Struct Encoding /// @@ -426,15 +374,9 @@ pub struct RowConverter { enum Codec { /// No additional codec state is necessary Stateless, - /// The interner used to encode dictionary values - /// - /// Used when preserving the dictionary encoding - Dictionary(OrderPreservingInterner), /// A row converter for the dictionary values /// and the encoding of a row containing only nulls - /// - /// Used when not preserving dictionary encoding - DictionaryValues(RowConverter, OwnedRow), + Dictionary(RowConverter, OwnedRow), /// A row converter for the child fields /// and the encoding of a row containing only nulls Struct(RowConverter, OwnedRow), @@ -445,25 +387,22 @@ enum Codec { impl Codec { fn new(sort_field: &SortField) -> Result { match &sort_field.data_type { - DataType::Dictionary(_, values) => match sort_field.preserve_dictionaries { - true => Ok(Self::Dictionary(Default::default())), - false => { - let sort_field = SortField::new_with_options( - values.as_ref().clone(), - sort_field.options, - ); + DataType::Dictionary(_, values) => { + let sort_field = SortField::new_with_options( + values.as_ref().clone(), + sort_field.options, + ); - let mut converter = RowConverter::new(vec![sort_field])?; - let null_array = new_null_array(values.as_ref(), 1); - let nulls = converter.convert_columns(&[null_array])?; + let converter = RowConverter::new(vec![sort_field])?; + let null_array = new_null_array(values.as_ref(), 1); + let nulls = converter.convert_columns(&[null_array])?; - let owned = OwnedRow { - data: nulls.buffer.into(), - config: nulls.config, - }; - Ok(Self::DictionaryValues(converter, owned)) - } - }, + let owned = OwnedRow { + data: nulls.buffer.into(), + config: nulls.config, + }; + Ok(Self::Dictionary(converter, owned)) + } d if !d.is_nested() => Ok(Self::Stateless), DataType::List(f) | DataType::LargeList(f) => { // The encoded contents will be inverted if descending is set to true @@ -490,7 +429,7 @@ impl Codec { }) .collect(); - let mut converter = RowConverter::new(sort_fields)?; + let converter = RowConverter::new(sort_fields)?; let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect(); @@ -509,32 +448,13 @@ impl Codec { } } - fn encoder(&mut self, array: &dyn Array) -> Result, ArrowError> { + fn encoder(&self, array: &dyn Array) -> Result, ArrowError> { match self { Codec::Stateless => Ok(Encoder::Stateless), - Codec::Dictionary(interner) => { - let values = downcast_dictionary_array! { - array => array.values(), - _ => unreachable!() - }; - - let mapping = compute_dictionary_mapping(interner, values) - .into_iter() - .map(|maybe_interned| { - maybe_interned.map(|interned| interner.normalized_key(interned)) - }) - .collect(); - - Ok(Encoder::Dictionary(mapping)) - } - Codec::DictionaryValues(converter, nulls) => { - let values = downcast_dictionary_array! { - array => array.values(), - _ => unreachable!() - }; - - let rows = converter.convert_columns(&[values.clone()])?; - Ok(Encoder::DictionaryValues(rows, nulls.row())) + Codec::Dictionary(converter, nulls) => { + let values = array.as_any_dictionary().values().clone(); + let rows = converter.convert_columns(&[values])?; + Ok(Encoder::Dictionary(rows, nulls.row())) } Codec::Struct(converter, null) => { let v = as_struct_array(array); @@ -556,10 +476,7 @@ impl Codec { fn size(&self) -> usize { match self { Codec::Stateless => 0, - Codec::Dictionary(interner) => interner.size(), - Codec::DictionaryValues(converter, nulls) => { - converter.size() + nulls.data.len() - } + Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(), Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(), Codec::List(converter) => converter.size(), } @@ -570,10 +487,8 @@ impl Codec { enum Encoder<'a> { /// No additional encoder state is necessary Stateless, - /// The mapping from dictionary keys to normalized keys - Dictionary(Vec>), /// The encoding of the child array and the encoding of a null row - DictionaryValues(Rows, Row<'a>), + Dictionary(Rows, Row<'a>), /// The row encoding of the child arrays and the encoding of a null row /// /// It is necessary to encode to a temporary [`Rows`] to avoid serializing @@ -591,8 +506,6 @@ pub struct SortField { options: SortOptions, /// Data type data_type: DataType, - /// Preserve dictionaries - preserve_dictionaries: bool, } impl SortField { @@ -603,30 +516,7 @@ impl SortField { /// Create a new column with the given data type and [`SortOptions`] pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self { - Self { - options, - data_type, - preserve_dictionaries: true, - } - } - - /// By default dictionaries are preserved as described on [`RowConverter`] - /// - /// However, this process requires maintaining and incrementally updating - /// an order-preserving mapping of dictionary values. This is relatively expensive - /// computationally but reduces the size of the encoded rows, minimising memory - /// usage and potentially yielding faster comparisons. - /// - /// Some applications may wish to instead trade-off space efficiency, for improved - /// encoding performance, by instead encoding dictionary values directly - /// - /// When `preserve_dictionaries` is true, fields will instead be encoded as their - /// underlying value, reversing any dictionary encoding - pub fn preserve_dictionaries(self, preserve_dictionaries: bool) -> Self { - Self { - preserve_dictionaries, - ..self - } + Self { options, data_type } } /// Return size of this instance in bytes. @@ -679,7 +569,7 @@ impl RowConverter { /// # Panics /// /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`] - pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result { + pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result { let num_rows = columns.first().map(|x| x.len()).unwrap_or(0); let mut rows = self.empty_rows(num_rows, 0); self.append(&mut rows, columns)?; @@ -704,7 +594,7 @@ impl RowConverter { /// # use arrow_row::{Row, RowConverter, SortField}; /// # use arrow_schema::DataType; /// # - /// let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); /// let a1 = StringArray::from(vec!["hello", "world"]); /// let a2 = StringArray::from(vec!["a", "a", "hello"]); /// @@ -717,7 +607,7 @@ impl RowConverter { /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]); /// ``` pub fn append( - &mut self, + &self, rows: &mut Rows, columns: &[ArrayRef], ) -> Result<(), ArrowError> { @@ -736,7 +626,7 @@ impl RowConverter { let encoders = columns .iter() - .zip(&mut self.codecs) + .zip(&self.codecs) .zip(self.fields.iter()) .map(|((column, codec), field)| { if !column.data_type().equals_datatype(&field.data_type) { @@ -844,7 +734,7 @@ impl RowConverter { /// # use arrow_row::{Row, RowConverter, SortField}; /// # use arrow_schema::DataType; /// # - /// let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]); /// /// // Convert to row format and deduplicate @@ -1234,20 +1124,7 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec { _ => unreachable!(), } } - Encoder::Dictionary(dict) => { - downcast_dictionary_array! { - array => { - for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { - match v.and_then(|v| dict[v as usize]) { - Some(k) => *length += k.len() + 1, - None => *length += 1, - } - } - } - _ => unreachable!(), - } - } - Encoder::DictionaryValues(values, null) => { + Encoder::Dictionary(values, null) => { downcast_dictionary_array! { array => { for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { @@ -1323,13 +1200,7 @@ fn encode_column( _ => unreachable!(), } } - Encoder::Dictionary(dict) => { - downcast_dictionary_array! { - column => encode_dictionary(data, offsets, column, dict, opts), - _ => unreachable!() - } - } - Encoder::DictionaryValues(values, nulls) => { + Encoder::Dictionary(values, nulls) => { downcast_dictionary_array! { column => encode_dictionary_values(data, offsets, column, values, nulls), _ => unreachable!() @@ -1365,18 +1236,31 @@ fn encode_column( } } +/// Encode dictionary values not preserving the dictionary encoding +pub fn encode_dictionary_values( + data: &mut [u8], + offsets: &mut [usize], + column: &DictionaryArray, + values: &Rows, + null: &Row<'_>, +) { + for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) { + let row = match k { + Some(k) => values.row(k.as_usize()).data, + None => null.data, + }; + let end_offset = *offset + row.len(); + data[*offset..end_offset].copy_from_slice(row); + *offset = end_offset; + } +} + macro_rules! decode_primitive_helper { ($t:ty, $rows:ident, $data_type:ident, $options:ident) => { Arc::new(decode_primitive::<$t>($rows, $data_type, $options)) }; } -macro_rules! decode_dictionary_helper { - ($t:ty, $interner:ident, $v:ident, $options:ident, $rows:ident) => { - Arc::new(decode_dictionary::<$t>($interner, $v, $options, $rows)?) - }; -} - /// Decodes a the provided `field` from `rows` /// /// # Safety @@ -1402,20 +1286,11 @@ unsafe fn decode_column( DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)), DataType::Utf8 => Arc::new(decode_string::(rows, options, validate_utf8)), DataType::LargeUtf8 => Arc::new(decode_string::(rows, options, validate_utf8)), + DataType::Dictionary(_, _) => todo!(), _ => unreachable!() } } - Codec::Dictionary(interner) => { - let (k, v) = match &field.data_type { - DataType::Dictionary(k, v) => (k.as_ref(), v.as_ref()), - _ => unreachable!(), - }; - downcast_integer! { - k => (decode_dictionary_helper, interner, v, options, rows), - _ => unreachable!() - } - } - Codec::DictionaryValues(converter, _) => { + Codec::Dictionary(converter, _) => { let cols = converter.convert_raw(rows, validate_utf8)?; cols.into_iter().next().unwrap() } @@ -1487,7 +1362,7 @@ mod tests { ])) as ArrayRef, ]; - let mut converter = RowConverter::new(vec![ + let converter = RowConverter::new(vec![ SortField::new(DataType::Int16), SortField::new(DataType::Float32), ]) @@ -1529,9 +1404,10 @@ mod tests { #[test] fn test_decimal128() { - let mut converter = RowConverter::new(vec![SortField::new( - DataType::Decimal128(DECIMAL128_MAX_PRECISION, 7), - )]) + let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128( + DECIMAL128_MAX_PRECISION, + 7, + ))]) .unwrap(); let col = Arc::new( Decimal128Array::from_iter([ @@ -1558,9 +1434,10 @@ mod tests { #[test] fn test_decimal256() { - let mut converter = RowConverter::new(vec![SortField::new( - DataType::Decimal256(DECIMAL256_MAX_PRECISION, 7), - )]) + let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256( + DECIMAL256_MAX_PRECISION, + 7, + ))]) .unwrap(); let col = Arc::new( Decimal256Array::from_iter([ @@ -1589,7 +1466,7 @@ mod tests { #[test] fn test_bool() { - let mut converter = + let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap(); let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) @@ -1603,7 +1480,7 @@ mod tests { let cols = converter.convert_rows(&rows).unwrap(); assert_eq!(&cols[0], &col); - let mut converter = RowConverter::new(vec![SortField::new_with_options( + let converter = RowConverter::new(vec![SortField::new_with_options( DataType::Boolean, SortOptions { descending: true, @@ -1626,7 +1503,7 @@ mod tests { .with_timezone("+01:00".to_string()); let d = a.data_type().clone(); - let mut converter = + let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap(); let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap(); let back = converter.convert_rows(&rows).unwrap(); @@ -1644,29 +1521,23 @@ mod tests { let dict = a.finish(); let values = TimestampNanosecondArray::from(dict.values().to_data()); let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00"))); - let d = DataType::Dictionary( - Box::new(DataType::Int32), - Box::new(DataType::Timestamp( - TimeUnit::Nanosecond, - Some("+02:00".into()), - )), - ); + let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into())); + let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone())); assert_eq!(dict_with_tz.data_type(), &d); - let mut converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap(); + let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap(); let rows = converter .convert_columns(&[Arc::new(dict_with_tz) as _]) .unwrap(); let back = converter.convert_rows(&rows).unwrap(); assert_eq!(back.len(), 1); - assert_eq!(back[0].data_type(), &d); + assert_eq!(back[0].data_type(), &v); } #[test] fn test_null_encoding() { let col = Arc::new(NullArray::new(10)); - let mut converter = - RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap(); + let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap(); let rows = converter.convert_columns(&[col]).unwrap(); assert_eq!(rows.num_rows(), 10); assert_eq!(rows.row(1).data.len(), 0); @@ -1682,8 +1553,7 @@ mod tests { Some(""), ])) as ArrayRef; - let mut converter = - RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); assert!(rows.row(1) < rows.row(0)); @@ -1714,7 +1584,7 @@ mod tests { Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]), ])) as ArrayRef; - let mut converter = + let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap(); let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); @@ -1734,7 +1604,7 @@ mod tests { let cols = converter.convert_rows(&rows).unwrap(); assert_eq!(&cols[0], &col); - let mut converter = RowConverter::new(vec![SortField::new_with_options( + let converter = RowConverter::new(vec![SortField::new_with_options( DataType::Binary, SortOptions { descending: true, @@ -1762,9 +1632,9 @@ mod tests { } /// If `exact` is false performs a logical comparison between a and dictionary-encoded b - fn dictionary_eq(exact: bool, a: &dyn Array, b: &dyn Array) { + fn dictionary_eq(a: &dyn Array, b: &dyn Array) { match b.data_type() { - DataType::Dictionary(_, v) if !exact => { + DataType::Dictionary(_, v) => { assert_eq!(a.data_type(), v.as_ref()); let b = arrow_cast::cast(b, v).unwrap(); assert_eq!(a, b.as_ref()) @@ -1775,11 +1645,6 @@ mod tests { #[test] fn test_string_dictionary() { - test_string_dictionary_impl(false); - test_string_dictionary_impl(true); - } - - fn test_string_dictionary_impl(preserve: bool) { let a = Arc::new(DictionaryArray::::from_iter([ Some("foo"), Some("hello"), @@ -1791,8 +1656,8 @@ mod tests { Some("hello"), ])) as ArrayRef; - let field = SortField::new(a.data_type().clone()).preserve_dictionaries(preserve); - let mut converter = RowConverter::new(vec![field]).unwrap(); + let field = SortField::new(a.data_type().clone()); + let converter = RowConverter::new(vec![field]).unwrap(); let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); assert!(rows_a.row(3) < rows_a.row(5)); @@ -1805,7 +1670,7 @@ mod tests { assert_eq!(rows_a.row(1), rows_a.row(7)); let cols = converter.convert_rows(&rows_a).unwrap(); - dictionary_eq(preserve, &cols[0], &a); + dictionary_eq(&cols[0], &a); let b = Arc::new(DictionaryArray::::from_iter([ Some("hello"), @@ -1819,16 +1684,15 @@ mod tests { assert!(rows_b.row(2) < rows_a.row(0)); let cols = converter.convert_rows(&rows_b).unwrap(); - dictionary_eq(preserve, &cols[0], &b); + dictionary_eq(&cols[0], &b); - let mut converter = RowConverter::new(vec![SortField::new_with_options( + let converter = RowConverter::new(vec![SortField::new_with_options( a.data_type().clone(), SortOptions { descending: true, nulls_first: false, }, - ) - .preserve_dictionaries(preserve)]) + )]) .unwrap(); let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); @@ -1838,16 +1702,15 @@ mod tests { assert!(rows_c.row(3) > rows_c.row(0)); let cols = converter.convert_rows(&rows_c).unwrap(); - dictionary_eq(preserve, &cols[0], &a); + dictionary_eq(&cols[0], &a); - let mut converter = RowConverter::new(vec![SortField::new_with_options( + let converter = RowConverter::new(vec![SortField::new_with_options( a.data_type().clone(), SortOptions { descending: true, nulls_first: true, }, - ) - .preserve_dictionaries(preserve)]) + )]) .unwrap(); let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); @@ -1857,7 +1720,7 @@ mod tests { assert!(rows_c.row(3) < rows_c.row(0)); let cols = converter.convert_rows(&rows_c).unwrap(); - dictionary_eq(preserve, &cols[0], &a); + dictionary_eq(&cols[0], &a); } #[test] @@ -1870,7 +1733,7 @@ mod tests { let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef; let sort_fields = vec![SortField::new(s1.data_type().clone())]; - let mut converter = RowConverter::new(sort_fields).unwrap(); + let converter = RowConverter::new(sort_fields).unwrap(); let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap(); for (a, b) in r1.iter().zip(r1.iter().skip(1)) { @@ -1919,16 +1782,14 @@ mod tests { let data_type = a.data_type().clone(); let columns = [Arc::new(a) as ArrayRef]; - for preserve in [true, false] { - let field = SortField::new(data_type.clone()).preserve_dictionaries(preserve); - let mut converter = RowConverter::new(vec![field]).unwrap(); - let rows = converter.convert_columns(&columns).unwrap(); - assert!(rows.row(0) < rows.row(1)); - assert!(rows.row(2) < rows.row(0)); - assert!(rows.row(3) < rows.row(2)); - assert!(rows.row(6) < rows.row(2)); - assert!(rows.row(3) < rows.row(6)); - } + let field = SortField::new(data_type.clone()); + let converter = RowConverter::new(vec![field]).unwrap(); + let rows = converter.convert_columns(&columns).unwrap(); + assert!(rows.row(0) < rows.row(1)); + assert!(rows.row(2) < rows.row(0)); + assert!(rows.row(3) < rows.row(2)); + assert!(rows.row(6) < rows.row(2)); + assert!(rows.row(3) < rows.row(6)); } #[test] @@ -1949,22 +1810,20 @@ mod tests { .unwrap(); let columns = [Arc::new(DictionaryArray::::from(data)) as ArrayRef]; - for preserve in [true, false] { - let field = SortField::new(data_type.clone()).preserve_dictionaries(preserve); - let mut converter = RowConverter::new(vec![field]).unwrap(); - let rows = converter.convert_columns(&columns).unwrap(); - - assert_eq!(rows.row(0), rows.row(1)); - assert_eq!(rows.row(3), rows.row(4)); - assert_eq!(rows.row(4), rows.row(5)); - assert!(rows.row(3) < rows.row(0)); - } + let field = SortField::new(data_type.clone()); + let converter = RowConverter::new(vec![field]).unwrap(); + let rows = converter.convert_columns(&columns).unwrap(); + + assert_eq!(rows.row(0), rows.row(1)); + assert_eq!(rows.row(3), rows.row(4)); + assert_eq!(rows.row(4), rows.row(5)); + assert!(rows.row(3) < rows.row(0)); } #[test] #[should_panic(expected = "Encountered non UTF-8 data")] fn test_invalid_utf8() { - let mut converter = + let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap(); let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _; let rows = converter.convert_columns(&[array]).unwrap(); @@ -1981,8 +1840,7 @@ mod tests { #[should_panic(expected = "rows were not produced by this RowConverter")] fn test_different_converter() { let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)])); - let mut converter = - RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap(); + let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap(); let rows = converter.convert_columns(&[values]).unwrap(); let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap(); @@ -2013,7 +1871,7 @@ mod tests { let list = Arc::new(builder.finish()) as ArrayRef; let d = list.data_type().clone(); - let mut converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap(); + let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap(); let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] @@ -2033,7 +1891,7 @@ mod tests { nulls_first: false, }; let field = SortField::new_with_options(d.clone(), options); - let mut converter = RowConverter::new(vec![field]).unwrap(); + let converter = RowConverter::new(vec![field]).unwrap(); let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] @@ -2053,7 +1911,7 @@ mod tests { nulls_first: false, }; let field = SortField::new_with_options(d.clone(), options); - let mut converter = RowConverter::new(vec![field]).unwrap(); + let converter = RowConverter::new(vec![field]).unwrap(); let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] @@ -2073,7 +1931,7 @@ mod tests { nulls_first: true, }; let field = SortField::new_with_options(d, options); - let mut converter = RowConverter::new(vec![field]).unwrap(); + let converter = RowConverter::new(vec![field]).unwrap(); let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] @@ -2137,7 +1995,7 @@ mod tests { nulls_first: true, }; let field = SortField::new_with_options(d.clone(), options); - let mut converter = RowConverter::new(vec![field]).unwrap(); + let converter = RowConverter::new(vec![field]).unwrap(); let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) > rows.row(1)); @@ -2156,7 +2014,7 @@ mod tests { nulls_first: true, }; let field = SortField::new_with_options(d.clone(), options); - let mut converter = RowConverter::new(vec![field]).unwrap(); + let converter = RowConverter::new(vec![field]).unwrap(); let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) > rows.row(1)); @@ -2175,7 +2033,7 @@ mod tests { nulls_first: false, }; let field = SortField::new_with_options(d, options); - let mut converter = RowConverter::new(vec![field]).unwrap(); + let converter = RowConverter::new(vec![field]).unwrap(); let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) < rows.row(1)); @@ -2202,35 +2060,6 @@ mod tests { test_nested_list::(); } - #[test] - fn test_dictionary_preserving() { - let mut dict = StringDictionaryBuilder::::new(); - dict.append_value("foo"); - dict.append_value("foo"); - dict.append_value("bar"); - dict.append_value("bar"); - dict.append_value("bar"); - dict.append_value("bar"); - - let array = Arc::new(dict.finish()) as ArrayRef; - let preserve = SortField::new(array.data_type().clone()); - let non_preserve = preserve.clone().preserve_dictionaries(false); - - let mut c1 = RowConverter::new(vec![preserve]).unwrap(); - let r1 = c1.convert_columns(&[array.clone()]).unwrap(); - - let mut c2 = RowConverter::new(vec![non_preserve]).unwrap(); - let r2 = c2.convert_columns(&[array.clone()]).unwrap(); - - for r in r1.iter() { - assert_eq!(r.data.len(), 3); - } - - for r in r2.iter() { - assert_eq!(r.data.len(), 10); - } - } - fn generate_primitive_array(len: usize, valid_percent: f64) -> PrimitiveArray where K: ArrowPrimitiveType, @@ -2386,21 +2215,15 @@ mod tests { }) .collect(); - let preserve: Vec<_> = (0..num_columns).map(|_| rng.gen_bool(0.5)).collect(); - let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); let columns = options .into_iter() .zip(&arrays) - .zip(&preserve) - .map(|((o, a), p)| { - SortField::new_with_options(a.data_type().clone(), o) - .preserve_dictionaries(*p) - }) + .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o)) .collect(); - let mut converter = RowConverter::new(columns).unwrap(); + let converter = RowConverter::new(columns).unwrap(); let rows = converter.convert_columns(&arrays).unwrap(); for i in 0..len { @@ -2423,17 +2246,16 @@ mod tests { } let back = converter.convert_rows(&rows).unwrap(); - for ((actual, expected), preserve) in back.iter().zip(&arrays).zip(preserve) { + for (actual, expected) in back.iter().zip(&arrays) { actual.to_data().validate_full().unwrap(); - dictionary_eq(preserve, actual, expected) + dictionary_eq(actual, expected) } } } #[test] fn test_clear() { - let mut converter = - RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap(); + let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap(); let mut rows = converter.empty_rows(3, 128); let first = Int32Array::from(vec![None, Some(2), Some(4)]); @@ -2463,7 +2285,7 @@ mod tests { fn test_append_codec_dictionary_binary() { use DataType::*; // Dictionary RowConverter - let mut converter = RowConverter::new(vec![SortField::new(Dictionary( + let converter = RowConverter::new(vec![SortField::new(Dictionary( Box::new(Int32), Box::new(Binary), ))]) @@ -2484,6 +2306,6 @@ mod tests { converter.append(&mut rows, &[array.clone()]).unwrap(); let back = converter.convert_rows(&rows).unwrap(); - assert_eq!(&back[0], &array); + dictionary_eq(&back[0], &array); } } diff --git a/arrow/benches/lexsort.rs b/arrow/benches/lexsort.rs index 30dab9a74667..25b2279be8d6 100644 --- a/arrow/benches/lexsort.rs +++ b/arrow/benches/lexsort.rs @@ -100,7 +100,7 @@ fn do_bench(c: &mut Criterion, columns: &[Column], len: usize) { .iter() .map(|a| SortField::new(a.data_type().clone())) .collect(); - let mut converter = RowConverter::new(fields).unwrap(); + let converter = RowConverter::new(fields).unwrap(); let rows = converter.convert_columns(&arrays).unwrap(); let mut sort: Vec<_> = rows.iter().enumerate().collect(); sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); diff --git a/arrow/benches/row_format.rs b/arrow/benches/row_format.rs index 12ce71764f7e..bde117e3ec3e 100644 --- a/arrow/benches/row_format.rs +++ b/arrow/benches/row_format.rs @@ -23,35 +23,28 @@ use arrow::array::ArrayRef; use arrow::datatypes::{Int64Type, UInt64Type}; use arrow::row::{RowConverter, SortField}; use arrow::util::bench_util::{ - create_primitive_array, create_string_array_with_len, create_string_dict_array, + create_dict_from_values, create_primitive_array, create_string_array_with_len, + create_string_dict_array, }; use arrow_array::types::Int32Type; use arrow_array::Array; use criterion::{black_box, Criterion}; use std::sync::Arc; -fn do_bench( - c: &mut Criterion, - name: &str, - cols: Vec, - preserve_dictionaries: bool, -) { +fn do_bench(c: &mut Criterion, name: &str, cols: Vec) { let fields: Vec<_> = cols .iter() - .map(|x| { - SortField::new(x.data_type().clone()) - .preserve_dictionaries(preserve_dictionaries) - }) + .map(|x| SortField::new(x.data_type().clone())) .collect(); c.bench_function(&format!("convert_columns {name}"), |b| { b.iter(|| { - let mut converter = RowConverter::new(fields.clone()).unwrap(); + let converter = RowConverter::new(fields.clone()).unwrap(); black_box(converter.convert_columns(&cols).unwrap()) }); }); - let mut converter = RowConverter::new(fields).unwrap(); + let converter = RowConverter::new(fields).unwrap(); let rows = converter.convert_columns(&cols).unwrap(); // using a pre-prepared row converter should be faster than the first time c.bench_function(&format!("convert_columns_prepared {name}"), |b| { @@ -65,46 +58,57 @@ fn do_bench( fn row_bench(c: &mut Criterion) { let cols = vec![Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef]; - do_bench(c, "4096 u64(0)", cols, true); + do_bench(c, "4096 u64(0)", cols); let cols = vec![Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef]; - do_bench(c, "4096 i64(0)", cols, true); + do_bench(c, "4096 i64(0)", cols); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 10)) as ArrayRef]; - do_bench(c, "4096 string(10, 0)", cols, true); + do_bench(c, "4096 string(10, 0)", cols); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 30)) as ArrayRef]; - do_bench(c, "4096 string(30, 0)", cols, true); + do_bench(c, "4096 string(30, 0)", cols); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 100)) as ArrayRef]; - do_bench(c, "4096 string(100, 0)", cols, true); + do_bench(c, "4096 string(100, 0)", cols); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0.5, 100)) as ArrayRef]; - do_bench(c, "4096 string(100, 0.5)", cols, true); + do_bench(c, "4096 string(100, 0.5)", cols); let cols = vec![Arc::new(create_string_dict_array::(4096, 0., 10)) as ArrayRef]; - do_bench(c, "4096 string_dictionary(10, 0)", cols, true); + do_bench(c, "4096 string_dictionary(10, 0)", cols); let cols = vec![Arc::new(create_string_dict_array::(4096, 0., 30)) as ArrayRef]; - do_bench(c, "4096 string_dictionary(30, 0)", cols, true); + do_bench(c, "4096 string_dictionary(30, 0)", cols); let cols = vec![Arc::new(create_string_dict_array::(4096, 0., 100)) as ArrayRef]; - do_bench(c, "4096 string_dictionary(100, 0)", cols.clone(), true); - let name = "4096 string_dictionary_non_preserving(100, 0)"; - do_bench(c, name, cols, false); + do_bench(c, "4096 string_dictionary(100, 0)", cols.clone()); let cols = vec![Arc::new(create_string_dict_array::(4096, 0.5, 100)) as ArrayRef]; - do_bench(c, "4096 string_dictionary(100, 0.5)", cols.clone(), true); - let name = "4096 string_dictionary_non_preserving(100, 0.5)"; - do_bench(c, name, cols, false); + do_bench(c, "4096 string_dictionary(100, 0.5)", cols.clone()); + + let values = create_string_array_with_len::(10, 0., 10); + let dict = create_dict_from_values::(4096, 0., &values); + let cols = vec![Arc::new(dict) as ArrayRef]; + do_bench(c, "4096 string_dictionary_low_cardinality(10, 0)", cols); + + let values = create_string_array_with_len::(10, 0., 30); + let dict = create_dict_from_values::(4096, 0., &values); + let cols = vec![Arc::new(dict) as ArrayRef]; + do_bench(c, "4096 string_dictionary_low_cardinality(30, 0)", cols); + + let values = create_string_array_with_len::(10, 0., 100); + let dict = create_dict_from_values::(4096, 0., &values); + let cols = vec![Arc::new(dict) as ArrayRef]; + do_bench(c, "4096 string_dictionary_low_cardinality(100, 0)", cols); let cols = vec![ Arc::new(create_string_array_with_len::(4096, 0.5, 20)) as ArrayRef, @@ -116,7 +120,6 @@ fn row_bench(c: &mut Criterion) { c, "4096 string(20, 0.5), string(30, 0), string(100, 0), i64(0)", cols, - false, ); let cols = vec![ @@ -125,7 +128,7 @@ fn row_bench(c: &mut Criterion) { Arc::new(create_string_dict_array::(4096, 0., 100)) as ArrayRef, Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef, ]; - do_bench(c, "4096 4096 string_dictionary(20, 0.5), string_dictionary(30, 0), string_dictionary(100, 0), i64(0)", cols, false); + do_bench(c, "4096 4096 string_dictionary(20, 0.5), string_dictionary(30, 0), string_dictionary(100, 0), i64(0)", cols); } criterion_group!(benches, row_bench);