Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into XiangpengHao/master
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Mar 31, 2024
2 parents 0ffd783 + 9f36c88 commit 448bc9e
Show file tree
Hide file tree
Showing 59 changed files with 2,803 additions and 1,970 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ jobs:

- name: Setup LocalStack (AWS emulation)
run: |
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.2.0)" >> $GITHUB_ENV
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.3.0)" >> $GITHUB_ENV
echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
Expand Down
4 changes: 4 additions & 0 deletions arrow-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"

[[bench]]
name = "i256"
harness = false

[[bench]]
name = "offset"
harness = false
49 changes: 49 additions & 0 deletions arrow-buffer/benches/offset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_buffer::{OffsetBuffer, OffsetBufferBuilder};
use criterion::*;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

/// Number of random lengths fed to every benchmark below.
const SIZE: usize = 1024;

/// Benchmarks three ways of obtaining an `OffsetBuffer<i32>` from the same `SIZE` lengths:
/// `from_lengths`, an `OffsetBufferBuilder`, and `OffsetBuffer::new` over prebuilt offsets.
fn criterion_benchmark(c: &mut Criterion) {
    // Fixed seed so every run measures the same input.
    let mut rng = StdRng::seed_from_u64(42);
    let lengths: Vec<usize> = black_box((0..SIZE).map(|_| rng.gen_range(0..40)).collect());

    c.bench_function("OffsetBuffer::from_lengths", |bencher| {
        bencher.iter(|| OffsetBuffer::<i32>::from_lengths(lengths.iter().copied()));
    });

    c.bench_function("OffsetBufferBuilder::push_length", |bencher| {
        bencher.iter(|| {
            let mut builder = OffsetBufferBuilder::<i32>::new(lengths.len());
            for &length in &lengths {
                builder.push_length(length);
            }
            builder.finish()
        });
    });

    // Build the offsets once, outside the timed closure; only the clone and the
    // `OffsetBuffer::new` call are measured.
    let prebuilt = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
    let offsets = prebuilt.into_inner();

    c.bench_function("OffsetBuffer::new", |bencher| {
        bencher.iter(|| OffsetBuffer::new(black_box(offsets.clone())));
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
22 changes: 16 additions & 6 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,33 @@ impl Buffer {

/// Returns a new [Buffer] that views this buffer's contents from `offset` onwards.
/// The returned buffer is a clone advanced by `offset`, which allows the same memory
/// region to be shared between buffers.
///
/// # Panics
///
/// Panics iff `offset` is larger than `len`.
pub fn slice(&self, offset: usize) -> Self {
    let mut sliced = self.clone();
    sliced.advance(offset);
    sliced
}

/// Increases the offset of this buffer by `offset`, reducing its length by the same amount
///
/// # Panics
///
/// Panics iff `offset` is larger than `len`.
#[inline]
pub fn advance(&mut self, offset: usize) {
assert!(
offset <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
);
self.length -= offset;
// Safety:
// This cannot overflow as
// `self.offset + self.length < self.data.len()`
// `offset` is at most the length checked by the assertion above
// NOTE(review): the `let ptr` / `Self { .. }` lines below look like the pre-change
// body of `slice` shown alongside the new in-place update in this diff view —
// confirm which lines are live before relying on this block.
let ptr = unsafe { self.ptr.add(offset) };
Self {
data: self.data.clone(),
length: self.length - offset,
ptr,
}
self.ptr = unsafe { self.ptr.add(offset) };
}

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`,
Expand Down
9 changes: 7 additions & 2 deletions arrow-buffer/src/buffer/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::buffer::ScalarBuffer;
use crate::{ArrowNativeType, MutableBuffer};
use crate::{ArrowNativeType, MutableBuffer, OffsetBufferBuilder};
use std::ops::Deref;

/// A non-empty buffer of monotonically increasing, non-negative integers.
Expand Down Expand Up @@ -55,7 +55,6 @@ use std::ops::Deref;
/// (offsets[i],
/// offsets[i+1])
/// ```
#[derive(Debug, Clone)]
pub struct OffsetBuffer<O: ArrowNativeType>(ScalarBuffer<O>);

Expand Down Expand Up @@ -174,6 +173,12 @@ impl<T: ArrowNativeType> AsRef<[T]> for OffsetBuffer<T> {
}
}

impl<O: ArrowNativeType> From<OffsetBufferBuilder<O>> for OffsetBuffer<O> {
fn from(value: OffsetBufferBuilder<O>) -> Self {
value.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
52 changes: 52 additions & 0 deletions arrow-buffer/src/buffer/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ impl<T: ArrowNativeType> From<Vec<T>> for ScalarBuffer<T> {
}
}

impl<T: ArrowNativeType> From<ScalarBuffer<T>> for Vec<T> {
    /// Converts the buffer into a `Vec`, handing back the underlying allocation when
    /// `into_vec` succeeds and copying the typed contents into a new `Vec` otherwise.
    fn from(value: ScalarBuffer<T>) -> Self {
        match value.buffer.into_vec() {
            // The allocation could be taken over as-is.
            Ok(reused) => reused,
            // `into_vec` gave the buffer back; fall back to copying its elements.
            Err(retained) => Vec::from(retained.typed_data::<T>()),
        }
    }
}

impl<T: ArrowNativeType> From<BufferBuilder<T>> for ScalarBuffer<T> {
fn from(mut value: BufferBuilder<T>) -> Self {
let len = value.len();
Expand Down Expand Up @@ -208,6 +217,8 @@ impl<T: ArrowNativeType> PartialEq<ScalarBuffer<T>> for Vec<T> {

#[cfg(test)]
mod tests {
use std::{ptr::NonNull, sync::Arc};

use super::*;

#[test]
Expand Down Expand Up @@ -284,4 +295,45 @@ mod tests {
let scalar_buffer = ScalarBuffer::from(buffer_builder);
assert_eq!(scalar_buffer.as_ref(), input);
}

// Exercises `Vec::from(ScalarBuffer)`: one case where the original allocation is
// returned unchanged, followed by three cases where the result is a fresh copy.
// Pointer equality / inequality is used to tell the two outcomes apart.
#[test]
fn into_vec() {
let input = vec![1u8, 2, 3, 4];

// No copy: the buffer wraps a Vec directly and is used at offset 0 with its
// full length, so the returned Vec must have the same pointer.
let input_buffer = Buffer::from_vec(input.clone());
let input_ptr = input_buffer.as_ptr();
let input_len = input_buffer.len();
let scalar_buffer = ScalarBuffer::<u8>::new(input_buffer, 0, input_len);
let vec = Vec::from(scalar_buffer);
assert_eq!(vec.as_slice(), input.as_slice());
assert_eq!(vec.as_ptr(), input_ptr);

// Custom allocation - makes a copy
// The memory stays owned by `input_clone`; `dealloc` is only an `Arc<()>` passed
// as the allocation's owner (presumably so dropping the buffer frees nothing —
// confirm against `from_custom_allocation`).
let mut input_clone = input.clone();
let input_ptr = NonNull::new(input_clone.as_mut_ptr()).unwrap();
let dealloc = Arc::new(());
let buffer =
unsafe { Buffer::from_custom_allocation(input_ptr, input_clone.len(), dealloc as _) };
let scalar_buffer = ScalarBuffer::<u8>::new(buffer, 0, input.len());
let vec = Vec::from(scalar_buffer);
assert_eq!(vec, input.as_slice());
assert_ne!(vec.as_ptr(), input_ptr.as_ptr());

// Offset - makes a copy
// The scalar buffer starts one element in, so it no longer covers the whole allocation.
let input_buffer = Buffer::from_vec(input.clone());
let input_ptr = input_buffer.as_ptr();
let input_len = input_buffer.len();
let scalar_buffer = ScalarBuffer::<u8>::new(input_buffer, 1, input_len - 1);
let vec = Vec::from(scalar_buffer);
assert_eq!(vec.as_slice(), &input[1..]);
assert_ne!(vec.as_ptr(), input_ptr);

// Inner buffer Arc ref count != 0 - makes a copy
// NOTE(review): the pointer is compared against `input` itself rather than the
// buffer created by `from_slice_ref` — confirm this exercises the path the
// comment above describes.
let buffer = Buffer::from_slice_ref(input.as_slice());
let scalar_buffer = ScalarBuffer::<u8>::new(buffer, 0, input.len());
let vec = Vec::from(scalar_buffer);
assert_eq!(vec, input.as_slice());
assert_ne!(vec.as_ptr(), input.as_ptr());
}
}
5 changes: 4 additions & 1 deletion arrow-buffer/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
//! Buffer builders
mod boolean;
pub use boolean::*;
mod null;
mod offset;

pub use boolean::*;
pub use null::*;
pub use offset::*;

use crate::{ArrowNativeType, Buffer, MutableBuffer};
use std::{iter, marker::PhantomData};
Expand Down
125 changes: 125 additions & 0 deletions arrow-buffer/src/builder/offset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::ops::Deref;

use crate::{ArrowNativeType, OffsetBuffer};

/// Builder of [`OffsetBuffer`]
///
/// A new builder already holds the single offset `0`. Each call to
/// [`Self::push_length`] appends the running total of all lengths pushed so far.
#[derive(Debug)]
pub struct OffsetBufferBuilder<O: ArrowNativeType> {
    /// Offsets accumulated so far; the first entry is always `0`
    offsets: Vec<O>,
    /// Most recently pushed offset, tracked as `usize` so that values which do not
    /// fit in `O` can be detected when the builder is finished
    last_offset: usize,
}

impl<O: ArrowNativeType> OffsetBufferBuilder<O> {
    /// Create a new builder with space for `capacity + 1` offsets
    ///
    /// The extra slot holds the leading `0` offset, which is pushed immediately.
    pub fn new(capacity: usize) -> Self {
        let mut builder = Self {
            offsets: Vec::with_capacity(capacity + 1),
            last_offset: 0,
        };
        builder.offsets.push(O::usize_as(0));
        builder
    }

    /// Append the end offset of a new slice that is `length` long
    ///
    /// # Panics
    ///
    /// Panics if adding `length` would overflow `usize`
    #[inline]
    pub fn push_length(&mut self, length: usize) {
        let next_offset = self.last_offset.checked_add(length).expect("overflow");
        self.last_offset = next_offset;
        self.offsets.push(O::usize_as(next_offset));
    }

    /// Reserve space for at least `additional` further offsets
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        self.offsets.reserve(additional);
    }

    /// Takes the builder itself and returns an [`OffsetBuffer`]
    ///
    /// # Panics
    ///
    /// Panics if offsets overflow `O`
    pub fn finish(self) -> OffsetBuffer<O> {
        // Only the last offset needs checking: it is the largest one pushed.
        assert!(O::from_usize(self.last_offset).is_some(), "overflow");
        // SAFETY: the offsets start at `0` and every later entry is a running `usize`
        // sum, so they never decrease; the assertion above confirmed the largest of
        // them is representable in `O`.
        unsafe { OffsetBuffer::new_unchecked(self.offsets.into()) }
    }

    /// Builds the [OffsetBuffer] without resetting the builder.
    ///
    /// The accumulated offsets are cloned, so the builder can keep being used afterwards.
    ///
    /// # Panics
    ///
    /// Panics if offsets overflow `O`
    pub fn finish_cloned(&self) -> OffsetBuffer<O> {
        // Check before cloning so an overflowing builder panics without allocating.
        assert!(O::from_usize(self.last_offset).is_some(), "overflow");
        let snapshot = self.offsets.clone();
        // SAFETY: same reasoning as in `finish` — the cloned offsets start at `0`,
        // never decrease, and the largest was just verified to fit in `O`.
        unsafe { OffsetBuffer::new_unchecked(snapshot.into()) }
    }
}

/// Exposes the offsets accumulated so far as a read-only slice.
impl<O: ArrowNativeType> Deref for OffsetBufferBuilder<O> {
    type Target = [O];

    fn deref(&self) -> &Self::Target {
        self.offsets.as_slice()
    }
}

#[cfg(test)]
mod tests {
    use crate::OffsetBufferBuilder;

    /// A fresh builder exposes only the leading zero; pushed lengths accumulate into offsets.
    #[test]
    fn test_basic() {
        let mut builder = OffsetBufferBuilder::<i32>::new(5);
        assert_eq!(builder.len(), 1);
        assert_eq!(&*builder, &[0]);

        // `finish_cloned` leaves the builder usable for the pushes below.
        let snapshot = builder.finish_cloned();
        assert_eq!(snapshot.len(), 1);
        assert_eq!(&*snapshot, &[0]);

        for length in [2, 6, 0, 13] {
            builder.push_length(length);
        }

        let offsets = builder.finish();
        assert_eq!(&*offsets, &[0, 2, 8, 8, 21]);
    }

    /// Pushing a length whose running total exceeds `usize::MAX` must panic.
    #[test]
    #[should_panic(expected = "overflow")]
    fn test_usize_overflow() {
        let mut builder = OffsetBufferBuilder::<i32>::new(5);
        for length in [1, usize::MAX] {
            builder.push_length(length);
        }
        builder.finish();
    }

    /// A running total that fits in `usize` but not in `i32` must panic on `finish`.
    #[test]
    #[should_panic(expected = "overflow")]
    fn test_i32_overflow() {
        let mut builder = OffsetBufferBuilder::<i32>::new(5);
        for length in [1, i32::MAX as usize] {
            builder.push_length(length);
        }
        builder.finish();
    }
}
Loading

0 comments on commit 448bc9e

Please sign in to comment.