feat: support reading and writing StringView and BinaryView in parquet (part 1) #5618

Merged: 5 commits, Apr 9, 2024
55 changes: 54 additions & 1 deletion arrow-array/src/array/byte_view_array.rs
@@ -377,6 +377,18 @@ impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
}
}

impl<'a, Ptr, T> FromIterator<&'a Option<Ptr>> for GenericByteViewArray<T>
where
Ptr: AsRef<T::Native> + 'a,
T: ByteViewType + ?Sized,
{
fn from_iter<I: IntoIterator<Item = &'a Option<Ptr>>>(iter: I) -> Self {
iter.into_iter()
.map(|o| o.as_ref().map(|p| p.as_ref()))
.collect()
}
}
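As a quick illustration of the new `FromIterator<&'a Option<Ptr>>` impl, a minimal sketch (not part of the diff; assumes only `arrow_array` is in scope):

```rust
use arrow_array::{Array, StringViewArray};

// Borrowed options collect directly via the new FromIterator impl
let data = vec![Some("hello"), None, Some("string view in parquet")];
let array: StringViewArray = data.iter().collect();

assert_eq!(array.len(), 3);
assert!(array.is_null(1));
assert_eq!(array.value(2), "string view in parquet");
```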

impl<Ptr, T: ByteViewType + ?Sized> FromIterator<Option<Ptr>> for GenericByteViewArray<T>
where
Ptr: AsRef<T::Native>,
@@ -400,7 +412,23 @@ where
/// ```
pub type BinaryViewArray = GenericByteViewArray<BinaryViewType>;

/// A [`GenericByteViewArray`] that stores uf8 data
impl BinaryViewArray {
/// Convert the [`BinaryViewArray`] to [`StringViewArray`]
/// If the items are not valid utf8 data, validation will fail and an error is returned.
pub fn to_string_view(self) -> Result<StringViewArray, ArrowError> {
StringViewType::validate(self.views(), self.data_buffers())?;
unsafe { Ok(self.to_string_view_unchecked()) }
}

/// Convert the [`BinaryViewArray`] to [`StringViewArray`]
/// # Safety
/// The caller is responsible for ensuring that the items in the array are valid utf8 data.
pub unsafe fn to_string_view_unchecked(self) -> StringViewArray {
StringViewArray::new_unchecked(self.views, self.buffers, self.nulls)
}
}
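A minimal usage sketch of the checked conversion (`from_iter_values` is the existing `GenericByteViewArray` constructor; the values are illustrative):

```rust
use arrow_array::{BinaryViewArray, StringViewArray};

let binary =
    BinaryViewArray::from_iter_values([b"hello".as_ref(), b"longer than twelve bytes".as_ref()]);

// Validates every view as utf8 before reinterpreting the array
let strings: StringViewArray = binary.to_string_view().unwrap();
assert_eq!(strings.value(1), "longer than twelve bytes");
```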

/// A [`GenericByteViewArray`] that stores utf8 data
///
/// # Example
/// ```
@@ -411,12 +439,37 @@ pub type BinaryViewArray = GenericByteViewArray<BinaryViewType>;
/// ```
pub type StringViewArray = GenericByteViewArray<StringViewType>;

impl StringViewArray {
/// Convert the [`StringViewArray`] to [`BinaryViewArray`]
pub fn to_binary_view(self) -> BinaryViewArray {
unsafe { BinaryViewArray::new_unchecked(self.views, self.buffers, self.nulls) }
}
}
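The reverse conversion needs no validation, since valid utf8 is always valid binary; a short sketch:

```rust
use arrow_array::{Array, BinaryViewArray, StringViewArray};

let strings =
    StringViewArray::from(vec![Some("short"), None, Some("a value longer than twelve bytes")]);

// Infallible: no utf8 check is required in this direction
let binary: BinaryViewArray = strings.to_binary_view();
assert_eq!(binary.value(0), "short".as_bytes());
assert!(binary.is_null(1));
```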

impl From<Vec<&str>> for StringViewArray {
fn from(v: Vec<&str>) -> Self {
Self::from_iter_values(v)
}
}

impl From<Vec<Option<&str>>> for StringViewArray {
fn from(v: Vec<Option<&str>>) -> Self {
v.into_iter().collect()
}
}

impl From<Vec<String>> for StringViewArray {
fn from(v: Vec<String>) -> Self {
Self::from_iter_values(v)
}
}

impl From<Vec<Option<String>>> for StringViewArray {
fn from(v: Vec<Option<String>>) -> Self {
v.into_iter().collect()
}
}
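Taken together, the new `From` impls make construction from the common `Vec` shapes a one-liner; a brief sketch:

```rust
use arrow_array::{Array, StringViewArray};

let a = StringViewArray::from(vec!["inline", "a string longer than twelve bytes"]);
let b = StringViewArray::from(vec![Some("owned".to_string()), None]);

assert_eq!(a.len(), 2);
assert!(b.is_null(1));
```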

#[cfg(test)]
mod tests {
use crate::builder::StringViewBuilder;
36 changes: 36 additions & 0 deletions arrow/src/util/bench_util.rs
@@ -119,6 +119,42 @@ pub fn create_string_array_with_len<Offset: OffsetSizeTrait>(
.collect()
}

/// Creates a random (but fixed-seeded) array of a given size, null density and length
pub fn create_string_view_array_with_len(
size: usize,
null_density: f32,
str_len: usize,
mixed: bool,
) -> StringViewArray {
let rng = &mut seedable_rng();

let mut lengths = Vec::with_capacity(size);

    // if mixed, the first half has string lengths smaller than 12 bytes and the second half has lengths of at least 12 bytes
if mixed {
for _ in 0..size / 2 {
lengths.push(rng.gen_range(1..12));
}
for _ in size / 2..size {
lengths.push(rng.gen_range(12..=std::cmp::max(30, str_len)));
}
} else {
lengths.resize(size, str_len);
}

lengths
.into_iter()
.map(|len| {
if rng.gen::<f32>() < null_density {
None
} else {
let value: Vec<u8> = rng.sample_iter(&Alphanumeric).take(len).collect();
Some(String::from_utf8(value).unwrap())
}
})
.collect()
}
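A usage sketch for the new helper (parameter values are illustrative only; assumes the arrow crate's `test_utils` feature so `bench_util` is available):

```rust
use arrow::array::Array;
use arrow::util::bench_util::create_string_view_array_with_len;

// 4096 values, 10% nulls, target length 20; `mixed = true` keeps roughly half
// of the strings inline (< 12 bytes) and spills the rest to data buffers
let array = create_string_view_array_with_len(4096, 0.1, 20, true);
assert_eq!(array.len(), 4096);
```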

/// Creates a random (but fixed-seeded) array of a given size and null density
/// consisting of random 4 character alphanumeric strings
pub fn create_string_dict_array<K: ArrowDictionaryKeyType>(
9 changes: 9 additions & 0 deletions arrow/src/util/data_gen.rs
@@ -121,9 +121,18 @@ pub fn create_random_array(
},
Utf8 => Arc::new(create_string_array::<i32>(size, null_density)),
LargeUtf8 => Arc::new(create_string_array::<i64>(size, null_density)),
Utf8View => Arc::new(create_string_view_array_with_len(
size,
null_density,
4,
false,
)),
Binary => Arc::new(create_binary_array::<i32>(size, null_density)),
LargeBinary => Arc::new(create_binary_array::<i64>(size, null_density)),
FixedSizeBinary(len) => Arc::new(create_fsb_array(size, null_density, *len as usize)),
BinaryView => Arc::new(
create_string_view_array_with_len(size, null_density, 4, false).to_binary_view(),
),
List(_) => create_random_list_array(field, size, null_density, true_density)?,
LargeList(_) => create_random_list_array(field, size, null_density, true_density)?,
Struct(fields) => Arc::new(StructArray::try_from(
99 changes: 98 additions & 1 deletion parquet/benches/arrow_reader.rs
@@ -23,7 +23,8 @@ use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
use num::FromPrimitive;
use num_bigint::BigInt;
use parquet::arrow::array_reader::{
make_byte_array_reader, make_fixed_len_byte_array_reader, ListArrayReader,
make_byte_array_reader, make_byte_view_array_reader, make_fixed_len_byte_array_reader,
ListArrayReader,
};
use parquet::basic::Type;
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
@@ -502,6 +503,13 @@ fn create_string_byte_array_reader(
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}

fn create_string_view_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}

fn create_string_byte_array_dictionary_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
@@ -993,6 +1001,95 @@ fn add_benches(c: &mut Criterion) {

group.finish();

// string view benchmarks
//==============================

let mut group = c.benchmark_group("arrow_array_reader/StringViewArray");

// string, plain encoded, no NULLs
let plain_string_no_null_data =
build_plain_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0);
group.bench_function("plain encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
plain_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let plain_string_no_null_data =
build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0);
group.bench_function("plain encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
plain_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// string, plain encoded, half NULLs
let plain_string_half_null_data =
build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5);
group.bench_function("plain encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
plain_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// string, dictionary encoded, no NULLs
let dictionary_string_no_null_data =
build_dictionary_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0);
group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let dictionary_string_no_null_data =
build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0);
group.bench_function("dictionary encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data =
build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5);
group.bench_function("dictionary encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

group.finish();

// list benchmarks
//==============================

34 changes: 34 additions & 0 deletions parquet/benches/arrow_writer.rs
@@ -96,6 +96,24 @@ fn create_string_bench_batch(
)?)
}

fn create_string_and_binary_view_bench_batch(
size: usize,
null_density: f32,
true_density: f32,
) -> Result<RecordBatch> {
let fields = vec![
Field::new("_1", DataType::Utf8View, true),
Field::new("_2", DataType::BinaryView, true),
];
let schema = Schema::new(fields);
Ok(create_random_batch(
Arc::new(schema),
size,
null_density,
true_density,
)?)
}

fn create_string_dictionary_bench_batch(
size: usize,
null_density: f32,
@@ -395,6 +413,22 @@ fn bench_primitive_writer(c: &mut Criterion) {
b.iter(|| write_batch_enable_bloom_filter(&batch).unwrap())
});

let batch = create_string_and_binary_view_bench_batch(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
.columns()
.iter()
.map(|f| f.get_array_memory_size() as u64)
.sum(),
));
group.bench_function("4096 values string", |b| {
b.iter(|| write_batch(&batch).unwrap())
});

group.bench_function("4096 values string with bloom filter", |b| {
b.iter(|| write_batch_enable_bloom_filter(&batch).unwrap())
});
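For context, the write path exercised by the new benchmark boils down to handing a batch with view-typed columns to `ArrowWriter`. A hedged sketch, assuming (as the benchmark above does) that `ArrowWriter` accepts `Utf8View` columns; the in-memory `Vec<u8>` sink and the column values are illustrative, not part of the benchmark:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringViewArray};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;

let schema = Arc::new(Schema::new(vec![Field::new("_1", DataType::Utf8View, true)]));
let column: ArrayRef = Arc::new(StringViewArray::from(vec![
    Some("short"),
    None,
    Some("a string longer than twelve bytes"),
]));
let batch = RecordBatch::try_new(schema.clone(), vec![column]).unwrap();

// Write the view-typed batch to an in-memory buffer with default properties
let mut buffer = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
```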

let batch = create_string_dictionary_bench_batch(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
28 changes: 12 additions & 16 deletions parquet/src/arrow/array_reader/builder.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;

use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
@@ -29,9 +30,7 @@ use crate::arrow::array_reader::{
use crate::arrow::schema::{ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
use crate::basic::Type as PhysicalType;
use crate::data_type::{
BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type,
};
use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
use crate::errors::{ParquetError, Result};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};

@@ -55,17 +54,13 @@ fn build_reader(
row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
match field.field_type {
ParquetFieldType::Primitive { .. } => {
build_primitive_reader(field, mask, row_groups)
}
ParquetFieldType::Primitive { .. } => build_primitive_reader(field, mask, row_groups),
ParquetFieldType::Group { .. } => match &field.arrow_type {
DataType::Map(_, _) => build_map_reader(field, mask, row_groups),
DataType::Struct(_) => build_struct_reader(field, mask, row_groups),
DataType::List(_) => build_list_reader(field, mask, false, row_groups),
DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups),
DataType::FixedSizeList(_, _) => {
build_fixed_size_list_reader(field, mask, row_groups)
}
DataType::FixedSizeList(_, _) => build_fixed_size_list_reader(field, mask, row_groups),
d => unimplemented!("reading group type {} not implemented", d),
},
}
@@ -140,9 +135,9 @@ fn build_list_reader(
DataType::List(f) => {
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
}
DataType::LargeList(f) => DataType::LargeList(Arc::new(
f.as_ref().clone().with_data_type(item_type),
)),
DataType::LargeList(f) => {
DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
}
_ => unreachable!(),
};

@@ -289,6 +284,9 @@ fn build_primitive_reader(
Some(DataType::Dictionary(_, _)) => {
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
}
Some(DataType::Utf8View | DataType::BinaryView) => {
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
}
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
@@ -347,8 +345,7 @@ mod tests {
#[test]
fn test_create_array_reader() {
let file = get_test_file("nulls.snappy.parquet");
let file_reader: Arc<dyn FileReader> =
Arc::new(SerializedFileReader::new(file).unwrap());
let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
@@ -359,8 +356,7 @@
)
.unwrap();

let array_reader =
build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();
let array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();

// Create arrow types
let arrow_type = DataType::Struct(Fields::from(vec![Field::new(