Skip to content

Commit

Permalink
add bench codes for view type read write
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil committed Apr 4, 2024
1 parent 6b935c7 commit 897fbac
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 18 deletions.
43 changes: 40 additions & 3 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,18 @@ impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
}
}

impl<'a, Ptr, T> FromIterator<&'a Option<Ptr>> for GenericByteViewArray<T>
where
    Ptr: AsRef<T::Native> + 'a,
    T: ByteViewType + ?Sized,
{
    /// Builds the array from borrowed options by converting each element to
    /// `Option<&T::Native>` and delegating to the owned-`Option`
    /// `FromIterator` implementation.
    fn from_iter<I: IntoIterator<Item = &'a Option<Ptr>>>(iter: I) -> Self {
        let borrowed = iter
            .into_iter()
            .map(|item| item.as_ref().map(AsRef::as_ref));
        borrowed.collect()
    }
}

impl<Ptr, T: ByteViewType + ?Sized> FromIterator<Option<Ptr>> for GenericByteViewArray<T>
where
Ptr: AsRef<T::Native>,
Expand All @@ -403,15 +415,15 @@ pub type BinaryViewArray = GenericByteViewArray<BinaryViewType>;
impl BinaryViewArray {
/// Convert the [`BinaryViewArray`] to [`StringViewArray`]
/// If items not utf8 data, validate will fail and error returned.
pub fn to_stringview(self) -> Result<StringViewArray, ArrowError> {
pub fn to_string_view(self) -> Result<StringViewArray, ArrowError> {
StringViewType::validate(self.views(), self.data_buffers())?;
unsafe { Ok(self.to_stringview_unchecked()) }
unsafe { Ok(self.to_string_view_unchecked()) }
}

/// Convert the [`BinaryViewArray`] to [`StringViewArray`]
/// # Safety
/// Caller is responsible for ensuring that items in array are utf8 data.
pub unsafe fn to_stringview_unchecked(self) -> StringViewArray {
pub unsafe fn to_string_view_unchecked(self) -> StringViewArray {
StringViewArray::new_unchecked(self.views, self.buffers, self.nulls)
}
}
Expand All @@ -427,12 +439,37 @@ impl BinaryViewArray {
/// ```
pub type StringViewArray = GenericByteViewArray<StringViewType>;

impl StringViewArray {
    /// Convert the [`StringViewArray`] to [`BinaryViewArray`]
    ///
    /// This conversion is infallible: valid UTF-8 is always valid binary.
    pub fn to_binary_view(self) -> BinaryViewArray {
        // SAFETY: any valid UTF-8 data is valid binary data; the views,
        // buffers and null mask are reused unchanged.
        unsafe { BinaryViewArray::new_unchecked(self.views, self.buffers, self.nulls) }
    }
}

impl From<Vec<&str>> for StringViewArray {
fn from(v: Vec<&str>) -> Self {
Self::from_iter_values(v)
}
}

impl From<Vec<Option<&str>>> for StringViewArray {
    /// Builds a [`StringViewArray`] from optional string slices; `None`
    /// entries become nulls.
    fn from(v: Vec<Option<&str>>) -> Self {
        Self::from_iter(v)
    }
}

impl From<Vec<String>> for StringViewArray {
fn from(v: Vec<String>) -> Self {
Self::from_iter_values(v)
}
}

impl From<Vec<Option<String>>> for StringViewArray {
    /// Builds a [`StringViewArray`] from optional owned strings; `None`
    /// entries become nulls.
    fn from(v: Vec<Option<String>>) -> Self {
        Self::from_iter(v)
    }
}

#[cfg(test)]
mod tests {
use crate::builder::StringViewBuilder;
Expand Down
36 changes: 36 additions & 0 deletions arrow/src/util/bench_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,42 @@ pub fn create_string_array_with_len<Offset: OffsetSizeTrait>(
.collect()
}

/// Creates a random (but fixed-seeded) string view array of a given size,
/// null density and string length.
///
/// When `mixed` is true, the first half of the strings are shorter than 12
/// bytes and the second half are at least 12 bytes long — presumably to
/// exercise both the inline and the buffer-backed view representations.
pub fn create_string_view_array_with_len(
    size: usize,
    null_density: f32,
    str_len: usize,
    mixed: bool,
) -> StringViewArray {
    let rng = &mut seedable_rng();

    // Pick one target length per element. The branch below draws from the
    // seeded RNG in the same order as two sequential half-loops would
    // (all short lengths first, then all long ones).
    let lengths: Vec<usize> = if mixed {
        (0..size)
            .map(|i| {
                if i < size / 2 {
                    rng.gen_range(1..12)
                } else {
                    rng.gen_range(12..=std::cmp::max(30, str_len))
                }
            })
            .collect()
    } else {
        vec![str_len; size]
    };

    lengths
        .into_iter()
        .map(|len| {
            if rng.gen::<f32>() < null_density {
                None
            } else {
                let value: Vec<u8> = rng.sample_iter(&Alphanumeric).take(len).collect();
                Some(String::from_utf8(value).unwrap())
            }
        })
        .collect()
}

/// Creates a random (but fixed-seeded) array of a given size and null density
/// consisting of random 4 character alphanumeric strings
pub fn create_string_dict_array<K: ArrowDictionaryKeyType>(
Expand Down
9 changes: 9 additions & 0 deletions arrow/src/util/data_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,18 @@ pub fn create_random_array(
},
Utf8 => Arc::new(create_string_array::<i32>(size, null_density)),
LargeUtf8 => Arc::new(create_string_array::<i64>(size, null_density)),
Utf8View => Arc::new(create_string_view_array_with_len(
size,
null_density,
4,
false,
)),
Binary => Arc::new(create_binary_array::<i32>(size, null_density)),
LargeBinary => Arc::new(create_binary_array::<i64>(size, null_density)),
FixedSizeBinary(len) => Arc::new(create_fsb_array(size, null_density, *len as usize)),
BinaryView => Arc::new(
create_string_view_array_with_len(size, null_density, 4, false).to_binary_view(),
),
List(_) => create_random_list_array(field, size, null_density, true_density)?,
LargeList(_) => create_random_list_array(field, size, null_density, true_density)?,
Struct(fields) => Arc::new(StructArray::try_from(
Expand Down
99 changes: 98 additions & 1 deletion parquet/benches/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
use num::FromPrimitive;
use num_bigint::BigInt;
use parquet::arrow::array_reader::{
make_byte_array_reader, make_fixed_len_byte_array_reader, ListArrayReader,
make_byte_array_reader, make_byte_view_array_reader, make_fixed_len_byte_array_reader,
ListArrayReader,
};
use parquet::basic::Type;
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
Expand Down Expand Up @@ -502,6 +503,13 @@ fn create_string_byte_array_reader(
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}

/// Builds a boxed [`ArrayReader`] via `make_byte_view_array_reader` for the
/// given pages and column; used by the StringViewArray benchmarks.
fn create_string_view_byte_array_reader(
    page_iterator: impl PageIterator + 'static,
    column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
    let pages = Box::new(page_iterator);
    make_byte_view_array_reader(pages, column_desc, None).unwrap()
}

fn create_string_byte_array_dictionary_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
Expand Down Expand Up @@ -993,6 +1001,95 @@ fn add_benches(c: &mut Criterion) {

group.finish();

// string view benchmarks
//==============================

let mut group = c.benchmark_group("arrow_array_reader/StringViewArray");

// string, plain encoded, no NULLs
let plain_string_no_null_data =
build_plain_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0);
group.bench_function("plain encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
plain_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let plain_string_no_null_data =
build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0);
group.bench_function("plain encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
plain_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// string, plain encoded, half NULLs
let plain_string_half_null_data =
build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5);
group.bench_function("plain encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
plain_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// string, dictionary encoded, no NULLs
let dictionary_string_no_null_data =
build_dictionary_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0);
group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let dictionary_string_no_null_data =
build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0);
group.bench_function("dictionary encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data =
build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5);
group.bench_function("dictionary encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

group.finish();

// list benchmarks
//==============================

Expand Down
34 changes: 34 additions & 0 deletions parquet/benches/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@ fn create_string_bench_batch(
)?)
}

/// Creates a random `RecordBatch` with one nullable `Utf8View` column (`_1`)
/// and one nullable `BinaryView` column (`_2`) for the writer benchmarks.
fn create_string_and_binary_view_bench_batch(
    size: usize,
    null_density: f32,
    true_density: f32,
) -> Result<RecordBatch> {
    let schema = Schema::new(vec![
        Field::new("_1", DataType::Utf8View, true),
        Field::new("_2", DataType::BinaryView, true),
    ]);
    // `?` converts the inner error into this function's error type.
    Ok(create_random_batch(
        Arc::new(schema),
        size,
        null_density,
        true_density,
    )?)
}

fn create_string_dictionary_bench_batch(
size: usize,
null_density: f32,
Expand Down Expand Up @@ -395,6 +413,22 @@ fn bench_primitive_writer(c: &mut Criterion) {
b.iter(|| write_batch_enable_bloom_filter(&batch).unwrap())
});

let batch = create_string_and_binary_view_bench_batch(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
.columns()
.iter()
.map(|f| f.get_array_memory_size() as u64)
.sum(),
));
group.bench_function("4096 values string", |b| {
b.iter(|| write_batch(&batch).unwrap())
});

group.bench_function("4096 values string with bloom filter", |b| {
b.iter(|| write_batch_enable_bloom_filter(&batch).unwrap())
});

let batch = create_string_dictionary_bench_batch(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
Expand Down
1 change: 1 addition & 0 deletions parquet/src/arrow/array_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ mod test_util;

pub use builder::build_array_reader;
pub use byte_array::make_byte_array_reader;
pub use byte_array::make_byte_view_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
#[allow(unused_imports)] // Only used for benchmarks
pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
Expand Down
15 changes: 12 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,8 @@ mod tests {
fn arrow_writer_binary_view() {
let string_field = Field::new("a", DataType::Utf8View, false);
let binary_field = Field::new("b", DataType::BinaryView, false);
let schema = Schema::new(vec![string_field, binary_field]);
let nullable_string_field = Field::new("a", DataType::Utf8View, true);
let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);

let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
let raw_binary_values = vec![
Expand All @@ -1240,16 +1241,24 @@ mod tests {
b"large payload over 12 bytes".to_vec(),
b"lulu".to_vec(),
];
let nullable_string_values =
vec![Some("foo"), None, Some("large payload over 12 bytes"), None];

let string_view_values = StringViewArray::from(raw_string_values);
let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
let nullable_string_view_values = StringViewArray::from(nullable_string_values);
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(string_view_values), Arc::new(binary_view_values)],
vec![
Arc::new(string_view_values),
Arc::new(binary_view_values),
Arc::new(nullable_string_view_values),
],
)
.unwrap();

roundtrip(batch, Some(SMALL_SIZE / 2));
roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
roundtrip(batch, None);
}

fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
Expand Down
19 changes: 8 additions & 11 deletions parquet/src/arrow/buffer/offset_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
match data_type {
ArrowType::Utf8View => {
let mut builder = self.build_generic_byte_view();
Arc::new(builder.finish().to_stringview().unwrap())
Arc::new(builder.finish().to_string_view().unwrap())
}
ArrowType::BinaryView => {
let mut builder = self.build_generic_byte_view();
Expand All @@ -154,17 +154,14 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
}
}

fn build_generic_byte_view(&self) -> GenericByteViewBuilder<BinaryViewType> {
fn build_generic_byte_view(self) -> GenericByteViewBuilder<BinaryViewType> {
let mut builder = GenericByteViewBuilder::<BinaryViewType>::with_capacity(self.len());
for i in self.offsets.windows(2) {
let start = i[0];
let end = i[1];
let b = unsafe {
std::slice::from_raw_parts(
self.values.as_ptr().offset(start.to_isize().unwrap()),
(end - start).to_usize().unwrap(),
)
};
let mut values = self.values;
for window in self.offsets.windows(2) {
let start = window[0];
let end = window[1];
let len = (end - start).to_usize().unwrap();
let b = values.drain(..len).collect::<Vec<u8>>();
if b.is_empty() {
builder.append_null();
} else {
Expand Down

0 comments on commit 897fbac

Please sign in to comment.