diff --git a/arrow-array/src/array/byte_array.rs b/arrow-array/src/array/byte_array.rs index db825bbea97d..a57abc5b1e71 100644 --- a/arrow-array/src/array/byte_array.rs +++ b/arrow-array/src/array/byte_array.rs @@ -94,7 +94,7 @@ pub struct GenericByteArray { impl Clone for GenericByteArray { fn clone(&self) -> Self { Self { - data_type: self.data_type.clone(), + data_type: T::DATA_TYPE, value_offsets: self.value_offsets.clone(), value_data: self.value_data.clone(), nulls: self.nulls.clone(), @@ -323,7 +323,7 @@ impl GenericByteArray { /// Returns a zero-copy slice of this array with the indicated offset and length. pub fn slice(&self, offset: usize, length: usize) -> Self { Self { - data_type: self.data_type.clone(), + data_type: T::DATA_TYPE, value_offsets: self.value_offsets.slice(offset, length), value_data: self.value_data.clone(), nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)), @@ -511,7 +511,7 @@ impl From for GenericByteArray { Self { value_offsets, value_data, - data_type: data.data_type().clone(), + data_type: T::DATA_TYPE, nulls: data.nulls().cloned(), } } diff --git a/arrow-array/src/array/bytes_view_array.rs b/arrow-array/src/array/bytes_view_array.rs new file mode 100644 index 000000000000..020c69409a5b --- /dev/null +++ b/arrow-array/src/array/bytes_view_array.rs @@ -0,0 +1,385 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::array::print_long_array; +use crate::builder::GenericBytesViewBuilder; +use crate::iterator::ArrayIter; +use crate::types::bytes::ByteArrayNativeType; +use crate::types::BytesViewType; +use crate::{Array, ArrayAccessor, ArrayRef}; +use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer}; +use arrow_data::{ArrayData, ArrayDataBuilder, BytesView}; +use arrow_schema::{ArrowError, DataType}; +use std::any::Any; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::Arc; + +/// An array of variable length bytes view arrays +pub struct GenericBytesViewArray { + data_type: DataType, + views: ScalarBuffer, + buffers: Vec, + phantom: PhantomData, + nulls: Option, +} + +impl Clone for GenericBytesViewArray { + fn clone(&self) -> Self { + Self { + data_type: T::DATA_TYPE, + views: self.views.clone(), + buffers: self.buffers.clone(), + nulls: self.nulls.clone(), + phantom: Default::default(), + } + } +} + +impl GenericBytesViewArray { + /// Create a new [`GenericBytesViewArray`] from the provided parts, panicking on failure + /// + /// # Panics + /// + /// Panics if [`GenericBytesViewArray::try_new`] returns an error + pub fn new(views: ScalarBuffer, buffers: Vec, nulls: Option) -> Self { + Self::try_new(views, buffers, nulls).unwrap() + } + + /// Create a new [`GenericBytesViewArray`] from the provided parts, returning an error on failure + /// + /// # Errors + /// + /// * `views.len() != nulls.len()` + /// * [BytesViewType::validate] fails + pub fn try_new( + views: ScalarBuffer, + buffers: Vec, + nulls: Option, + ) -> Result { + 
T::validate(&views, &buffers)?; + + if let Some(n) = nulls.as_ref() { + if n.len() != views.len() { + return Err(ArrowError::InvalidArgumentError(format!( + "Incorrect length of null buffer for {}ViewArray, expected {} got {}", + T::PREFIX, + views.len(), + n.len(), + ))); + } + } + + Ok(Self { + data_type: T::DATA_TYPE, + views, + buffers, + nulls, + phantom: Default::default(), + }) + } + + /// Create a new [`GenericBytesViewArray`] from the provided parts, without validation + /// + /// # Safety + /// + /// Safe if [`Self::try_new`] would not error + pub unsafe fn new_unchecked( + views: ScalarBuffer, + buffers: Vec, + nulls: Option, + ) -> Self { + Self { + data_type: T::DATA_TYPE, + phantom: Default::default(), + views, + buffers, + nulls, + } + } + + /// Create a new [`GenericBytesViewArray`] of length `len` where all values are null + pub fn new_null(len: usize) -> Self { + Self { + data_type: T::DATA_TYPE, + views: vec![0; len].into(), + buffers: vec![], + nulls: Some(NullBuffer::new_null(len)), + phantom: Default::default(), + } + } + + /// Creates a [`GenericBytesViewArray`] based on an iterator of values without nulls + pub fn from_iter_values(iter: I) -> Self + where + Ptr: AsRef, + I: IntoIterator, + { + let iter = iter.into_iter(); + let mut builder = GenericBytesViewBuilder::::with_capacity(iter.size_hint().0); + for v in iter { + builder.append_value(v); + } + builder.finish() + } + + /// Deconstruct this array into its constituent parts + pub fn into_parts(self) -> (ScalarBuffer, Vec, Option) { + (self.views, self.buffers, self.nulls) + } + + /// Returns the views buffer + #[inline] + pub fn views(&self) -> &ScalarBuffer { + &self.views + } + + /// Returns the buffers storing string data + #[inline] + pub fn data_buffers(&self) -> &[Buffer] { + &self.buffers + } + + /// Returns the element at index `i` + /// # Panics + /// Panics if index `i` is out of bounds. 
+ pub fn value(&self, i: usize) -> &T::Native { + assert!( + i < self.len(), + "Trying to access an element at index {} from a {}ViewArray of length {}", + i, + T::PREFIX, + self.len() + ); + + unsafe { self.value_unchecked(i) } + } + + /// Returns the element at index `idx` without bounds checking + /// # Safety + /// Caller is responsible for ensuring that the index is within the bounds of the array + pub unsafe fn value_unchecked(&self, idx: usize) -> &T::Native { + let v = self.views.get_unchecked(idx); + let len = *v as u32; + let b = if len <= 12 { + let ptr = self.views.as_ptr() as *const u8; + std::slice::from_raw_parts(ptr.add(idx * 16 + 4), len as usize) + } else { + let view = BytesView::from(*v); + let data = self.buffers.get_unchecked(view.buffer_index as usize); + let offset = view.offset as usize; + data.get_unchecked(offset..offset + len as usize) + }; + T::Native::from_bytes_unchecked(b) + } + + /// Constructs a new iterator over the values of this array + pub fn iter(&self) -> ArrayIter<&Self> { + ArrayIter::new(self) + } + + /// Returns a zero-copy slice of this array with the indicated offset and length. 
+ pub fn slice(&self, offset: usize, length: usize) -> Self { + Self { + data_type: T::DATA_TYPE, + views: self.views.slice(offset, length), + buffers: self.buffers.clone(), + nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)), + phantom: Default::default(), + } + } +} + +impl Debug for GenericBytesViewArray { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}ViewArray\n[\n", T::PREFIX)?; + print_long_array(self, f, |array, index, f| { + std::fmt::Debug::fmt(&array.value(index), f) + })?; + write!(f, "]") + } +} + +impl Array for GenericBytesViewArray { + fn as_any(&self) -> &dyn Any { + self + } + + fn to_data(&self) -> ArrayData { + self.clone().into() + } + + fn into_data(self) -> ArrayData { + self.into() + } + + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn slice(&self, offset: usize, length: usize) -> ArrayRef { + Arc::new(self.slice(offset, length)) + } + + fn len(&self) -> usize { + self.views.len() + } + + fn is_empty(&self) -> bool { + self.views.is_empty() + } + + fn offset(&self) -> usize { + 0 + } + + fn nulls(&self) -> Option<&NullBuffer> { + self.nulls.as_ref() + } + + fn get_buffer_memory_size(&self) -> usize { + let mut sum = self.buffers.iter().map(|b| b.capacity()).sum::(); + sum += self.views.inner().capacity(); + if let Some(x) = &self.nulls { + sum += x.buffer().capacity() + } + sum + } + + fn get_array_memory_size(&self) -> usize { + std::mem::size_of::() + self.get_buffer_memory_size() + } +} + +impl<'a, T: BytesViewType + ?Sized> ArrayAccessor for &'a GenericBytesViewArray { + type Item = &'a T::Native; + + fn value(&self, index: usize) -> Self::Item { + GenericBytesViewArray::value(self, index) + } + + unsafe fn value_unchecked(&self, index: usize) -> Self::Item { + GenericBytesViewArray::value_unchecked(self, index) + } +} + +impl<'a, T: BytesViewType + ?Sized> IntoIterator for &'a GenericBytesViewArray { + type Item = Option<&'a T::Native>; + type IntoIter = ArrayIter; + + fn 
into_iter(self) -> Self::IntoIter { + ArrayIter::new(self) + } +} + +impl From for GenericBytesViewArray { + fn from(value: ArrayData) -> Self { + let views = value.buffers()[0].clone(); + let views = ScalarBuffer::new(views, value.offset(), value.len()); + let buffers = value.buffers()[1..].to_vec(); + Self { + data_type: T::DATA_TYPE, + views, + buffers, + nulls: value.nulls().cloned(), + phantom: Default::default(), + } + } +} + +impl From> for ArrayData { + fn from(mut array: GenericBytesViewArray) -> Self { + let len = array.len(); + array.buffers.insert(0, array.views.into_inner()); + let builder = ArrayDataBuilder::new(T::DATA_TYPE) + .len(len) + .buffers(array.buffers) + .nulls(array.nulls); + + unsafe { builder.build_unchecked() } + } +} + +impl FromIterator> for GenericBytesViewArray +where + Ptr: AsRef, +{ + fn from_iter>>(iter: I) -> Self { + let iter = iter.into_iter(); + let mut builder = GenericBytesViewBuilder::::with_capacity(iter.size_hint().0); + builder.extend(iter); + builder.finish() + } +} + +/// A [`GenericBytesViewArray`] of `str` +/// +/// ``` +/// use arrow_array::StringViewArray; +/// let array = StringViewArray::from_iter_values(vec!["hello", "world", "lulu", "large payload over 12 bytes"]); +/// assert_eq!(array.value(0), "hello"); +/// assert_eq!(array.value(3), "large payload over 12 bytes"); +/// ``` +pub type StringViewArray = GenericBytesViewArray; + +/// A [`GenericBytesViewArray`] of `[u8]` +pub type BinaryViewArray = GenericBytesViewArray<[u8]>; + +#[cfg(test)] +mod tests { + use crate::builder::StringViewBuilder; + use crate::types::BytesViewType; + use crate::{Array, BinaryViewArray, StringViewArray}; + + #[test] + fn try_new() { + let array = StringViewArray::from_iter_values(vec![ + "hello", + "world", + "lulu", + "large payload over 12 bytes", + ]); + assert_eq!(array.value(0), "hello"); + assert_eq!(array.value(3), "large payload over 12 bytes"); + + let array = BinaryViewArray::from_iter_values(vec![ + 
b"hello".to_bytes(), + b"world".to_bytes(), + b"lulu".to_bytes(), + b"large payload over 12 bytes".to_bytes(), + ]); + assert_eq!(array.value(0), b"hello"); + assert_eq!(array.value(3), b"large payload over 12 bytes"); + + let array = { + let mut builder = StringViewBuilder::new(); + builder.finish() + }; + assert!(array.is_empty()); + + let array = { + let mut builder = StringViewBuilder::new(); + builder.append_value("hello"); + builder.append_null(); + builder.append_option(Some("large payload over 12 bytes")); + builder.finish() + }; + assert_eq!(array.value(0), "hello"); + assert!(array.is_null(1)); + assert_eq!(array.value(2), "large payload over 12 bytes"); + } +} diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 7aa3f92bfbd2..3e008e33ebeb 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -65,8 +65,13 @@ mod union_array; pub use union_array::*; mod run_array; + pub use run_array::*; +mod bytes_view_array; + +pub use bytes_view_array::*; + /// An array in the [arrow columnar format](https://arrow.apache.org/docs/format/Columnar.html) pub trait Array: std::fmt::Debug + Send + Sync { /// Returns the array as [`Any`] so that it can be diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs new file mode 100644 index 000000000000..9e45f2c2345f --- /dev/null +++ b/arrow-array/src/builder/generic_bytes_view_builder.rs @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::builder::ArrayBuilder; +use crate::types::BytesViewType; +use crate::{ArrayRef, GenericBytesViewArray}; +use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; +use arrow_data::BytesView; +use std::any::Any; +use std::marker::PhantomData; +use std::sync::Arc; + +const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024; + +/// A builder for [`GenericBytesViewArray`] +/// +/// See [`Self::append_value`] for the allocation strategy +pub struct GenericBytesViewBuilder { + views_builder: BufferBuilder, + null_buffer_builder: NullBufferBuilder, + completed: Vec, + in_progress: Vec, + block_size: u32, + phantom: PhantomData, +} + +impl GenericBytesViewBuilder { + /// Creates a new [`GenericBytesViewBuilder`]. 
+ pub fn new() -> Self { + Self::with_capacity(1024) + } + + /// Creates a new [`GenericBytesViewBuilder`] with space for `capacity` strings + pub fn with_capacity(capacity: usize) -> Self { + Self { + views_builder: BufferBuilder::new(capacity), + null_buffer_builder: NullBufferBuilder::new(capacity), + completed: vec![], + in_progress: vec![], + block_size: DEFAULT_BLOCK_SIZE, + phantom: Default::default(), + } + } + + /// Override the minimum size of buffers to allocate for string data + pub fn with_block_size(self, block_size: u32) -> Self { + Self { block_size, ..self } + } + + /// Appends a value into the builder + /// + /// # Panics + /// + /// Panics if + /// - String buffer count exceeds `u32::MAX` + /// - String length exceeds `u32::MAX` + #[inline] + pub fn append_value(&mut self, value: impl AsRef) { + let v: &[u8] = value.as_ref().as_ref(); + let length: u32 = v.len().try_into().unwrap(); + if length <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); + view_buffer[4..4 + v.len()].copy_from_slice(v); + self.views_builder.append(u128::from_le_bytes(view_buffer)); + self.null_buffer_builder.append_non_null(); + return; + } + + let required_cap = self.in_progress.len() + v.len(); + if self.in_progress.capacity() < required_cap { + let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize)); + let flushed = std::mem::replace(&mut self.in_progress, in_progress); + if !flushed.is_empty() { + assert!(self.completed.len() < u32::MAX as usize); + self.completed.push(flushed.into()); + } + }; + let offset = self.in_progress.len() as u32; + self.in_progress.extend_from_slice(v); + + let view = BytesView { + length, + prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()), + buffer_index: self.completed.len() as u32, + offset, + }; + self.views_builder.append(view.into()); + self.null_buffer_builder.append_non_null(); + } + + /// Append an `Option` value into the builder + #[inline] + pub fn 
append_option(&mut self, value: Option>) { + match value { + None => self.append_null(), + Some(v) => self.append_value(v), + }; + } + + /// Append a null value into the builder + #[inline] + pub fn append_null(&mut self) { + self.null_buffer_builder.append_null(); + self.views_builder.append(0); + } + + /// Builds the [`GenericBytesViewArray`] and resets this builder + pub fn finish(&mut self) -> GenericBytesViewArray { + let mut completed = std::mem::take(&mut self.completed); + if !self.in_progress.is_empty() { + completed.push(std::mem::take(&mut self.in_progress).into()); + } + let len = self.views_builder.len(); + let views = ScalarBuffer::new(self.views_builder.finish(), 0, len); + let nulls = self.null_buffer_builder.finish(); + // SAFETY: valid by construction + unsafe { GenericBytesViewArray::new_unchecked(views, completed, nulls) } + } + + /// Builds the [`GenericBytesViewArray`] without resetting the builder + pub fn finish_cloned(&self) -> GenericBytesViewArray { + let mut completed = self.completed.clone(); + if !self.in_progress.is_empty() { + completed.push(Buffer::from_slice_ref(&self.in_progress)); + } + let len = self.views_builder.len(); + let views = Buffer::from_slice_ref(self.views_builder.as_slice()); + let views = ScalarBuffer::new(views, 0, len); + let nulls = self.null_buffer_builder.finish_cloned(); + // SAFETY: valid by construction + unsafe { GenericBytesViewArray::new_unchecked(views, completed, nulls) } + } +} + +impl Default for GenericBytesViewBuilder { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Debug for GenericBytesViewBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}ViewBuilder", T::PREFIX)?; + f.debug_struct("") + .field("views_builder", &self.views_builder) + .field("in_progress", &self.in_progress) + .field("completed", &self.completed) + .field("null_buffer_builder", &self.null_buffer_builder) + .finish() + } +} + +impl ArrayBuilder for 
GenericBytesViewBuilder { + fn len(&self) -> usize { + self.null_buffer_builder.len() + } + + fn finish(&mut self) -> ArrayRef { + Arc::new(self.finish()) + } + + fn finish_cloned(&self) -> ArrayRef { + Arc::new(self.finish_cloned()) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_box_any(self: Box) -> Box { + self + } +} + +impl> Extend> + for GenericBytesViewBuilder +{ + #[inline] + fn extend>>(&mut self, iter: I) { + for v in iter { + self.append_option(v) + } + } +} + +/// Array builder for [`StringViewArray`][crate::StringViewArray] +/// +/// Values can be appended using [`GenericBytesViewBuilder::append_value`], and nulls with +/// [`GenericBytesViewBuilder::append_null`] as normal. +pub type StringViewBuilder = GenericBytesViewBuilder; + +/// Array builder for [`BinaryViewArray`][crate::BinaryViewArray] +/// +/// Values can be appended using [`GenericBytesViewBuilder::append_value`], and nulls with +/// [`GenericBytesViewBuilder::append_null`] as normal. 
+pub type BinaryViewBuilder = GenericBytesViewBuilder<[u8]>; diff --git a/arrow-array/src/builder/mod.rs b/arrow-array/src/builder/mod.rs index d33e565a868b..e4ab7ae4ba23 100644 --- a/arrow-array/src/builder/mod.rs +++ b/arrow-array/src/builder/mod.rs @@ -178,7 +178,10 @@ mod generic_bytes_dictionary_builder; pub use generic_bytes_dictionary_builder::*; mod generic_byte_run_builder; pub use generic_byte_run_builder::*; +mod generic_bytes_view_builder; +pub use generic_bytes_view_builder::*; mod union_builder; + pub use union_builder::*; use crate::ArrayRef; diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs index 83a229c1da0d..65ba46e5cd79 100644 --- a/arrow-array/src/types.rs +++ b/arrow-array/src/types.rs @@ -25,12 +25,14 @@ use crate::timezone::Tz; use crate::{ArrowNativeTypeOp, OffsetSizeTrait}; use arrow_buffer::{i256, Buffer, OffsetBuffer}; use arrow_data::decimal::{validate_decimal256_precision, validate_decimal_precision}; +use arrow_data::{validate_binary_view, validate_string_view}; use arrow_schema::{ ArrowError, DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, DECIMAL_DEFAULT_SCALE, }; use chrono::{Duration, NaiveDate, NaiveDateTime}; use half::f16; +use std::fmt::Debug; use std::marker::PhantomData; use std::ops::{Add, Sub}; @@ -1544,6 +1546,101 @@ pub type BinaryType = GenericBinaryType; /// An arrow binary array with i64 offsets pub type LargeBinaryType = GenericBinaryType; +mod bytes_view { + pub trait Sealed: Send + Sync {} + impl Sealed for str {} + impl Sealed for [u8] {} +} + +/// A trait over the variable length bytes view array types +pub trait BytesViewType: bytes_view::Sealed + 'static + PartialEq + AsRef { + /// If element in array is utf8 encoded string. 
+ const IS_UTF8: bool; + + /// Datatype of array elements + const DATA_TYPE: DataType = if Self::IS_UTF8 { + DataType::Utf8View + } else { + DataType::BinaryView + }; + + /// "Binary" or "String", for use in displayed or error messages + const PREFIX: &'static str; + + /// Type representing the equivalent rust type, i.e. + /// the `str` implementation has native type `str` and + /// the `[u8]` implementation has native type `[u8]` + type Native: bytes::ByteArrayNativeType + AsRef + AsRef<[u8]> + ?Sized; + + /// Owned type corresponding to `Native` + type Owned: Debug + Clone + Sync + Send + AsRef; + + /// # Safety + /// The caller must ensure `slice` holds valid data for `Self` (valid UTF-8 when `Self` is `str`). + unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self; + + /// To bytes slice. + fn to_bytes(&self) -> &[u8]; + + /// To owned type + #[allow(clippy::wrong_self_convention)] + fn into_owned(&self) -> Self::Owned; + + /// Verifies that the provided buffers are valid for this array type + fn validate(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError>; +} + +impl BytesViewType for str { + const IS_UTF8: bool = true; + const PREFIX: &'static str = "String"; + + type Native = str; + type Owned = String; + + #[inline(always)] + unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self { + std::str::from_utf8_unchecked(slice) + } + + #[inline(always)] + fn to_bytes(&self) -> &[u8] { + self.as_bytes() + } + + fn into_owned(&self) -> Self::Owned { + self.to_string() + } + + fn validate(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError> { + validate_string_view(views, buffers) + } +} + +impl BytesViewType for [u8] { + const IS_UTF8: bool = false; + const PREFIX: &'static str = "Binary"; + type Native = [u8]; + type Owned = Vec; + + #[inline(always)] + unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self { + slice + } + + #[inline(always)] + fn to_bytes(&self) -> &[u8] { + self + } + + fn into_owned(&self) -> Self::Owned { + self.to_vec() + } + + fn validate(views: &[u128], buffers: &[Buffer]) -> Result<(), 
ArrowError> { + validate_binary_view(views, buffers) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/arrow-buffer/src/native.rs b/arrow-buffer/src/native.rs index 38074a8dc26c..5184d60ac1fd 100644 --- a/arrow-buffer/src/native.rs +++ b/arrow-buffer/src/native.rs @@ -149,6 +149,7 @@ native_integer!(u8); native_integer!(u16); native_integer!(u32); native_integer!(u64); +native_integer!(u128); macro_rules! native_float { ($t:ty, $s:ident, $as_usize: expr, $i:ident, $usize_as: expr) => { diff --git a/arrow-data/Cargo.toml b/arrow-data/Cargo.toml index c83f867523d5..e7a897240676 100644 --- a/arrow-data/Cargo.toml +++ b/arrow-data/Cargo.toml @@ -51,6 +51,7 @@ arrow-schema = { workspace = true } num = { version = "0.4", default-features = false, features = ["std"] } half = { version = "2.1", default-features = false } +simdutf8 = { version = "0.1.4", default-features = false, features = ["std", "aarch64_neon"] } [dev-dependencies] diff --git a/arrow-data/src/bytes_view.rs b/arrow-data/src/bytes_view.rs new file mode 100644 index 000000000000..40cc7c2798a6 --- /dev/null +++ b/arrow-data/src/bytes_view.rs @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow_buffer::Buffer; +use arrow_schema::ArrowError; + +#[derive(Debug, Copy, Clone, Default)] +#[repr(C)] +pub struct BytesView { + /// The length of the string/bytes (bits 0..32 of the packed `u128` view). + pub length: u32, + /// First 4 bytes of string/bytes data (bits 32..64). + pub prefix: u32, + /// The buffer index (bits 64..96). + pub buffer_index: u32, + /// The offset into the buffer (bits 96..128). + pub offset: u32, +} + +impl BytesView { + #[inline(always)] + pub fn as_u128(self) -> u128 { + (self.length as u128) | ((self.prefix as u128) << 32) | ((self.buffer_index as u128) << 64) | ((self.offset as u128) << 96) + } +} + +impl From for BytesView { + #[inline] + fn from(value: u128) -> Self { + Self { length: value as u32, prefix: (value >> 32) as u32, buffer_index: (value >> 64) as u32, offset: (value >> 96) as u32 } + } +} + +impl From for u128 { + #[inline] + fn from(value: BytesView) -> Self { + value.as_u128() + } +} + +/// Validates the combination of `views` and `buffers` is a valid BinaryView +pub fn validate_binary_view(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError> { + validate_view_impl(views, buffers, |_, _| Ok(())) +} + +/// Validates the combination of `views` and `buffers` is a valid StringView +pub fn validate_string_view(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError> { + validate_view_impl(views, buffers, |idx, b| { + simdutf8::basic::from_utf8(b).map_err(|e| { + ArrowError::InvalidArgumentError(format!( + "Encountered non-UTF-8 data at index {idx}: {e}" + )) + })?; + Ok(()) + }) +} + +fn validate_view_impl(views: &[u128], buffers: &[Buffer], f: F) -> Result<(), ArrowError> +where + F: Fn(usize, &[u8]) -> Result<(), ArrowError>, +{ + for (idx, v) in views.iter().enumerate() { + let len = *v as u32; + if len <= 12 { + if len < 12 && (v >> (32 + len * 8)) != 0 { + return Err(ArrowError::InvalidArgumentError(format!( + "View at index {idx} contained non-zero padding for string of length {len}", + ))); + } + f(idx, &v.to_le_bytes()[4..4 + len as usize])?; + } else { + let view = BytesView::from(*v); + let data = buffers.get(view.buffer_index as usize).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Invalid buffer index at {idx}: got index 
{} but only has {} buffers", + view.buffer_index, + buffers.len() + )) + })?; + + let start = view.offset as usize; + let end = start.saturating_add(len as usize); // saturate so an oversized view fails the bounds check below instead of overflowing + let b = data.get(start..end).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Invalid buffer slice at {idx}: got {start}..{end} but buffer {} has length {}", + view.buffer_index, + data.len() + )) + })?; + + if !b.starts_with(&view.prefix.to_le_bytes()) { + return Err(ArrowError::InvalidArgumentError( + "Mismatch between embedded prefix and data".to_string(), + )); + } + + f(idx, b)?; + } + } + Ok(()) +} diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index 2ddc2d845b01..f30c7db5cd2d 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -27,7 +27,7 @@ use std::mem; use std::ops::Range; use std::sync::Arc; -use crate::equal; +use crate::{equal, validate_binary_view, validate_string_view}; /// A collection of [`Buffer`] #[doc(hidden)] @@ -738,7 +738,10 @@ impl ArrayData { ))); } - if self.buffers.len() != layout.buffers.len() { + // Check data buffers length for view types and other types + if self.buffers.len() < layout.buffers.len() + || (!layout.variadic && self.buffers.len() != layout.buffers.len()) + { return Err(ArrowError::InvalidArgumentError(format!( "Expected {} buffers in array of type {:?}, got {}", layout.buffers.len(), @@ -1233,6 +1236,14 @@ impl ArrayData { DataType::LargeUtf8 => self.validate_utf8::(), DataType::Binary => self.validate_offsets_full::(self.buffers[1].len()), DataType::LargeBinary => self.validate_offsets_full::(self.buffers[1].len()), + DataType::BinaryView => { + let views = self.typed_buffer::(0, self.len)?; + validate_binary_view(views, &self.buffers[1..]) + } + DataType::Utf8View => { + let views = self.typed_buffer::(0, self.len)?; + validate_string_view(views, &self.buffers[1..]) + } DataType::List(_) | DataType::Map(_, _) => { let child = &self.child_data[0]; self.validate_offsets_full::(child.len) @@ -1504,10 +1515,12 @@ pub fn 
layout(data_type: &DataType) -> DataTypeLayout { DataType::Null => DataTypeLayout { buffers: vec![], can_contain_null_mask: false, + variadic: false, }, DataType::Boolean => DataTypeLayout { buffers: vec![BufferSpec::BitMap], can_contain_null_mask: true, + variadic: false, }, DataType::Int8 => DataTypeLayout::new_fixed_width::(), DataType::Int16 => DataTypeLayout::new_fixed_width::(), @@ -1539,15 +1552,14 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout { DataTypeLayout { buffers: vec![spec], can_contain_null_mask: true, + variadic: false, } } DataType::Binary => DataTypeLayout::new_binary::(), DataType::LargeBinary => DataTypeLayout::new_binary::(), DataType::Utf8 => DataTypeLayout::new_binary::(), DataType::LargeUtf8 => DataTypeLayout::new_binary::(), - DataType::BinaryView | DataType::Utf8View => { - unimplemented!("BinaryView/Utf8View not implemented") - } + DataType::BinaryView | DataType::Utf8View => DataTypeLayout::new_view(), DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data DataType::List(_) => DataTypeLayout::new_fixed_width::(), DataType::LargeList(_) => DataTypeLayout::new_fixed_width::(), @@ -1576,6 +1588,7 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout { } }, can_contain_null_mask: false, + variadic: false, } } DataType::Dictionary(key_type, _value_type) => layout(key_type), @@ -1591,6 +1604,11 @@ pub struct DataTypeLayout { /// Can contain a null bitmask pub can_contain_null_mask: bool, + + /// This field only applies to the view type,[`DataType::BinaryView`] and [`DataType::Utf8View`] + /// If `variadic` is true, the number of buffers expected is only lower-bounded by + /// buffers.len(). Buffers that exceed the lower bound are legal. 
+ pub variadic: bool, } impl DataTypeLayout { @@ -1602,6 +1620,7 @@ impl DataTypeLayout { alignment: mem::align_of::(), }], can_contain_null_mask: true, + variadic: false, } } @@ -1612,6 +1631,7 @@ impl DataTypeLayout { Self { buffers: vec![], can_contain_null_mask: true, + variadic: false, } } @@ -1630,6 +1650,19 @@ impl DataTypeLayout { BufferSpec::VariableWidth, ], can_contain_null_mask: true, + variadic: false, + } + } + + /// Describes a view type + pub fn new_view() -> Self { + Self { + buffers: vec![BufferSpec::FixedWidth { + byte_width: mem::size_of::(), + alignment: mem::align_of::(), + }], + can_contain_null_mask: true, + variadic: true, } } } diff --git a/arrow-data/src/lib.rs b/arrow-data/src/lib.rs index cfa0dba66c35..4399d0f3eca2 100644 --- a/arrow-data/src/lib.rs +++ b/arrow-data/src/lib.rs @@ -30,3 +30,6 @@ pub mod decimal; #[cfg(feature = "ffi")] pub mod ffi; + +mod bytes_view; +pub use bytes_view::*;