From 1634a65ee7e0f60ccc33c7a931c94cf3132dbc02 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 29 May 2024 14:23:29 +0100 Subject: [PATCH] Allow constructing ByteViewArray from existing blocks (#5796) * Allow constructing ByteViewArray from existing blocks * Format * Add tests * More tests --- .../src/builder/generic_bytes_view_builder.rs | 227 ++++++++++++++++-- arrow-array/src/types.rs | 12 + 2 files changed, 222 insertions(+), 17 deletions(-) diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs index d043bda98cec..e7f13a68288a 100644 --- a/arrow-array/src/builder/generic_bytes_view_builder.rs +++ b/arrow-array/src/builder/generic_bytes_view_builder.rs @@ -15,21 +15,42 @@ // specific language governing permissions and limitations // under the License. -use crate::builder::ArrayBuilder; -use crate::types::{BinaryViewType, ByteViewType, StringViewType}; -use crate::{ArrayRef, GenericByteViewArray}; -use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; -use arrow_data::ByteView; - use std::any::Any; use std::marker::PhantomData; use std::sync::Arc; +use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; +use arrow_data::ByteView; +use arrow_schema::ArrowError; + +use crate::builder::ArrayBuilder; +use crate::types::bytes::ByteArrayNativeType; +use crate::types::{BinaryViewType, ByteViewType, StringViewType}; +use crate::{ArrayRef, GenericByteViewArray}; + const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024; /// A builder for [`GenericByteViewArray`] /// -/// See [`Self::append_value`] for the allocation strategy +/// A [`GenericByteViewArray`] consists of a list of data blocks containing string data, +/// and a list of views into those buffers. +/// +/// This builder can be used in two ways +/// +/// # Append Values +/// +/// To avoid bump allocating, this builder allocates data in fixed size blocks, configurable +/// using [`GenericByteViewBuilder::with_block_size`]. [`GenericByteViewBuilder::append_value`] +/// writes values larger than 12 bytes to the current in-progress block, with values smaller +/// than 12 bytes inlined into the views. If a value is appended that will not fit in the +/// in-progress block, it will be closed, and a new block of sufficient size allocated +/// +/// # Append Views +/// +/// Some use-cases may wish to reuse an existing allocation containing string data, for example, +/// when parsing data from a parquet data page. In such a case entire blocks can be appended +/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended +/// using [`GenericByteViewBuilder::try_append_view`] pub struct GenericByteViewBuilder { views_builder: BufferBuilder, null_buffer_builder: NullBufferBuilder, @@ -62,6 +83,98 @@ impl GenericByteViewBuilder { Self { block_size, ..self } } + /// Append a new data block returning the new block offset + /// + /// Note: this will first flush any in-progress block + /// + /// This allows appending views from blocks added using [`Self::append_block`]. See + /// [`Self::append_value`] for appending individual values + /// + /// ``` + /// # use arrow_array::builder::StringViewBuilder; + /// let mut builder = StringViewBuilder::new(); + /// + /// let block = builder.append_block(b"helloworldbingobongo".into()); + /// + /// builder.try_append_view(block, 0, 5).unwrap(); + /// builder.try_append_view(block, 5, 5).unwrap(); + /// builder.try_append_view(block, 10, 5).unwrap(); + /// builder.try_append_view(block, 15, 5).unwrap(); + /// builder.try_append_view(block, 0, 15).unwrap(); + /// let array = builder.finish(); + /// + /// let actual: Vec<_> = array.iter().flatten().collect(); + /// let expected = &["hello", "world", "bingo", "bongo", "helloworldbingo"]; + /// assert_eq!(actual, expected); + /// ``` + pub fn append_block(&mut self, buffer: Buffer) -> u32 { + assert!(buffer.len() < u32::MAX as usize); + + self.flush_in_progress(); + let offset = self.completed.len(); + self.push_completed(buffer); + offset as u32 + } + + /// Try to append a view of the given `block`, `offset` and `length` + /// + /// See [`Self::append_block`] + pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> { + let b = self.completed.get(block as usize).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!("No block found with index {block}")) + })?; + let start = offset as usize; + let end = start.saturating_add(len as usize); + + let b = b.get(start..end).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Range {start}..{end} out of bounds for block of length {}", + b.len() + )) + })?; + + if T::Native::from_bytes_checked(b).is_none() { + return Err(ArrowError::InvalidArgumentError( + "Invalid view data".to_string(), + )); + } + + if len <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&len.to_le_bytes()); + view_buffer[4..4 + b.len()].copy_from_slice(b); + self.views_builder.append(u128::from_le_bytes(view_buffer)); + } else { + let view = ByteView { + length: len, + prefix: u32::from_le_bytes(b[0..4].try_into().unwrap()), + buffer_index: block, + offset, + }; + self.views_builder.append(view.into()); + } + + self.null_buffer_builder.append_non_null(); + Ok(()) + } + + /// Flushes the in progress block if any + #[inline] + fn flush_in_progress(&mut self) { + if !self.in_progress.is_empty() { + let f = Buffer::from_vec(std::mem::take(&mut self.in_progress)); + self.push_completed(f) + } + } + + /// Append a block to `self.completed`, checking for overflow + #[inline] + fn push_completed(&mut self, block: Buffer) { + assert!(block.len() < u32::MAX as usize, "Block too large"); + assert!(self.completed.len() < u32::MAX as usize, "Too many blocks"); + self.completed.push(block); + } + /// Appends a value into the builder /// /// # Panics @@ -84,12 +197,9 @@ impl GenericByteViewBuilder { let required_cap = self.in_progress.len() + v.len(); if self.in_progress.capacity() < required_cap { - let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize)); - let flushed = std::mem::replace(&mut self.in_progress, in_progress); - if !flushed.is_empty() { - assert!(self.completed.len() < u32::MAX as usize); - self.completed.push(flushed.into()); - } + self.flush_in_progress(); + let to_reserve = v.len().max(self.block_size as usize); + self.in_progress.reserve(to_reserve); }; let offset = self.in_progress.len() as u32; self.in_progress.extend_from_slice(v); @@ -122,10 +232,8 @@ impl GenericByteViewBuilder { /// Builds the [`GenericByteViewArray`] and reset this builder pub fn finish(&mut self) -> GenericByteViewArray { - let mut completed = std::mem::take(&mut self.completed); - if !self.in_progress.is_empty() { - completed.push(std::mem::take(&mut self.in_progress).into()); - } + self.flush_in_progress(); + let completed = std::mem::take(&mut self.completed); let len = self.views_builder.len(); let views = ScalarBuffer::new(self.views_builder.finish(), 0, len); let nulls = self.null_buffer_builder.finish(); @@ -219,3 +327,88 @@ pub type StringViewBuilder = GenericByteViewBuilder; /// Values can be appended using [`GenericByteViewBuilder::append_value`], and nulls with /// [`GenericByteViewBuilder::append_null`] as normal. pub type BinaryViewBuilder = GenericByteViewBuilder; + +#[cfg(test)] +mod tests { + use super::*; + use crate::Array; + + #[test] + fn test_string_view() { + let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81"); + let b2 = Buffer::from(b"cupcakes"); + let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity"); + + let mut v = StringViewBuilder::new(); + assert_eq!(v.append_block(b1), 0); + + v.append_value("This is a very long string that exceeds the inline length"); + v.append_value("This is another very long string that exceeds the inline length"); + + assert_eq!(v.append_block(b2), 2); + assert_eq!(v.append_block(b3), 3); + + // Test short strings + v.try_append_view(0, 0, 5).unwrap(); // world + v.try_append_view(0, 6, 7).unwrap(); // bananas + v.try_append_view(2, 3, 5).unwrap(); // cake + v.try_append_view(2, 0, 3).unwrap(); // cup + v.try_append_view(2, 0, 8).unwrap(); // cupcakes + v.try_append_view(0, 13, 4).unwrap(); // 😁 + v.try_append_view(0, 13, 0).unwrap(); // + + // Test longer strings + v.try_append_view(3, 0, 16).unwrap(); // Many strings are + v.try_append_view(1, 0, 19).unwrap(); // This is a very long + v.try_append_view(3, 13, 27).unwrap(); // here contained of great length + + v.append_value("I do so like long strings"); + + let array = v.finish_cloned(); + array.to_data().validate_full().unwrap(); + assert_eq!(array.data_buffers().len(), 5); + let actual: Vec<_> = array.iter().map(Option::unwrap).collect(); + assert_eq!( + actual, + &[ + "This is a very long string that exceeds the inline length", + "This is another very long string that exceeds the inline length", + "world", + "bananas", + "cakes", + "cup", + "cupcakes", + "😁", + "", + "Many strings are", + "This is a very long", + "are here contained of great", + "I do so like long strings" + ] + ); + + let err = v.try_append_view(0, u32::MAX, 1).unwrap_err(); + assert_eq!(err.to_string(), "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17"); + + let err = v.try_append_view(0, 1, u32::MAX).unwrap_err(); + assert_eq!( + err.to_string(), + "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17" + ); + + let err = v.try_append_view(0, 13, 2).unwrap_err(); + assert_eq!(err.to_string(), "Invalid argument error: Invalid view data"); + + let err = v.try_append_view(0, 40, 0).unwrap_err(); + assert_eq!( + err.to_string(), + "Invalid argument error: Range 40..40 out of bounds for block of length 17" + ); + + let err = v.try_append_view(5, 0, 0).unwrap_err(); + assert_eq!( + err.to_string(), + "Invalid argument error: No block found with index 5" + ); + } +} diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs index 198a11cb6974..462776005f93 100644 --- a/arrow-array/src/types.rs +++ b/arrow-array/src/types.rs @@ -1386,6 +1386,8 @@ pub(crate) mod bytes { impl ByteArrayTypeSealed for GenericBinaryType {} pub trait ByteArrayNativeType: std::fmt::Debug + Send + Sync { + fn from_bytes_checked(b: &[u8]) -> Option<&Self>; + /// # Safety /// /// `b` must be a valid byte sequence for `Self` @@ -1393,6 +1395,11 @@ pub(crate) mod bytes { } impl ByteArrayNativeType for [u8] { + #[inline] + fn from_bytes_checked(b: &[u8]) -> Option<&Self> { + Some(b) + } + #[inline] unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self { b @@ -1400,6 +1407,11 @@ pub(crate) mod bytes { } impl ByteArrayNativeType for str { + #[inline] + fn from_bytes_checked(b: &[u8]) -> Option<&Self> { + std::str::from_utf8(b).ok() + } + #[inline] unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self { std::str::from_utf8_unchecked(b)