-
Notifications
You must be signed in to change notification settings - Fork 852
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow constructing ByteViewArray from existing blocks #5796
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,21 +15,42 @@ | |
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use crate::builder::ArrayBuilder; | ||
use crate::types::{BinaryViewType, ByteViewType, StringViewType}; | ||
use crate::{ArrayRef, GenericByteViewArray}; | ||
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; | ||
use arrow_data::ByteView; | ||
|
||
use std::any::Any; | ||
use std::marker::PhantomData; | ||
use std::sync::Arc; | ||
|
||
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; | ||
use arrow_data::ByteView; | ||
use arrow_schema::ArrowError; | ||
|
||
use crate::builder::ArrayBuilder; | ||
use crate::types::bytes::ByteArrayNativeType; | ||
use crate::types::{BinaryViewType, ByteViewType, StringViewType}; | ||
use crate::{ArrayRef, GenericByteViewArray}; | ||
|
||
const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024; | ||
|
||
/// A builder for [`GenericByteViewArray`] | ||
/// | ||
/// See [`Self::append_value`] for the allocation strategy | ||
/// A [`GenericByteViewArray`] consists of a list of data blocks containing string data, | ||
/// and a list of views into those buffers. | ||
/// | ||
/// This builder can be used in two ways | ||
/// | ||
/// # Append Values | ||
/// | ||
/// To avoid bump allocating, this builder allocates data in fixed size blocks, configurable | ||
/// using [`GenericByteViewBuilder::with_block_size`]. [`GenericByteViewBuilder::append_value`] | ||
/// writes values larger than 12 bytes to the current in-progress block, with values smaller | ||
/// than 12 bytes inlined into the views. If a value is appended that will not fit in the | ||
/// in-progress block, it will be closed, and a new block of sufficient size allocated | ||
/// | ||
/// # Append Views | ||
/// | ||
/// Some use-cases may wish to reuse an existing allocation containing string data, for example, | ||
/// when parsing data from a parquet data page. In such a case entire blocks can be appended | ||
/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended | ||
/// using [`GenericByteViewBuilder::try_append_view`] | ||
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> { | ||
views_builder: BufferBuilder<u128>, | ||
null_buffer_builder: NullBufferBuilder, | ||
|
@@ -62,6 +83,98 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> { | |
Self { block_size, ..self } | ||
} | ||
|
||
/// Append a new data block returning the new block offset | ||
/// | ||
/// Note: this will first flush any in-progress block | ||
/// | ||
/// This allows appending views from blocks added using [`Self::append_block`]. See | ||
/// [`Self::append_value`] for appending individual values | ||
/// | ||
/// ``` | ||
/// # use arrow_array::builder::StringViewBuilder; | ||
/// let mut builder = StringViewBuilder::new(); | ||
/// | ||
/// let block = builder.append_block(b"helloworldbingobongo".into()); | ||
/// | ||
/// builder.try_append_view(block, 0, 5).unwrap(); | ||
/// builder.try_append_view(block, 5, 5).unwrap(); | ||
/// builder.try_append_view(block, 10, 5).unwrap(); | ||
/// builder.try_append_view(block, 15, 5).unwrap(); | ||
/// builder.try_append_view(block, 0, 15).unwrap(); | ||
/// let array = builder.finish(); | ||
/// | ||
/// let actual: Vec<_> = array.iter().flatten().collect(); | ||
/// let expected = &["hello", "world", "bingo", "bongo", "helloworldbingo"]; | ||
/// assert_eq!(actual, expected); | ||
/// ``` | ||
pub fn append_block(&mut self, buffer: Buffer) -> u32 { | ||
assert!(buffer.len() < u32::MAX as usize); | ||
|
||
self.flush_in_progress(); | ||
let offset = self.completed.len(); | ||
self.push_completed(buffer); | ||
offset as u32 | ||
} | ||
|
||
/// Try to append a view of the given `block`, `offset` and `length` | ||
/// | ||
/// See [`Self::append_block`] | ||
pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> { | ||
let b = self.completed.get(block as usize).ok_or_else(|| { | ||
ArrowError::InvalidArgumentError(format!("No block found with index {block}")) | ||
})?; | ||
let start = offset as usize; | ||
let end = start.saturating_add(len as usize); | ||
|
||
let b = b.get(start..end).ok_or_else(|| { | ||
ArrowError::InvalidArgumentError(format!( | ||
"Range {start}..{end} out of bounds for block of length {}", | ||
b.len() | ||
)) | ||
})?; | ||
|
||
if T::Native::from_bytes_checked(b).is_none() { | ||
return Err(ArrowError::InvalidArgumentError( | ||
"Invalid view data".to_string(), | ||
)); | ||
} | ||
|
||
if len <= 12 { | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let mut view_buffer = [0; 16]; | ||
view_buffer[0..4].copy_from_slice(&len.to_le_bytes()); | ||
view_buffer[4..4 + b.len()].copy_from_slice(b); | ||
self.views_builder.append(u128::from_le_bytes(view_buffer)); | ||
} else { | ||
let view = ByteView { | ||
length: len, | ||
prefix: u32::from_le_bytes(b[0..4].try_into().unwrap()), | ||
buffer_index: block, | ||
offset, | ||
}; | ||
self.views_builder.append(view.into()); | ||
} | ||
|
||
self.null_buffer_builder.append_non_null(); | ||
Ok(()) | ||
} | ||
|
||
/// Flushes the in progress block if any | ||
#[inline] | ||
fn flush_in_progress(&mut self) { | ||
if !self.in_progress.is_empty() { | ||
let f = Buffer::from_vec(std::mem::take(&mut self.in_progress)); | ||
self.push_completed(f) | ||
} | ||
} | ||
|
||
/// Append a block to `self.completed`, checking for overflow | ||
#[inline] | ||
fn push_completed(&mut self, block: Buffer) { | ||
assert!(block.len() < u32::MAX as usize, "Block too large"); | ||
assert!(self.completed.len() < u32::MAX as usize, "Too many blocks"); | ||
self.completed.push(block); | ||
} | ||
|
||
/// Appends a value into the builder | ||
/// | ||
/// # Panics | ||
|
@@ -84,12 +197,9 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> { | |
|
||
let required_cap = self.in_progress.len() + v.len(); | ||
if self.in_progress.capacity() < required_cap { | ||
let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize)); | ||
let flushed = std::mem::replace(&mut self.in_progress, in_progress); | ||
if !flushed.is_empty() { | ||
assert!(self.completed.len() < u32::MAX as usize); | ||
self.completed.push(flushed.into()); | ||
} | ||
self.flush_in_progress(); | ||
let to_reserve = v.len().max(self.block_size as usize); | ||
self.in_progress.reserve(to_reserve); | ||
}; | ||
let offset = self.in_progress.len() as u32; | ||
self.in_progress.extend_from_slice(v); | ||
|
@@ -122,10 +232,8 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> { | |
|
||
/// Builds the [`GenericByteViewArray`] and reset this builder | ||
pub fn finish(&mut self) -> GenericByteViewArray<T> { | ||
let mut completed = std::mem::take(&mut self.completed); | ||
if !self.in_progress.is_empty() { | ||
completed.push(std::mem::take(&mut self.in_progress).into()); | ||
} | ||
self.flush_in_progress(); | ||
let completed = std::mem::take(&mut self.completed); | ||
let len = self.views_builder.len(); | ||
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len); | ||
let nulls = self.null_buffer_builder.finish(); | ||
|
@@ -219,3 +327,88 @@ pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>; | |
/// Values can be appended using [`GenericByteViewBuilder::append_value`], and nulls with | ||
/// [`GenericByteViewBuilder::append_null`] as normal. | ||
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>; | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::Array; | ||
|
||
#[test] | ||
fn test_string_view() { | ||
let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81"); | ||
let b2 = Buffer::from(b"cupcakes"); | ||
let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity"); | ||
|
||
let mut v = StringViewBuilder::new(); | ||
assert_eq!(v.append_block(b1), 0); | ||
|
||
v.append_value("This is a very long string that exceeds the inline length"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These values are appended to the current block ( |
||
v.append_value("This is another very long string that exceeds the inline length"); | ||
|
||
assert_eq!(v.append_block(b2), 2); | ||
assert_eq!(v.append_block(b3), 3); | ||
|
||
// Test short strings | ||
v.try_append_view(0, 0, 5).unwrap(); // world | ||
v.try_append_view(0, 6, 7).unwrap(); // bananas | ||
v.try_append_view(2, 3, 5).unwrap(); // cake | ||
v.try_append_view(2, 0, 3).unwrap(); // cup | ||
v.try_append_view(2, 0, 8).unwrap(); // cupcakes | ||
v.try_append_view(0, 13, 4).unwrap(); // 😁 | ||
v.try_append_view(0, 13, 0).unwrap(); // | ||
|
||
// Test longer strings | ||
v.try_append_view(3, 0, 16).unwrap(); // Many strings are | ||
v.try_append_view(1, 0, 19).unwrap(); // This is a very long | ||
v.try_append_view(3, 13, 27).unwrap(); // here contained of great length | ||
|
||
v.append_value("I do so like long strings"); | ||
|
||
let array = v.finish_cloned(); | ||
array.to_data().validate_full().unwrap(); | ||
assert_eq!(array.data_buffers().len(), 5); | ||
let actual: Vec<_> = array.iter().map(Option::unwrap).collect(); | ||
assert_eq!( | ||
actual, | ||
&[ | ||
"This is a very long string that exceeds the inline length", | ||
"This is another very long string that exceeds the inline length", | ||
"world", | ||
"bananas", | ||
"cakes", | ||
"cup", | ||
"cupcakes", | ||
"😁", | ||
"", | ||
"Many strings are", | ||
"This is a very long", | ||
"are here contained of great", | ||
"I do so like long strings" | ||
] | ||
); | ||
|
||
let err = v.try_append_view(0, u32::MAX, 1).unwrap_err(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please also add an error test for an invalid block ID? (aka "No block found with index {block}") |
||
assert_eq!(err.to_string(), "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17"); | ||
|
||
let err = v.try_append_view(0, 1, u32::MAX).unwrap_err(); | ||
assert_eq!( | ||
err.to_string(), | ||
"Invalid argument error: Range 1..4294967296 out of bounds for block of length 17" | ||
); | ||
|
||
let err = v.try_append_view(0, 13, 2).unwrap_err(); | ||
assert_eq!(err.to_string(), "Invalid argument error: Invalid view data"); | ||
|
||
let err = v.try_append_view(0, 40, 0).unwrap_err(); | ||
assert_eq!( | ||
err.to_string(), | ||
"Invalid argument error: Range 40..40 out of bounds for block of length 17" | ||
); | ||
|
||
let err = v.try_append_view(5, 0, 0).unwrap_err(); | ||
assert_eq!( | ||
err.to_string(), | ||
"Invalid argument error: No block found with index 5" | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know what performance impact the validation logic here will have, but we can always add an unchecked version down the line should it become a problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like we have a filter benchmark but not a raw array creation speed benchmark
arrow-rs/arrow/src/util/bench_util.rs
Line 141 in 9828bf0
I agree let's start like this and then add benchmarks (like reading from parquet) and if they show slow downs we can add unchecked versions