Skip to content

Commit

Permalink
Encapsulate View manipulation
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 6, 2024
1 parent 520ad68 commit ffbf53b
Show file tree
Hide file tree
Showing 5 changed files with 790 additions and 88 deletions.
214 changes: 163 additions & 51 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,52 +22,36 @@ use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{Array, ArrayAccessor, ArrayRef};
use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
use arrow_data::{ArrayData, ArrayDataBuilder, OffsetView, View};
use arrow_schema::{ArrowError, DataType};
use std::any::Any;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

/// [Variable-size Binary View Layout]: An array of variable length bytes view arrays.
///
/// Different than [`crate::GenericByteArray`] as it stores both an offset and length
/// meaning that take / filter operations can be implemented without copying the underlying data.
///
/// See [`StringViewArray`] for storing utf8 encoded string data and
/// [`BinaryViewArray`] for storing bytes.
///
/// [Variable-size Binary View Layout]: https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-view-layout
/// [Variable-size Binary View Layout]: An array of variable length byte strings.
///
/// A `GenericByteViewArray` stores variable length byte strings. An array of
/// `N` elements is stored as `N` fixed length "views" and a variable number
/// `N` elements is stored as `N` fixed length [`View`]s and some number
/// of variable length "buffers".
///
/// Each view is a `u128` value whose layout differs depending on the
/// length of the string stored at that location:
/// There are no constraints on offsets other than they must point into a valid
/// buffer. The offsets can be out of order, non-continuous and overlapping.
///
/// ```text
/// ┌──────┬────────────────────────┐
/// │length│ string value │
/// Strings (len <= 12) │ │ (padded with 0) │
/// └──────┴────────────────────────┘
/// 0 31 127
///
/// ┌───────┬───────┬───────┬───────┐
/// │length │prefix │ buf │offset │
/// Strings (len > 12) │ │ │ index │ │
/// └───────┴───────┴───────┴───────┘
/// 0 31 63 95 127
/// ```
/// Because `GenericByteViewArray` stores both an offset and length for each
/// byte string, certain operations such as `take` and `filter` can be
/// implemented without copying the underlying data, unlike
/// [`GenericByteArray`], which requires the variable length data to be
/// contiguous.
///
/// * Strings with length <= 12 are stored directly in the view.
/// # See Also:
/// * [`StringViewArray`] for storing UTF-8 string data
/// * [`BinaryViewArray`] for storing bytes
/// * [`View`] for the format of the views and interpreting the `u128` views
///
/// * Strings with length > 12: The first four bytes are stored inline in the
/// view and the entire string is stored in one of the buffers.
/// [Variable-size Binary View Layout]: https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-view-layout
///
/// Unlike [`GenericByteArray`], there are no constraints on the offsets other
/// than they must point into a valid buffer. However, they can be out of order,
/// non continuous and overlapping.
/// # Example
///
/// For example, in the following diagram, the strings "FishWasInTownToday" and
/// "CrumpleFacedFish" are both longer than 12 bytes and thus are stored in a
Expand All @@ -93,6 +77,7 @@ use std::sync::Arc;
/// └───┘
/// ```
/// [`GenericByteArray`]: crate::array::GenericByteArray
/// [`View`]: arrow_data::View
pub struct GenericByteViewArray<T: ByteViewType + ?Sized> {
data_type: DataType,
views: ScalarBuffer<u128>,
Expand All @@ -114,16 +99,26 @@ impl<T: ByteViewType + ?Sized> Clone for GenericByteViewArray<T> {
}

impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
/// Create a new [`GenericByteViewArray`] from the provided parts, panicking on failure
/// Create a new [`GenericByteViewArray`] from the provided parts, panicking
/// on failure.
///
/// # Panics
/// See [Self::try_new] for parameters
///
/// # Panics
/// Panics if [`GenericByteViewArray::try_new`] returns an error
///
/// [`View`]: arrow_data::View
pub fn new(views: ScalarBuffer<u128>, buffers: Vec<Buffer>, nulls: Option<NullBuffer>) -> Self {
// Delegate to the fallible constructor; `unwrap` turns any error it returns into a panic.
Self::try_new(views, buffers, nulls).unwrap()
}

/// Create a new [`GenericByteViewArray`] from the provided parts, returning an error on failure
/// Create a new [`GenericByteViewArray`] from the provided parts, returning
/// an error on failure
///
/// # Parameters
/// * `views`: a [`ScalarBuffer`] of u128 views (see [`View`] for format)
/// * `buffers`: a vector of [`Buffer`]s storing the string data
/// * `nulls`: an optional [`NullBuffer`] for null values
///
/// # Errors
///
Expand Down Expand Up @@ -156,7 +151,10 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
})
}

/// Create a new [`GenericByteViewArray`] from the provided parts, without validation
/// Create a new [`GenericByteViewArray`] from the provided parts, without
/// validation
///
/// See [Self::try_new] for parameters
///
/// # Safety
///
Expand Down Expand Up @@ -233,20 +231,68 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
}

/// Returns the element at index `idx`
///
/// # Safety
/// Caller is responsible for ensuring that the index is within the bounds of the array
pub unsafe fn value_unchecked(&self, idx: usize) -> &T::Native {
let v = self.views.get_unchecked(idx);
let len = *v as u32;
let b = if len <= 12 {
let ptr = self.views.as_ptr() as *const u8;
std::slice::from_raw_parts(ptr.add(idx * 16 + 4), len as usize)
} else {
let view = ByteView::from(*v);
let data = self.buffers.get_unchecked(view.buffer_index as usize);
let offset = view.offset as usize;
data.get_unchecked(offset..offset + len as usize)
};
match View::from(v) {
View::Inline(inline_view) => {
let bytes = inline_view.get_bytes_unchecked(v);
T::Native::from_bytes_unchecked(bytes)
}
View::Offset(offset_view) => self.value_from_offset_view_unchecked(offset_view),
}
}

/// Resolve `offset_view` against this array's buffers, validating each step,
/// and return the value it refers to.
///
/// # Errors
/// Returns an [`ArrowError::InvalidArgumentError`] if:
/// * the buffer index is out of bounds
/// * the offset / length is out of bounds of the buffer
///
/// Propagates the error from `T::Native::try_from_bytes` if the data is not
/// valid for `T::Native` (e.g. not Utf8)
pub fn value_from_offset_view<'a>(
    &'a self,
    offset_view: OffsetView<'_>,
) -> Result<&'a T::Native, ArrowError> {
    let buffer_index = offset_view.buffer_index();

    // Step 1: the view must name a buffer this array actually owns.
    let Some(data) = self.buffers.get(buffer_index as usize) else {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Invalid ByteView. Requested buffer {} but only has {} buffers",
            buffer_index,
            self.buffers.len()
        )));
    };

    // Step 2: the requested byte range must lie inside that buffer.
    // Step 3: the bytes must be a valid `T::Native`.
    match data.get(offset_view.range()) {
        Some(bytes) => T::Native::try_from_bytes(bytes),
        None => Err(ArrowError::InvalidArgumentError(format!(
            "Invalid ByteView. Requested range {:?} but buffer {} valid range is {:?}",
            offset_view.range(),
            buffer_index,
            0..data.len()
        ))),
    }
}

/// Resolve `offset_view` against this array's buffers without any validation
/// and return the value it refers to.
///
/// # Safety
/// The caller is responsible for ensuring:
/// * the buffer index is within bounds
/// * the offset / length is within the bounds of the buffer
/// * the data is valid for `T::Native` (e.g. Utf8 for Strings)
pub unsafe fn value_from_offset_view_unchecked<'a>(
    &'a self,
    offset_view: OffsetView<'_>,
) -> &'a T::Native {
    let buffer_index = offset_view.buffer_index() as usize;
    // SAFETY: the caller guarantees that `buffer_index` names an existing
    // buffer and that the view's range lies inside it.
    let bytes = self
        .buffers
        .get_unchecked(buffer_index)
        .get_unchecked(offset_view.range());
    // SAFETY: the caller guarantees the bytes are valid for `T::Native`.
    T::Native::from_bytes_unchecked(bytes)
}

Expand Down Expand Up @@ -487,7 +533,7 @@ mod tests {
use crate::builder::{BinaryViewBuilder, StringViewBuilder};
use crate::{Array, BinaryViewArray, StringViewArray};
use arrow_buffer::{Buffer, ScalarBuffer};
use arrow_data::ByteView;
use arrow_data::{ByteView, OffsetView, View};

#[test]
fn try_new_string() {
Expand Down Expand Up @@ -533,6 +579,72 @@ mod tests {
assert!(array.is_empty());
}

#[test]
fn test_value_from_offset_view() {
    let array = test_array();
    // Element 2 is longer than 12 bytes, so its view is expected to be an
    // offset view rather than an inline one.
    let raw_view = array.views().get(2).unwrap();
    let offset_view = match View::new(raw_view) {
        View::Offset(offset_view) => offset_view,
        View::Inline(_) => panic!("Expected offset view"),
    };

    let value = array.value_from_offset_view(offset_view).unwrap();
    assert_eq!(value, "large payload over 12 bytes");
}

#[test]
fn test_value_from_offset_view2() {
    let array = test_array();
    // The array's single data buffer holds 65 bytes; read its last 60 (0x3c).
    // View fields: offset = 5, buffer index = 0, prefix = 0 (never read by
    // `value_from_offset_view`), length = 60
    let raw_view = 0x00000005_00000000_00000000_0000003cu128;
    let offset_view = OffsetView::from(&raw_view);

    let value = array.value_from_offset_view(offset_view).unwrap();
    assert_eq!(
        value,
        " payload over 12 bytessome other large payload over 12 bytes"
    );
}

#[test]
#[should_panic(expected = "Invalid ByteView. Requested buffer 2 but only has 1 buffers")]
fn test_value_from_offset_view_invalid_buffer() {
    let array = test_array();
    // View fields: offset = 0, buffer index = 2, prefix = 0, length = 256.
    // The array has only one buffer, so buffer index 2 must be rejected.
    let raw_view = 0x00000000_00000002_00000000_00000100u128;
    let offset_view = OffsetView::from(&raw_view);
    array.value_from_offset_view(offset_view).unwrap();
}

#[test]
#[should_panic(
    expected = "Invalid ByteView. Requested range 256..271 but buffer 0 valid range is 0..65"
)]
fn test_value_from_offset_view_invalid_offset() {
    let array = test_array();
    // View fields: offset = 256, buffer index = 0, prefix = 0, length = 15.
    // The range starts past the end of the 65-byte buffer.
    let raw_view = 0x00000100_00000000_00000000_0000000fu128;
    let offset_view = OffsetView::from(&raw_view);
    array.value_from_offset_view(offset_view).unwrap();
}

#[test]
#[should_panic(
    expected = "Invalid ByteView. Requested range 0..256 but buffer 0 valid range is 0..65"
)]
fn test_value_from_offset_view_invalid_too_long() {
    let array = test_array();
    // View fields: offset = 0, buffer index = 0, prefix = 0, length = 256.
    // The range starts in bounds but runs past the end of the 65-byte buffer.
    let raw_view = 0x00000000_00000000_00000000_00000100u128;
    let offset_view = OffsetView::from(&raw_view);
    array.value_from_offset_view(offset_view).unwrap();
}

/// Builds the four-element fixture shared by the `value_from_offset_view`
/// tests: `"hello"`, a null, and two strings longer than 12 bytes.
fn test_array() -> StringViewArray {
    let mut fixture = StringViewBuilder::new();
    fixture.append_value("hello");
    fixture.append_null();
    fixture.append_option(Some("large payload over 12 bytes"));
    fixture.append_option(Some("some other large payload over 12 bytes"));
    fixture.finish()
}

#[test]
fn test_append_string() {
// test builder append
Expand Down Expand Up @@ -620,8 +732,8 @@ mod tests {
view_buffer[0..4].copy_from_slice(&1u32.to_le_bytes());
view_buffer[4..].copy_from_slice(&data);

let view = ByteView::from(u128::from_le_bytes(view_buffer));
let views = ScalarBuffer::from(vec![view.into()]);
let view = u128::from_le_bytes(view_buffer);
let views = ScalarBuffer::from(vec![view]);
let buffers = vec![];
StringViewArray::new(views, buffers, None);
}
Expand All @@ -639,8 +751,8 @@ mod tests {
view_buffer[4..8].copy_from_slice(&input_str_1.as_bytes()[0..4]);
view_buffer[8..12].copy_from_slice(&0u32.to_le_bytes());
view_buffer[12..].copy_from_slice(&0u32.to_le_bytes());
let view = ByteView::from(u128::from_le_bytes(view_buffer));
let views = ScalarBuffer::from(vec![view.into()]);
let view = u128::from_le_bytes(view_buffer);
let views = ScalarBuffer::from(vec![view]);
let buffers = vec![Buffer::from_slice_ref(input_str_2.as_bytes())];

StringViewArray::new(views, buffers, None);
Expand Down
47 changes: 20 additions & 27 deletions arrow-array/src/builder/generic_bytes_view_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::builder::ArrayBuilder;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{ArrayRef, GenericByteViewArray};
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
use arrow_data::ByteView;
use arrow_data::{OffsetViewBuilder, OwnedView};

use std::any::Any;
use std::marker::PhantomData;
Expand Down Expand Up @@ -72,35 +72,28 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
#[inline]
pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
let v: &[u8] = value.as_ref().as_ref();
let length: u32 = v.len().try_into().unwrap();
if length <= 12 {
let mut view_buffer = [0; 16];
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
view_buffer[4..4 + v.len()].copy_from_slice(v);
self.views_builder.append(u128::from_le_bytes(view_buffer));
self.null_buffer_builder.append_non_null();
return;
}

let required_cap = self.in_progress.len() + v.len();
if self.in_progress.capacity() < required_cap {
let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize));
let flushed = std::mem::replace(&mut self.in_progress, in_progress);
if !flushed.is_empty() {
assert!(self.completed.len() < u32::MAX as usize);
self.completed.push(flushed.into());
let view: u128 = match OwnedView::from(v) {
OwnedView::Inline(view) => view,
OwnedView::Offset(view) => {
let required_cap = self.in_progress.len() + v.len();
if self.in_progress.capacity() < required_cap {
let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize));
let flushed = std::mem::replace(&mut self.in_progress, in_progress);
if !flushed.is_empty() {
assert!(self.completed.len() < u32::MAX as usize);
self.completed.push(flushed.into());
}
};
let builder = OffsetViewBuilder::from(view)
.with_offset(self.in_progress.len() as u32)
.with_buffer_index(self.completed.len() as u32);
// copy the actual data into the in_progress buffer
self.in_progress.extend_from_slice(v);
builder.into()
}
};
let offset = self.in_progress.len() as u32;
self.in_progress.extend_from_slice(v);

let view = ByteView {
length,
prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
buffer_index: self.completed.len() as u32,
offset,
};
self.views_builder.append(view.into());
self.views_builder.append(view);
self.null_buffer_builder.append_non_null();
}

Expand Down
Loading

0 comments on commit ffbf53b

Please sign in to comment.