Skip to content

Commit

Permalink
Encapsulate View manipulation
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 6, 2024
1 parent 520ad68 commit 653065b
Show file tree
Hide file tree
Showing 5 changed files with 718 additions and 87 deletions.
153 changes: 103 additions & 50 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,52 +22,36 @@ use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{Array, ArrayAccessor, ArrayRef};
use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
use arrow_data::{ArrayData, ArrayDataBuilder, OffsetView, View};
use arrow_schema::{ArrowError, DataType};
use std::any::Any;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

/// [Variable-size Binary View Layout]: An array of variable length bytes view arrays.
///
/// Different than [`crate::GenericByteArray`] as it stores both an offset and length
/// meaning that take / filter operations can be implemented without copying the underlying data.
///
/// See [`StringViewArray`] for storing utf8 encoded string data and
/// [`BinaryViewArray`] for storing bytes.
///
/// [Variable-size Binary View Layout]: https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-view-layout
/// [Variable-size Binary View Layout]: An array of variable length byte strings.
///
/// A `GenericByteViewArray` stores variable length byte strings. An array of
/// `N` elements is stored as `N` fixed length "views" and a variable number
/// `N` elements is stored as `N` fixed length [`View`]s and some number
/// of variable length "buffers".
///
/// Each view is a `u128` value layout is different depending on the
/// length of the string stored at that location:
/// There are no constraints on offsets other than they must point into a valid
/// buffer. The offsets can be out of order, non-continuous and overlapping.
///
/// ```text
/// ┌──────┬────────────────────────┐
/// │length│ string value │
/// Strings (len <= 12) │ │ (padded with 0) │
/// └──────┴────────────────────────┘
/// 0 31 127
///
/// ┌───────┬───────┬───────┬───────┐
/// │length │prefix │ buf │offset │
/// Strings (len > 12) │ │ │ index │ │
/// └───────┴───────┴───────┴───────┘
/// 0 31 63 95 127
/// ```
/// Because `GenericByteViewArray` stores both an offset and length for each
/// byte string, certain operations such as `take` and `filter` can be
/// implemented without copying the underlying data, unlike
/// [`GenericByteArray`], which requires the variable length data to be
/// contiguous.
///
/// * Strings with length <= 12 are stored directly in the view.
/// # See Also:
/// * [`StringViewArray`] for storing UTF-8 string data
/// * [`BinaryViewArray`] for storing bytes
/// * [`View`] for the format of the views and interpreting the `u128` views
///
/// * Strings with length > 12: The first four bytes are stored inline in the
/// view and the entire string is stored in one of the buffers.
/// [Variable-size Binary View Layout]: https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-view-layout
///
/// Unlike [`GenericByteArray`], there are no constraints on the offsets other
/// than they must point into a valid buffer. However, they can be out of order,
/// non continuous and overlapping.
/// # Example
///
/// For example, in the following diagram, the strings "FishWasInTownToday" and
/// "CrumpleFacedFish" are both longer than 12 bytes and thus are stored in a
Expand All @@ -93,6 +77,7 @@ use std::sync::Arc;
/// └───┘
/// ```
/// [`GenericByteArray`]: crate::array::GenericByteArray
/// [`View`]: arrow_data::View
pub struct GenericByteViewArray<T: ByteViewType + ?Sized> {
data_type: DataType,
views: ScalarBuffer<u128>,
Expand All @@ -114,16 +99,26 @@ impl<T: ByteViewType + ?Sized> Clone for GenericByteViewArray<T> {
}

impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
/// Create a new [`GenericByteViewArray`] from the provided parts, panicking on failure
/// Create a new [`GenericByteViewArray`] from the provided parts, panicking
/// on failure.
///
/// # Panics
/// See [Self::try_new] for parameters
///
/// # Panics
/// Panics if [`GenericByteViewArray::try_new`] returns an error
///
/// [`View`]: arrow_data::View
pub fn new(views: ScalarBuffer<u128>, buffers: Vec<Buffer>, nulls: Option<NullBuffer>) -> Self {
Self::try_new(views, buffers, nulls).unwrap()
}

/// Create a new [`GenericByteViewArray`] from the provided parts, returning an error on failure
/// Create a new [`GenericByteViewArray`] from the provided parts, returning
/// an error on failure
///
/// # Parameters
/// * `views`: a [`ScalarBuffer`] of u128 views (see [`View`] for format)
/// * `buffers`: a vector of [`Buffer`]s storing the string data
/// * `nulls`: an optional [`NullBuffer`] for null values
///
/// # Errors
///
Expand Down Expand Up @@ -156,7 +151,10 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
})
}

/// Create a new [`GenericByteViewArray`] from the provided parts, without validation
/// Create a new [`GenericByteViewArray`] from the provided parts, without
/// validation
///
/// See [Self::try_new] for parameters
///
/// # Safety
///
Expand Down Expand Up @@ -232,21 +230,76 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
unsafe { self.value_unchecked(i) }
}

/// return the view at index `i`
pub fn view(&self, _i: usize) -> &View {
// TODO
// Need to do this so a reference to the view can be returned as bytes_unchecked
todo!();
}

/// Returns the element at index `i`
///
/// # Safety
/// Caller is responsible for ensuring that the index is within the bounds of the array
pub unsafe fn value_unchecked(&self, idx: usize) -> &T::Native {
let v = self.views.get_unchecked(idx);
let len = *v as u32;
let b = if len <= 12 {
let ptr = self.views.as_ptr() as *const u8;
std::slice::from_raw_parts(ptr.add(idx * 16 + 4), len as usize)
} else {
let view = ByteView::from(*v);
let data = self.buffers.get_unchecked(view.buffer_index as usize);
let offset = view.offset as usize;
data.get_unchecked(offset..offset + len as usize)
};
match View::from(v) {
View::Inline(inline_view) => {
let bytes = inline_view.get_bytes_unchecked(v);
T::Native::from_bytes_unchecked(bytes)
}
View::Offset(offset_view) => self.value_from_offset_view_unchecked(offset_view),
}
}

/// Return the value of element from this [`OffsetView`]
///
/// # Errors
/// * the buffer index is out of bounds
///* offset / length is out of bounds of the buffer
/// * The data is not valid for `T::Native` (e.g. not Utf8)
pub fn value_from_offset_view<'a>(
&'a self,
offset_view: OffsetView<'_>,
) -> Result<&'a T::Native, ArrowError> {
let data = self
.buffers
.get(offset_view.buffer_index() as usize)
.ok_or_else(|| {
ArrowError::InvalidArgumentError(format!(
"Invalid ByteView. Requested buffer {} but only has {} buffers",
offset_view.buffer_index(),
self.buffers.len()
))
})?;

let b = data.get(offset_view.range()).ok_or_else(|| {
ArrowError::InvalidArgumentError(format!(
"Invalid ByteView. Requested range {:?} but buffer {} only has {} bytes",
offset_view.range(),
offset_view.buffer_index(),
data.len()
))
})?;

T::Native::try_from_bytes(b)
}

/// Return the value from the [`OffsetView`]
///
/// # Safety
/// The caller is responsible for ensuring:
/// * the buffer index is within of bounds
/// * offset / length is within of bounds of the buffer
/// * The data is valid for `T::Native` (e.g Utf8 for Strings)
pub unsafe fn value_from_offset_view_unchecked<'a>(
&'a self,
offset_view: OffsetView<'_>,
) -> &'a T::Native {
let data = self
.buffers
.get_unchecked(offset_view.buffer_index() as usize);
let b = data.get_unchecked(offset_view.range());
T::Native::from_bytes_unchecked(b)
}

Expand Down Expand Up @@ -620,8 +673,8 @@ mod tests {
view_buffer[0..4].copy_from_slice(&1u32.to_le_bytes());
view_buffer[4..].copy_from_slice(&data);

let view = ByteView::from(u128::from_le_bytes(view_buffer));
let views = ScalarBuffer::from(vec![view.into()]);
let view = u128::from_le_bytes(view_buffer);
let views = ScalarBuffer::from(vec![view]);
let buffers = vec![];
StringViewArray::new(views, buffers, None);
}
Expand All @@ -639,8 +692,8 @@ mod tests {
view_buffer[4..8].copy_from_slice(&input_str_1.as_bytes()[0..4]);
view_buffer[8..12].copy_from_slice(&0u32.to_le_bytes());
view_buffer[12..].copy_from_slice(&0u32.to_le_bytes());
let view = ByteView::from(u128::from_le_bytes(view_buffer));
let views = ScalarBuffer::from(vec![view.into()]);
let view = u128::from_le_bytes(view_buffer);
let views = ScalarBuffer::from(vec![view]);
let buffers = vec![Buffer::from_slice_ref(input_str_2.as_bytes())];

StringViewArray::new(views, buffers, None);
Expand Down
47 changes: 20 additions & 27 deletions arrow-array/src/builder/generic_bytes_view_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::builder::ArrayBuilder;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{ArrayRef, GenericByteViewArray};
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
use arrow_data::ByteView;
use arrow_data::{OffsetViewBuilder, OwnedView};

use std::any::Any;
use std::marker::PhantomData;
Expand Down Expand Up @@ -72,35 +72,28 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
#[inline]
pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
let v: &[u8] = value.as_ref().as_ref();
let length: u32 = v.len().try_into().unwrap();
if length <= 12 {
let mut view_buffer = [0; 16];
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
view_buffer[4..4 + v.len()].copy_from_slice(v);
self.views_builder.append(u128::from_le_bytes(view_buffer));
self.null_buffer_builder.append_non_null();
return;
}

let required_cap = self.in_progress.len() + v.len();
if self.in_progress.capacity() < required_cap {
let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize));
let flushed = std::mem::replace(&mut self.in_progress, in_progress);
if !flushed.is_empty() {
assert!(self.completed.len() < u32::MAX as usize);
self.completed.push(flushed.into());
let view: u128 = match OwnedView::from(v) {
OwnedView::Inline(view) => view,
OwnedView::Offset(view) => {
let required_cap = self.in_progress.len() + v.len();
if self.in_progress.capacity() < required_cap {
let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize));
let flushed = std::mem::replace(&mut self.in_progress, in_progress);
if !flushed.is_empty() {
assert!(self.completed.len() < u32::MAX as usize);
self.completed.push(flushed.into());
}
};
let builder = OffsetViewBuilder::from(view)
.with_offset(self.in_progress.len() as u32)
.with_buffer_index(self.completed.len() as u32);
// copy the actual data into the in_progress buffer
self.in_progress.extend_from_slice(v);
builder.into()
}
};
let offset = self.in_progress.len() as u32;
self.in_progress.extend_from_slice(v);

let view = ByteView {
length,
prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
buffer_index: self.completed.len() as u32,
offset,
};
self.views_builder.append(view.into());
self.views_builder.append(view);
self.null_buffer_builder.append_non_null();
}

Expand Down
21 changes: 21 additions & 0 deletions arrow-array/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1425,24 +1425,45 @@ pub(crate) mod bytes {
impl<O: OffsetSizeTrait> ByteArrayTypeSealed for GenericBinaryType<O> {}

pub trait ByteArrayNativeType: std::fmt::Debug + Send + Sync {
/// Covert bytes to this native type
///
/// # Safety
///
/// `b` must be a valid byte sequence for `Self`
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self;

/// Covert bytes to this native type
///
/// # Errors
///
/// `b` is not a valid byte sequence for `Self` (e.g. not UTF8)
fn try_from_bytes(b: &[u8]) -> Result<&Self, ArrowError>;
}

impl ByteArrayNativeType for [u8] {
#[inline]
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
b
}

#[inline]
fn try_from_bytes(b: &[u8]) -> Result<&Self, ArrowError> {
Ok(b)
}
}

impl ByteArrayNativeType for str {
#[inline]
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
std::str::from_utf8_unchecked(b)
}

#[inline]
fn try_from_bytes(b: &[u8]) -> Result<&Self, ArrowError> {
std::str::from_utf8(b).map_err(|e| {
ArrowError::InvalidArgumentError(format!("Encountered non UTF-8 data: {e}"))
})
}
}
}

Expand Down
Loading

0 comments on commit 653065b

Please sign in to comment.