Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into alamb/op_docs
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 30, 2024
2 parents a15113e + 1634a65 commit f73ea95
Show file tree
Hide file tree
Showing 20 changed files with 1,041 additions and 335 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,20 @@ Welcome to the [Rust][rust] implementation of [Apache Arrow], the popular in-mem

This repo contains the following main components:

| Crate | Description | Latest API Docs | README |
| ---------------- | --------------------------------------------------------- | ---------------------------------------------- | ------------------------------ |
| [`arrow`] | Core Arrow functionality (memory layout, arrays, kernels) | [docs.rs](https://docs.rs/arrow/latest) | [(README)][arrow-readme] |
| [`parquet`] | Parquet columnar file format | [docs.rs](https://docs.rs/parquet/latest) | [(README)][parquet-readme] |
| [`arrow-flight`] | Arrow-Flight IPC protocol | [docs.rs](https://docs.rs/arrow-flight/latest) | [(README)][flight-readme] |
| [`object-store`] | object store (aws, azure, gcp, local, in-memory) | [docs.rs](https://docs.rs/object_store/latest) | [(README)][objectstore-readme] |
| Crate | Description | Latest API Docs | README |
| ------------------ | ---------------------------------------------------------------------------- | ------------------------------------------------ | --------------------------------- |
| [`arrow`] | Core functionality (memory layout, arrays, low level computations) | [docs.rs](https://docs.rs/arrow/latest) | [(README)][arrow-readme] |
| [`arrow-flight`] | Support for Arrow-Flight IPC protocol | [docs.rs](https://docs.rs/arrow-flight/latest) | [(README)][flight-readme] |
| [`object-store`] | Support for object store interactions (aws, azure, gcp, local, in-memory) | [docs.rs](https://docs.rs/object_store/latest) | [(README)][objectstore-readme] |
| [`parquet`] | Support for Parquet columnar file format | [docs.rs](https://docs.rs/parquet/latest) | [(README)][parquet-readme] |
| [`parquet_derive`] | A crate for deriving RecordWriter/RecordReader for arbitrary, simple structs | [docs.rs](https://docs.rs/parquet-derive/latest) | [(README)][parquet-derive-readme] |

The API documentation for the current development version of this repo can be found [here](https://arrow.apache.org/rust).

[apache arrow]: https://arrow.apache.org/
[`arrow`]: https://crates.io/crates/arrow
[`parquet`]: https://crates.io/crates/parquet
[`parquet-derive`]: https://crates.io/crates/parquet-derive
[`parquet_derive`]: https://crates.io/crates/parquet-derive
[`arrow-flight`]: https://crates.io/crates/arrow-flight
[`object-store`]: https://crates.io/crates/object-store

Expand Down Expand Up @@ -127,5 +128,6 @@ There is more information in the [contributing] guide.
[datafusion-readme]: https://github.com/apache/datafusion/blob/main/README.md
[ballista-readme]: https://github.com/apache/datafusion-ballista/blob/main/README.md
[objectstore-readme]: object_store/README.md
[parquet-derive-readme]: parquet_derive/README.md
[issues]: https://github.com/apache/arrow-rs/issues
[discussions]: https://github.com/apache/arrow-rs/discussions
227 changes: 210 additions & 17 deletions arrow-array/src/builder/generic_bytes_view_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,42 @@
// specific language governing permissions and limitations
// under the License.

use crate::builder::ArrayBuilder;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{ArrayRef, GenericByteViewArray};
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
use arrow_data::ByteView;

use std::any::Any;
use std::marker::PhantomData;
use std::sync::Arc;

use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
use arrow_data::ByteView;
use arrow_schema::ArrowError;

use crate::builder::ArrayBuilder;
use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{ArrayRef, GenericByteViewArray};

const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024;

/// A builder for [`GenericByteViewArray`]
///
/// See [`Self::append_value`] for the allocation strategy
/// A [`GenericByteViewArray`] consists of a list of data blocks containing string data,
/// and a list of views into those buffers.
///
/// This builder can be used in two ways
///
/// # Append Values
///
/// To avoid bump allocating, this builder allocates data in fixed size blocks, configurable
/// using [`GenericByteViewBuilder::with_block_size`]. [`GenericByteViewBuilder::append_value`]
/// writes values larger than 12 bytes to the current in-progress block, with values of
/// 12 bytes or fewer inlined into the views. If a value is appended that will not fit in the
/// in-progress block, it will be closed, and a new block of sufficient size allocated.
///
/// # Append Views
///
/// Some use-cases may wish to reuse an existing allocation containing string data, for example,
/// when parsing data from a parquet data page. In such a case entire blocks can be appended
/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended
/// using [`GenericByteViewBuilder::try_append_view`]
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
views_builder: BufferBuilder<u128>,
null_buffer_builder: NullBufferBuilder,
Expand Down Expand Up @@ -62,6 +83,98 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
Self { block_size, ..self }
}

/// Registers `buffer` as a new completed data block and returns its block index
///
/// Any in-progress block is flushed first, so the returned index refers to `buffer`
/// itself and can be passed straight to [`Self::try_append_view`] to append views
/// into it. To append individual values instead, see [`Self::append_value`].
///
/// # Panics
///
/// Panics if `buffer` is `u32::MAX` bytes or longer, or if the builder already holds
/// `u32::MAX` blocks.
///
/// # Example
///
/// ```
/// # use arrow_array::builder::StringViewBuilder;
/// let mut builder = StringViewBuilder::new();
///
/// let block = builder.append_block(b"helloworldbingobongo".into());
///
/// builder.try_append_view(block, 0, 5).unwrap();
/// builder.try_append_view(block, 5, 5).unwrap();
/// builder.try_append_view(block, 10, 5).unwrap();
/// builder.try_append_view(block, 15, 5).unwrap();
/// builder.try_append_view(block, 0, 15).unwrap();
/// let array = builder.finish();
///
/// let actual: Vec<_> = array.iter().flatten().collect();
/// let expected = &["hello", "world", "bingo", "bongo", "helloworldbingo"];
/// assert_eq!(actual, expected);
/// ```
pub fn append_block(&mut self, buffer: Buffer) -> u32 {
    assert!(buffer.len() < u32::MAX as usize);

    // Seal the partially filled block (if any) before taking the index, so the
    // index computed below is the slot `buffer` will actually occupy.
    self.flush_in_progress();
    let block_index = self.completed.len() as u32;
    self.push_completed(buffer);
    block_index
}

/// Appends a view covering `len` bytes starting at byte `offset` of block `block`
///
/// `block` is an index into the completed blocks, such as one returned by
/// [`Self::append_block`].
///
/// # Errors
///
/// Returns [`ArrowError::InvalidArgumentError`] if
/// - no completed block exists with index `block`,
/// - `offset..offset + len` is out of bounds for that block, or
/// - the selected bytes are rejected by `T::Native::from_bytes_checked`
///
/// On error nothing is appended to the builder.
pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
    let data = match self.completed.get(block as usize) {
        Some(data) => data,
        None => {
            return Err(ArrowError::InvalidArgumentError(format!(
                "No block found with index {block}"
            )))
        }
    };

    // Saturate rather than wrap so an overflowing range is reported as out of bounds
    let start = offset as usize;
    let end = start.saturating_add(len as usize);

    let bytes = match data.get(start..end) {
        Some(bytes) => bytes,
        None => {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Range {start}..{end} out of bounds for block of length {}",
                data.len()
            )))
        }
    };

    if T::Native::from_bytes_checked(bytes).is_none() {
        return Err(ArrowError::InvalidArgumentError(String::from(
            "Invalid view data",
        )));
    }

    let view: u128 = if len > 12 {
        // Long value: the view stores the length, a 4 byte prefix and the location
        let prefix = [bytes[0], bytes[1], bytes[2], bytes[3]];
        ByteView {
            length: len,
            prefix: u32::from_le_bytes(prefix),
            buffer_index: block,
            offset,
        }
        .into()
    } else {
        // Short value: the length followed by the bytes themselves, zero padded to 16 bytes
        let mut inline = [0_u8; 16];
        inline[..4].copy_from_slice(&len.to_le_bytes());
        inline[4..4 + bytes.len()].copy_from_slice(bytes);
        u128::from_le_bytes(inline)
    };

    self.views_builder.append(view);
    self.null_buffer_builder.append_non_null();
    Ok(())
}

/// Moves the bytes accumulated in `in_progress`, if any, into a new completed block
///
/// Does nothing when `in_progress` is empty, so no empty blocks are ever pushed.
#[inline]
fn flush_in_progress(&mut self) {
    if self.in_progress.is_empty() {
        return;
    }
    // `take` leaves an empty `Vec` behind for the next in-progress block
    let bytes = std::mem::take(&mut self.in_progress);
    self.push_completed(Buffer::from_vec(bytes));
}

/// Pushes `block` onto `self.completed`
///
/// # Panics
///
/// Panics if `block` is `u32::MAX` bytes or longer, or if `self.completed` already
/// holds `u32::MAX` blocks.
#[inline]
fn push_completed(&mut self, block: Buffer) {
    let limit = u32::MAX as usize;
    assert!(block.len() < limit, "Block too large");
    assert!(self.completed.len() < limit, "Too many blocks");
    self.completed.push(block);
}

/// Appends a value into the builder
///
/// # Panics
Expand All @@ -84,12 +197,9 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {

let required_cap = self.in_progress.len() + v.len();
if self.in_progress.capacity() < required_cap {
let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize));
let flushed = std::mem::replace(&mut self.in_progress, in_progress);
if !flushed.is_empty() {
assert!(self.completed.len() < u32::MAX as usize);
self.completed.push(flushed.into());
}
self.flush_in_progress();
let to_reserve = v.len().max(self.block_size as usize);
self.in_progress.reserve(to_reserve);
};
let offset = self.in_progress.len() as u32;
self.in_progress.extend_from_slice(v);
Expand Down Expand Up @@ -122,10 +232,8 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {

/// Builds the [`GenericByteViewArray`] and reset this builder
pub fn finish(&mut self) -> GenericByteViewArray<T> {
let mut completed = std::mem::take(&mut self.completed);
if !self.in_progress.is_empty() {
completed.push(std::mem::take(&mut self.in_progress).into());
}
self.flush_in_progress();
let completed = std::mem::take(&mut self.completed);
let len = self.views_builder.len();
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
let nulls = self.null_buffer_builder.finish();
Expand Down Expand Up @@ -219,3 +327,88 @@ pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;
/// Values can be appended using [`GenericByteViewBuilder::append_value`], and nulls with
/// [`GenericByteViewBuilder::append_null`] as normal.
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;

#[cfg(test)]
mod tests {
use super::*;
use crate::Array;

// Exercises `append_block` / `try_append_view` interleaved with `append_value`,
// then checks the error messages produced for invalid view requests.
#[test]
fn test_string_view() {
// b1 is 17 bytes: "world" (0..5), an invalid UTF-8 byte 0xFF (5),
// "bananas" (6..13) and a 4 byte emoji (13..17)
let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
let b2 = Buffer::from(b"cupcakes");
let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");

let mut v = StringViewBuilder::new();
assert_eq!(v.append_block(b1), 0);

// These two long values are written to the in-progress block
v.append_value("This is a very long string that exceeds the inline length");
v.append_value("This is another very long string that exceeds the inline length");

// append_block flushes the in-progress block first (it becomes block 1),
// so b2 and b3 are assigned indices 2 and 3
assert_eq!(v.append_block(b2), 2);
assert_eq!(v.append_block(b3), 3);

// Test short strings
v.try_append_view(0, 0, 5).unwrap(); // world
v.try_append_view(0, 6, 7).unwrap(); // bananas
v.try_append_view(2, 3, 5).unwrap(); // cakes
v.try_append_view(2, 0, 3).unwrap(); // cup
v.try_append_view(2, 0, 8).unwrap(); // cupcakes
v.try_append_view(0, 13, 4).unwrap(); // 😁
v.try_append_view(0, 13, 0).unwrap(); // (empty string)

// Test longer strings
v.try_append_view(3, 0, 16).unwrap(); // Many strings are
v.try_append_view(1, 0, 19).unwrap(); // This is a very long
v.try_append_view(3, 13, 27).unwrap(); // are here contained of great

v.append_value("I do so like long strings");

let array = v.finish_cloned();
array.to_data().validate_full().unwrap();
// Blocks 0..=3 from above plus one more holding the final appended value
assert_eq!(array.data_buffers().len(), 5);
let actual: Vec<_> = array.iter().map(Option::unwrap).collect();
assert_eq!(
actual,
&[
"This is a very long string that exceeds the inline length",
"This is another very long string that exceeds the inline length",
"world",
"bananas",
"cakes",
"cup",
"cupcakes",
"😁",
"",
"Many strings are",
"This is a very long",
"are here contained of great",
"I do so like long strings"
]
);

// Offset beyond the end of the block
let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
assert_eq!(err.to_string(), "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17");

// Length running past the end of the block
let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
assert_eq!(
err.to_string(),
"Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
);

// In bounds, but splits the 4 byte emoji and so is not valid UTF-8
let err = v.try_append_view(0, 13, 2).unwrap_err();
assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");

// A zero length view must still start within the block
let err = v.try_append_view(0, 40, 0).unwrap_err();
assert_eq!(
err.to_string(),
"Invalid argument error: Range 40..40 out of bounds for block of length 17"
);

// Unknown block index
let err = v.try_append_view(5, 0, 0).unwrap_err();
assert_eq!(
err.to_string(),
"Invalid argument error: No block found with index 5"
);
}
}
12 changes: 12 additions & 0 deletions arrow-array/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1388,20 +1388,32 @@ pub(crate) mod bytes {
impl<O: OffsetSizeTrait> ByteArrayTypeSealed for GenericBinaryType<O> {}

/// Conversions from raw bytes to a borrowed native value (implemented for `[u8]` and `str`)
pub trait ByteArrayNativeType: std::fmt::Debug + Send + Sync {
/// Reinterprets `b` as `&Self`, returning `None` if `b` is not a valid
/// byte sequence for `Self`
fn from_bytes_checked(b: &[u8]) -> Option<&Self>;

/// Reinterprets `b` as `&Self` without checking its validity
///
/// # Safety
///
/// `b` must be a valid byte sequence for `Self`
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self;
}

// Every byte sequence is a valid `[u8]`, so both conversions return `b` unchanged
impl ByteArrayNativeType for [u8] {
/// Always returns `Some(b)`
#[inline]
fn from_bytes_checked(b: &[u8]) -> Option<&Self> {
Some(b)
}

/// Returns `b` as-is
#[inline]
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
b
}
}

impl ByteArrayNativeType for str {
#[inline]
fn from_bytes_checked(b: &[u8]) -> Option<&Self> {
std::str::from_utf8(b).ok()
}

#[inline]
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
std::str::from_utf8_unchecked(b)
Expand Down
2 changes: 1 addition & 1 deletion arrow-flight/gen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ publish = false
[dependencies]
# Pin specific version of the tonic-build dependencies to avoid auto-generated
# (and checked in) arrow.flight.protocol.rs from changing
proc-macro2 = { version = "=1.0.83", default-features = false }
proc-macro2 = { version = "=1.0.84", default-features = false }
prost-build = { version = "=0.12.6", default-features = false }
tonic-build = { version = "=0.11.0", default-features = false, features = ["transport", "prost"] }
10 changes: 5 additions & 5 deletions arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
// 4 bytes - the byte length of the payload
// a flatbuffer Message whose header is the Schema
if buffer.len() >= 4 {
// check continuation maker
let continuation_maker = &buffer[0..4];
let begin_offset: usize = if continuation_maker.eq(&CONTINUATION_MARKER) {
// check continuation marker
let continuation_marker = &buffer[0..4];
let begin_offset: usize = if continuation_marker.eq(&CONTINUATION_MARKER) {
// 4 bytes: CONTINUATION_MARKER
// 4 bytes: length
// buffer
4
} else {
// backward compatibility for buffer without the continuation maker
// backward compatibility for buffer without the continuation marker
// 4 bytes: length
// buffer
0
Expand All @@ -198,7 +198,7 @@ pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
Ok(fb_to_schema(ipc_schema))
} else {
Err(ArrowError::ParseError(
"The buffer length is less than 4 and missing the continuation maker or length of buffer".to_string()
"The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
))
}
}
Expand Down
Loading

0 comments on commit f73ea95

Please sign in to comment.