-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow constructing ByteViewArray from existing blocks #5796
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -15,21 +15,42 @@ | |||
// specific language governing permissions and limitations | ||||
// under the License. | ||||
|
||||
use crate::builder::ArrayBuilder; | ||||
use crate::types::{BinaryViewType, ByteViewType, StringViewType}; | ||||
use crate::{ArrayRef, GenericByteViewArray}; | ||||
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; | ||||
use arrow_data::ByteView; | ||||
|
||||
use std::any::Any; | ||||
use std::marker::PhantomData; | ||||
use std::sync::Arc; | ||||
|
||||
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; | ||||
use arrow_data::ByteView; | ||||
use arrow_schema::ArrowError; | ||||
|
||||
use crate::{ArrayRef, GenericByteViewArray}; | ||||
use crate::builder::ArrayBuilder; | ||||
use crate::types::{BinaryViewType, ByteViewType, StringViewType}; | ||||
use crate::types::bytes::ByteArrayNativeType; | ||||
|
||||
const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024; | ||||
|
||||
/// A builder for [`GenericByteViewArray`] | ||||
/// | ||||
/// See [`Self::append_value`] for the allocation strategy | ||||
/// A [`GenericByteViewArray`] consists of a list of data blocks containing string data, | ||||
/// and a list of views into those buffers. | ||||
/// | ||||
/// This builder can be used in two ways | ||||
/// | ||||
/// # Append Values | ||||
/// | ||||
/// To avoid bump allocating this builder allocates data in fixed size blocks, configurable | ||||
/// using [`GenericByteViewBuilder::with_block_size`]. [`GenericByteViewBuilder::append_value`] | ||||
/// writes values larger than 12 bytes to the current in-progress block, with values smaller | ||||
/// than 12 bytes inlined into the views. If a value is appended that will not fit in the | ||||
/// in-progress block, it will be closed, and a new block of sufficient size allocated | ||||
/// | ||||
/// # Append Views | ||||
/// | ||||
/// Some use-cases may wish to reuse an existing allocation containing string data, for example, | ||||
/// when parsing data from a parquet data page. In such a case entire blocks can be appended | ||||
/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended | ||||
/// using [`GenericByteViewBuilder::try_append_view`] | ||||
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> { | ||||
views_builder: BufferBuilder<u128>, | ||||
null_buffer_builder: NullBufferBuilder, | ||||
|
@@ -62,6 +83,98 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> { | |||
Self { block_size, ..self } | ||||
} | ||||
|
||||
/// Append a new data block returning the new block offset | ||||
/// | ||||
/// Note: this will first flush any in-progress block | ||||
/// | ||||
/// This allows appending views from blocks added using [`Self::append_block`]. See | ||||
/// [`Self::append_value`] for appending individual values | ||||
/// | ||||
/// ``` | ||||
/// # use arrow_array::builder::StringViewBuilder; | ||||
/// let mut builder = StringViewBuilder::new(); | ||||
/// | ||||
/// let block = builder.append_block(b"helloworldbingobongo".into()); | ||||
/// | ||||
/// builder.try_append_view(block, 0, 5).unwrap(); | ||||
/// builder.try_append_view(block, 5, 5).unwrap(); | ||||
/// builder.try_append_view(block, 10, 5).unwrap(); | ||||
/// builder.try_append_view(block, 15, 5).unwrap(); | ||||
/// builder.try_append_view(block, 0, 15).unwrap(); | ||||
/// let array = builder.finish(); | ||||
/// | ||||
/// let actual: Vec<_> = array.iter().flatten().collect(); | ||||
/// let expected = &["hello", "world", "bingo", "bongo", "helloworldbingo"]; | ||||
/// assert_eq!(actual, expected); | ||||
/// ``` | ||||
pub fn append_block(&mut self, buffer: Buffer) -> u32 { | ||||
assert!(buffer.len() < u32::MAX as usize); | ||||
|
||||
self.flush_in_progress(); | ||||
let offset = self.completed.len(); | ||||
self.push_completed(buffer); | ||||
offset as u32 | ||||
} | ||||
|
||||
/// Try to append a view of the given `block`, `offset` and `length` | ||||
/// | ||||
/// See [`Self::append_block`] | ||||
pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know what performance impact the validation logic here will have, but we can always add an unchecked version down the line should it become a problem. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like we have a filter benchmark but not a raw array creation speed benchmark arrow-rs/arrow/src/util/bench_util.rs Line 141 in 9828bf0
I agree let's start like this and then add benchmarks (like reading from parquet) and if they show slow downs we can add unchecked versions |
||||
let b = self.completed.get(block as usize).ok_or_else(|| { | ||||
ArrowError::InvalidArgumentError(format!("No block found with index {block}")) | ||||
})?; | ||||
let start = offset as usize; | ||||
let end = start.saturating_add(len as usize); | ||||
|
||||
let b = b.get(start..end).ok_or_else(|| { | ||||
ArrowError::InvalidArgumentError(format!( | ||||
"Range {start}..{end} out of bounds for block of length {}", | ||||
b.len() | ||||
)) | ||||
})?; | ||||
|
||||
if T::Native::from_bytes_checked(b).is_none() { | ||||
return Err(ArrowError::InvalidArgumentError( | ||||
"Invalid view data".to_string(), | ||||
)); | ||||
} | ||||
|
||||
if len <= 12 { | ||||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
let mut view_buffer = [0; 16]; | ||||
view_buffer[0..4].copy_from_slice(&len.to_le_bytes()); | ||||
view_buffer[4..4 + b.len()].copy_from_slice(b); | ||||
self.views_builder.append(u128::from_le_bytes(view_buffer)); | ||||
} else { | ||||
let view = ByteView { | ||||
length: len, | ||||
prefix: u32::from_le_bytes(b[0..4].try_into().unwrap()), | ||||
buffer_index: block, | ||||
offset, | ||||
}; | ||||
self.views_builder.append(view.into()); | ||||
} | ||||
|
||||
self.null_buffer_builder.append_non_null(); | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Flushes the in progress block if any | ||||
#[inline] | ||||
fn flush_in_progress(&mut self) { | ||||
if !self.in_progress.is_empty() { | ||||
let f = Buffer::from_vec(std::mem::take(&mut self.in_progress)); | ||||
self.push_completed(f) | ||||
} | ||||
} | ||||
|
||||
/// Append a block to `self.completed`, checking for overflow | ||||
#[inline] | ||||
fn push_completed(&mut self, block: Buffer) { | ||||
assert!(block.len() < u32::MAX as usize, "Block too large"); | ||||
assert!(self.completed.len() < u32::MAX as usize, "Too many blocks"); | ||||
self.completed.push(block); | ||||
} | ||||
|
||||
/// Appends a value into the builder | ||||
/// | ||||
/// # Panics | ||||
|
@@ -84,12 +197,9 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> { | |||
|
||||
let required_cap = self.in_progress.len() + v.len(); | ||||
if self.in_progress.capacity() < required_cap { | ||||
let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize)); | ||||
let flushed = std::mem::replace(&mut self.in_progress, in_progress); | ||||
if !flushed.is_empty() { | ||||
assert!(self.completed.len() < u32::MAX as usize); | ||||
self.completed.push(flushed.into()); | ||||
} | ||||
self.flush_in_progress(); | ||||
let to_reserve = v.len().max(self.block_size as usize); | ||||
self.in_progress.reserve(to_reserve); | ||||
}; | ||||
let offset = self.in_progress.len() as u32; | ||||
self.in_progress.extend_from_slice(v); | ||||
|
@@ -122,10 +232,8 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> { | |||
|
||||
/// Builds the [`GenericByteViewArray`] and reset this builder | ||||
pub fn finish(&mut self) -> GenericByteViewArray<T> { | ||||
let mut completed = std::mem::take(&mut self.completed); | ||||
if !self.in_progress.is_empty() { | ||||
completed.push(std::mem::take(&mut self.in_progress).into()); | ||||
} | ||||
self.flush_in_progress(); | ||||
let completed = std::mem::take(&mut self.completed); | ||||
let len = self.views_builder.len(); | ||||
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len); | ||||
let nulls = self.null_buffer_builder.finish(); | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.