diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs index d043bda98cec..0220c45e0250 100644 --- a/arrow-array/src/builder/generic_bytes_view_builder.rs +++ b/arrow-array/src/builder/generic_bytes_view_builder.rs @@ -15,21 +15,42 @@ // specific language governing permissions and limitations // under the License. -use crate::builder::ArrayBuilder; -use crate::types::{BinaryViewType, ByteViewType, StringViewType}; -use crate::{ArrayRef, GenericByteViewArray}; -use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; -use arrow_data::ByteView; - use std::any::Any; use std::marker::PhantomData; use std::sync::Arc; +use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; +use arrow_data::ByteView; +use arrow_schema::ArrowError; + +use crate::{ArrayRef, GenericByteViewArray}; +use crate::builder::ArrayBuilder; +use crate::types::{BinaryViewType, ByteViewType, StringViewType}; +use crate::types::bytes::ByteArrayNativeType; + const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024; /// A builder for [`GenericByteViewArray`] /// -/// See [`Self::append_value`] for the allocation strategy +/// A [`GenericByteViewArray`] consists of a list of data blocks containing string data, +/// and a list of views into those buffers. +/// +/// This builder can be used in two ways +/// +/// # Append Values +/// +/// To avoid bump allocating this builder allocates data in fixed size blocks, configurable +/// using [`GenericByteViewBuilder::with_block_size`]. [`GenericByteViewBuilder::append_value`] +/// writes values larger than 12 bytes to the current in-progress block, with values smaller +/// than 12 bytes inlined into the views. If a value is appended that will not fit in the +/// in-progress block, it will be closed, and a new block of sufficient size allocated +/// +/// # Append Views +/// +/// Some use-cases may wish to reuse an existing allocation containing string data, for example, +/// when parsing data from a parquet data page. In such a case entire blocks can be appended +/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended +/// using [`GenericByteViewBuilder::try_append_view`] pub struct GenericByteViewBuilder { views_builder: BufferBuilder, null_buffer_builder: NullBufferBuilder, @@ -62,6 +83,98 @@ impl GenericByteViewBuilder { Self { block_size, ..self } } + /// Append a new data block returning the new block offset + /// + /// Note: this will first flush any in-progress block + /// + /// This allows appending views from blocks added using [`Self::append_block`]. See + /// [`Self::append_value`] for appending individual values + /// + /// ``` + /// # use arrow_array::builder::StringViewBuilder; + /// let mut builder = StringViewBuilder::new(); + /// + /// let block = builder.append_block(b"helloworldbingobongo".into()); + /// + /// builder.try_append_view(block, 0, 5).unwrap(); + /// builder.try_append_view(block, 5, 5).unwrap(); + /// builder.try_append_view(block, 10, 5).unwrap(); + /// builder.try_append_view(block, 15, 5).unwrap(); + /// builder.try_append_view(block, 0, 15).unwrap(); + /// let array = builder.finish(); + /// + /// let actual: Vec<_> = array.iter().flatten().collect(); + /// let expected = &["hello", "world", "bingo", "bongo", "helloworldbingo"]; + /// assert_eq!(actual, expected); + /// ``` + pub fn append_block(&mut self, buffer: Buffer) -> u32 { + assert!(buffer.len() < u32::MAX as usize); + + self.flush_in_progress(); + let offset = self.completed.len(); + self.push_completed(buffer); + offset as u32 + } + + /// Try to append a view of the given `block`, `offset` and `length` + /// + /// See [`Self::append_block`] + pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> { + let b = self.completed.get(block as usize).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!("No block found with index {block}")) + })?; + let start = offset as usize; + let end = start.saturating_add(len as usize); + + let b = b.get(start..end).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Range {start}..{end} out of bounds for block of length {}", + b.len() + )) + })?; + + if T::Native::from_bytes_checked(b).is_none() { + return Err(ArrowError::InvalidArgumentError( + "Invalid view data".to_string(), + )); + } + + if len <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&len.to_le_bytes()); + view_buffer[4..4 + b.len()].copy_from_slice(b); + self.views_builder.append(u128::from_le_bytes(view_buffer)); + } else { + let view = ByteView { + length: len, + prefix: u32::from_le_bytes(b[0..4].try_into().unwrap()), + buffer_index: block, + offset, + }; + self.views_builder.append(view.into()); + } + + self.null_buffer_builder.append_non_null(); + Ok(()) + } + + /// Flushes the in progress block if any + #[inline] + fn flush_in_progress(&mut self) { + if !self.in_progress.is_empty() { + let f = Buffer::from_vec(std::mem::take(&mut self.in_progress)); + self.push_completed(f) + } + } + + /// Append a block to `self.completed`, checking for overflow + #[inline] + fn push_completed(&mut self, block: Buffer) { + assert!(block.len() < u32::MAX as usize, "Block too large"); + assert!(self.completed.len() < u32::MAX as usize, "Too many blocks"); + self.completed.push(block); + } + /// Appends a value into the builder /// /// # Panics @@ -84,12 +197,9 @@ impl GenericByteViewBuilder { let required_cap = self.in_progress.len() + v.len(); if self.in_progress.capacity() < required_cap { - let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize)); - let flushed = std::mem::replace(&mut self.in_progress, in_progress); - if !flushed.is_empty() { - assert!(self.completed.len() < u32::MAX as usize); - self.completed.push(flushed.into()); - } + self.flush_in_progress(); + let to_reserve = v.len().max(self.block_size as usize); + self.in_progress.reserve(to_reserve); }; let offset = self.in_progress.len() as u32; self.in_progress.extend_from_slice(v); @@ -122,10 +232,8 @@ impl GenericByteViewBuilder { /// Builds the [`GenericByteViewArray`] and reset this builder pub fn finish(&mut self) -> GenericByteViewArray { - let mut completed = std::mem::take(&mut self.completed); - if !self.in_progress.is_empty() { - completed.push(std::mem::take(&mut self.in_progress).into()); - } + self.flush_in_progress(); + let completed = std::mem::take(&mut self.completed); let len = self.views_builder.len(); let views = ScalarBuffer::new(self.views_builder.finish(), 0, len); let nulls = self.null_buffer_builder.finish(); diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs index 198a11cb6974..462776005f93 100644 --- a/arrow-array/src/types.rs +++ b/arrow-array/src/types.rs @@ -1386,6 +1386,8 @@ pub(crate) mod bytes { impl ByteArrayTypeSealed for GenericBinaryType {} pub trait ByteArrayNativeType: std::fmt::Debug + Send + Sync { + fn from_bytes_checked(b: &[u8]) -> Option<&Self>; + /// # Safety /// /// `b` must be a valid byte sequence for `Self` @@ -1393,6 +1395,11 @@ pub(crate) mod bytes { } impl ByteArrayNativeType for [u8] { + #[inline] + fn from_bytes_checked(b: &[u8]) -> Option<&Self> { + Some(b) + } + #[inline] unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self { b @@ -1400,6 +1407,11 @@ pub(crate) mod bytes { } impl ByteArrayNativeType for str { + #[inline] + fn from_bytes_checked(b: &[u8]) -> Option<&Self> { + std::str::from_utf8(b).ok() + } + #[inline] unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self { std::str::from_utf8_unchecked(b)