From 2188ac977f9f1e37af23033ba3b3708cb9e83878 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Fri, 29 Nov 2024 13:49:27 +0200 Subject: [PATCH] add fixed size list mutable array data --- arrow-data/src/transform/fixed_size_list.rs | 287 +++++++++++++++++++- arrow-data/src/transform/mod.rs | 2 + 2 files changed, 285 insertions(+), 4 deletions(-) diff --git a/arrow-data/src/transform/fixed_size_list.rs b/arrow-data/src/transform/fixed_size_list.rs index 8eef7bce9bb3..2b59a466f3a3 100644 --- a/arrow-data/src/transform/fixed_size_list.rs +++ b/arrow-data/src/transform/fixed_size_list.rs @@ -15,11 +15,15 @@ // specific language governing permissions and limitations // under the License. -use crate::ArrayData; -use arrow_schema::DataType; - -use super::{Extend, _MutableArrayData}; +use arrow_buffer::{bit_util, BooleanBuffer, Buffer, MutableBuffer, NullBuffer}; +use crate::{ArrayData, ArrayDataBuilder}; +use arrow_schema::{ArrowError, DataType, UnionMode}; +use crate::data::new_buffers; +use crate::transform::utils::build_extend_null_bits; +use super::{build_extend_dictionary, build_extend_nulls, build_extend_view, Capacities, Extend, ExtendNullBits, ExtendNulls, _MutableArrayData, preallocate_offset_and_binary_buffer, SpecializedMutableArrayData}; +// TODO - remove +#[deprecated] pub(super) fn build_extend(array: &ArrayData) -> Extend { let size = match array.data_type() { DataType::FixedSizeList(_, i) => *i as usize, @@ -36,6 +40,8 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { ) } +// TODO - remove +#[deprecated] pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { let size = match mutable.data_type { DataType::FixedSizeList(_, i) => i as usize, @@ -47,3 +53,276 @@ pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { .iter_mut() .for_each(|child| child.extend_nulls(len * size)) } + + + +/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by +/// copying chunks. +/// +/// The main use case of this struct is to perform unary operations to arrays of +/// arbitrary types, such as `filter` and `take`. +/// +/// # Example +/// ``` +/// use arrow_buffer::Buffer; +/// use arrow_data::ArrayData; +/// use arrow_data::transform::{MutableArrayData, SpecializedMutableArrayData}; +/// use arrow_schema::DataType; +/// fn i32_array(values: &[i32]) -> ArrayData { +/// ArrayData::try_new(DataType::Int32, 5, None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap() +/// } +/// let arr1 = i32_array(&[1, 2, 3, 4, 5]); +/// let arr2 = i32_array(&[6, 7, 8, 9, 10]); +/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements +/// let capacity = 3 * std::mem::size_of::(); +/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, 10); +/// // Copy the first 3 elements from arr1 +/// mutable.extend(0, 0, 3); +/// // Copy the last 3 elements from arr2 +/// mutable.extend(1, 2, 4); +/// // Complete the MutableArrayData into a new ArrayData +/// let frozen = mutable.freeze(); +/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10])); +/// ``` +pub struct FixedSizeListMutableArrayData<'a> { + // TODO - can have specific array instead? + /// Input arrays: the data being read FROM. + /// + /// Note this is "dead code" because all actual references to the arrays are + /// stored in closures for extending values and nulls. + #[allow(dead_code)] + arrays: Vec<&'a ArrayData>, + + /// In progress output array: The data being written TO + /// + /// Note these fields are in a separate struct, [_MutableArrayData], as they + /// cannot be in [crate::transform::MutableArrayData] itself due to mutability invariants (interior + /// mutability): [crate::transform::MutableArrayData] contains a function that can only mutate + /// [_MutableArrayData], not [crate::transform::MutableArrayData] itself + data: _MutableArrayData<'a>, + + /// function used to extend the output array with nulls from input arrays. + /// + /// This function's lifetime is bound to the input arrays because it reads + /// nulls from it. + extend_null_bits: Vec>, +} + +impl<'a> std::fmt::Debug for FixedSizeListMutableArrayData<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + // ignores the closures. + f.debug_struct("FixedSizeListMutableArrayData") + .field("data", &self.data) + .finish() + } +} + +impl<'a> FixedSizeListMutableArrayData<'a> { + + // function that extends `[start..start+len]` to the mutable array. + fn extend_values(&mut self, array_index: usize, start: usize, len: usize) { + // TODO - can we avoid this by saving the size in the struct? as we know all data types are the same + let size = match self.arrays[array_index].data_type() { + DataType::FixedSizeList(_, i) => *i as usize, + _ => unreachable!(), + }; + + self.data + .child_data + .iter_mut() + .for_each(|child| child.extend(array_index, start * size, (start + len) * size)) + } + + + /// Similar to [crate::transform::MutableArrayData::new], but lets users define the + /// preallocated capacities of the array with more granularity. + /// + /// See [crate::transform::MutableArrayData::new] for more information on the arguments. + /// + /// # Panics + /// + /// This function panics if the given `capacities` don't match the data type + /// of `arrays`. Or when a [crate::transform::Capacities] variant is not yet supported. + pub fn with_capacities( + arrays: Vec<&'a ArrayData>, + use_nulls: bool, + capacities: Capacities, + ) -> Self { + let data_type = arrays[0].data_type(); + + for a in arrays.iter().skip(1) { + assert_eq!( + data_type, + a.data_type(), + "Arrays with inconsistent types passed to MutableArrayData" + ) + } + + // if any of the arrays has nulls, insertions from any array requires setting bits + // as there is at least one array with nulls. + let use_nulls = use_nulls | arrays.iter().any(|array| array.null_count() > 0); + + let mut array_capacity; + + let [buffer1, buffer2] = match &capacities { + Capacities::Array(capacity) => { + array_capacity = *capacity; + new_buffers(data_type, *capacity) + } + Capacities::List(capacity, _) => { + array_capacity = *capacity; + new_buffers(data_type, *capacity) + } + _ => panic!("Capacities: {capacities:?} not supported"), + }; + + let child_data = match data_type { + DataType::FixedSizeList(_, size) => { + let children = arrays + .iter() + .map(|array| &array.child_data()[0]) + .collect::>(); + let capacities = + if let Capacities::List(capacity, ref child_capacities) = capacities { + child_capacities + .clone() + .map(|c| *c) + .unwrap_or(Capacities::Array(capacity * *size as usize)) + } else { + Capacities::Array(array_capacity * *size as usize) + }; + vec![crate::transform::MutableArrayData::with_capacities( + children, use_nulls, capacities, + )] + } + _ => unreachable!(), + }; + + let extend_null_bits = arrays + .iter() + .map(|array| build_extend_null_bits(array, use_nulls)) + .collect(); + + let null_buffer = use_nulls.then(|| { + let null_bytes = bit_util::ceil(array_capacity, 8); + MutableBuffer::from_len_zeroed(null_bytes) + }); + + let data = _MutableArrayData { + data_type: data_type.clone(), + len: 0, + null_count: 0, + null_buffer, + buffer1, + buffer2, + child_data, + }; + Self { + arrays, + data, + extend_null_bits, + } + } +} + +impl<'a> SpecializedMutableArrayData<'a> for FixedSizeListMutableArrayData<'a> { + /// Returns a new [crate::transform::MutableArrayData] with capacity to `capacity` slots and + /// specialized to create an [ArrayData] from multiple `arrays`. + /// + /// # Arguments + /// * `arrays` - the source arrays to copy from + /// * `use_nulls` - a flag used to optimize insertions + /// - `false` if the only source of nulls are the arrays themselves + /// - `true` if the user plans to call [crate::transform::MutableArrayData::extend_nulls]. + /// * capacity - the preallocated capacity of the output array, in bytes + /// + /// Thus, if `use_nulls` is `false`, calling + /// [crate::transform::MutableArrayData::extend_nulls] should not be used. + fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self { + Self::with_capacities(arrays, use_nulls, crate::transform::Capacities::Array(capacity)) + } + + /// Extends the in progress array with a region of the input arrays + /// + /// # Arguments + /// * `index` - the index of array that you what to copy values from + /// * `start` - the start index of the chunk (inclusive) + /// * `end` - the end index of the chunk (exclusive) + /// + /// # Panic + /// This function panics if there is an invalid index, + /// i.e. `index` >= the number of source arrays + /// or `end` > the length of the `index`th array + fn extend(&mut self, index: usize, start: usize, end: usize) { + let len = end - start; + (self.extend_null_bits[index])(&mut self.data, start, len); + self.extend_values(index, start, len); + self.data.len += len; + } + + /// Extends the in progress array with null elements, ignoring the input arrays. + /// + /// # Panics + /// + /// Panics if [`crate::transform::MutableArrayData`] not created with `use_nulls` or nullable source arrays + fn extend_nulls(&mut self, len: usize) { + self.data.len += len; + let bit_len = bit_util::ceil(self.data.len, 8); + let nulls = self.data.null_buffer(); + nulls.resize(bit_len, 0); + self.data.null_count += len; + let size = match self.data.data_type { + DataType::FixedSizeList(_, i) => i as usize, + _ => unreachable!(), + }; + + self.data + .child_data + .iter_mut() + .for_each(|child| child.extend_nulls(len * size)) + } + + /// Returns the current length + #[inline] + fn len(&self) -> usize { + self.data.len + } + + /// Returns true if len is 0 + #[inline] + fn is_empty(&self) -> bool { + self.data.len == 0 + } + + /// Returns the current null count + #[inline] + fn null_count(&self) -> usize { + self.data.null_count + } + + /// Consume self and returns the in progress array as [`ArrayDataBuilder`]. + /// + /// This is useful for extending the default behavior of MutableArrayData. + fn into_builder(self) -> ArrayDataBuilder { + let data = self.data; + + let buffers = vec![]; + + let child_data = data.child_data.into_iter().map(|x| x.freeze()).collect(); + + let nulls = data + .null_buffer + .map(|nulls| { + let bools = BooleanBuffer::new(nulls.into(), 0, data.len); + unsafe { NullBuffer::new_unchecked(bools, data.null_count) } + }) + .filter(|n| n.null_count() > 0); + + ArrayDataBuilder::new(data.data_type) + .offset(0) + .len(data.len) + .nulls(nulls) + .buffers(buffers) + .child_data(child_data) + } +} diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs index b5298f8c137b..9c9e4b5eb679 100644 --- a/arrow-data/src/transform/mod.rs +++ b/arrow-data/src/transform/mod.rs @@ -40,6 +40,7 @@ mod traits; pub use traits::SpecializedMutableArrayData; pub use boolean::BooleanMutableArrayData; pub use fixed_binary::FixedSizeBinaryMutableArrayData; +pub use fixed_size_list::FixedSizeListMutableArrayData; type ExtendNullBits<'a> = Box; // function that extends `[start..start+len]` to the mutable array. @@ -298,6 +299,7 @@ impl<'a> std::fmt::Debug for MutableArrayData<'a> { } } + /// Define capacities to pre-allocate for child data or data buffers. #[derive(Debug, Clone)] pub enum Capacities {