From f32aabc2c567a7331d874aa5c5b87ddd8bb229bc Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 26 Jun 2024 00:35:14 -0400 Subject: [PATCH 1/8] implement sort for view types --- arrow-ord/src/sort.rs | 70 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 9 deletions(-) diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs index 8ae87787d283..885f236447d9 100644 --- a/arrow-ord/src/sort.rs +++ b/arrow-ord/src/sort.rs @@ -206,8 +206,10 @@ pub fn sort_to_indices( DataType::Boolean => sort_boolean(array.as_boolean(), v, n, options, limit), DataType::Utf8 => sort_bytes(array.as_string::(), v, n, options, limit), DataType::LargeUtf8 => sort_bytes(array.as_string::(), v, n, options, limit), + DataType::Utf8View => sort_byte_view(array.as_string_view(), v, n, options, limit), DataType::Binary => sort_bytes(array.as_binary::(), v, n, options, limit), DataType::LargeBinary => sort_bytes(array.as_binary::(), v, n, options, limit), + DataType::BinaryView => sort_byte_view(array.as_binary_view(), v, n, options, limit), DataType::FixedSizeBinary(_) => sort_fixed_size_binary(array.as_fixed_size_binary(), v, n, options, limit), DataType::List(_) => sort_list(array.as_list::(), v, n, options, limit)?, DataType::LargeList(_) => sort_list(array.as_list::(), v, n, options, limit)?, @@ -277,6 +279,20 @@ fn sort_bytes( sort_impl(options, &mut valids, &nulls, limit, Ord::cmp).into() } +fn sort_byte_view( + values: &GenericByteViewArray, + value_indices: Vec, + nulls: Vec, + options: SortOptions, + limit: Option, +) -> UInt32Array { + let mut valids = value_indices + .into_iter() + .map(|index| (index, values.value(index as usize).as_ref())) + .collect::>(); + sort_impl(options, &mut valids, &nulls, limit, Ord::cmp).into() +} + fn sort_fixed_size_binary( values: &FixedSizeBinaryArray, value_indices: Vec, @@ -890,13 +906,21 @@ mod tests { }; assert_eq!(&output, &expected); - let output = LargeStringArray::from(data); - let expected = 
Arc::new(LargeStringArray::from(expected_data)) as ArrayRef; + let output = LargeStringArray::from(data.clone()); + let expected = Arc::new(LargeStringArray::from(expected_data.clone())) as ArrayRef; let output = match limit { Some(_) => sort_limit(&(Arc::new(output) as ArrayRef), options, limit).unwrap(), _ => sort(&(Arc::new(output) as ArrayRef), options).unwrap(), }; - assert_eq!(&output, &expected) + assert_eq!(&output, &expected); + + let output = StringViewArray::from(data); + let expected = Arc::new(StringViewArray::from(expected_data)) as ArrayRef; + let output = match limit { + Some(_) => sort_limit(&(Arc::new(output) as ArrayRef), options, limit).unwrap(), + _ => sort(&(Arc::new(output) as ArrayRef), options).unwrap(), + }; + assert_eq!(&output, &expected); } fn test_sort_string_dict_arrays( @@ -2423,8 +2447,10 @@ mod tests { None, Some("bad"), Some("sad"), + Some("long string longer than 12 bytes"), None, Some("glad"), + Some("lang string longer than 12 bytes"), Some("-ad"), ], None, @@ -2435,6 +2461,8 @@ mod tests { Some("-ad"), Some("bad"), Some("glad"), + Some("lang string longer than 12 bytes"), + Some("long string longer than 12 bytes"), Some("sad"), ], ); @@ -2444,8 +2472,10 @@ mod tests { None, Some("bad"), Some("sad"), + Some("long string longer than 12 bytes"), None, Some("glad"), + Some("lang string longer than 12 bytes"), Some("-ad"), ], Some(SortOptions { @@ -2455,6 +2485,8 @@ mod tests { None, vec![ Some("sad"), + Some("long string longer than 12 bytes"), + Some("lang string longer than 12 bytes"), Some("glad"), Some("bad"), Some("-ad"), @@ -2467,9 +2499,11 @@ mod tests { vec![ None, Some("bad"), + Some("long string longer than 12 bytes"), Some("sad"), None, Some("glad"), + Some("lang string longer than 12 bytes"), Some("-ad"), ], Some(SortOptions { @@ -2483,6 +2517,8 @@ mod tests { Some("-ad"), Some("bad"), Some("glad"), + Some("lang string longer than 12 bytes"), + Some("long string longer than 12 bytes"), Some("sad"), ], ); @@ -2491,9 
+2527,11 @@ mod tests { vec![ None, Some("bad"), + Some("long string longer than 12 bytes"), Some("sad"), None, Some("glad"), + Some("lang string longer than 12 bytes"), Some("-ad"), ], Some(SortOptions { @@ -2505,6 +2543,8 @@ mod tests { None, None, Some("sad"), + Some("long string longer than 12 bytes"), + Some("lang string longer than 12 bytes"), Some("glad"), Some("bad"), Some("-ad"), @@ -2515,9 +2555,11 @@ mod tests { vec![ None, Some("bad"), + Some("long string longer than 12 bytes"), Some("sad"), None, Some("glad"), + Some("lang string longer than 12 bytes"), Some("-ad"), ], Some(SortOptions { @@ -2530,17 +2572,27 @@ mod tests { // valid values less than limit with extra nulls test_sort_string_arrays( - vec![Some("def"), None, None, Some("abc")], + vec![ + Some("def long string longer than 12"), + None, + None, + Some("abc"), + ], Some(SortOptions { descending: false, nulls_first: false, }), Some(3), - vec![Some("abc"), Some("def"), None], + vec![Some("abc"), Some("def long string longer than 12"), None], ); test_sort_string_arrays( - vec![Some("def"), None, None, Some("abc")], + vec![ + Some("def long string longer than 12"), + None, + None, + Some("abc"), + ], Some(SortOptions { descending: false, nulls_first: true, @@ -2551,7 +2603,7 @@ mod tests { // more nulls than limit test_sort_string_arrays( - vec![Some("def"), None, None, None], + vec![Some("def long string longer than 12"), None, None, None], Some(SortOptions { descending: false, nulls_first: true, @@ -2561,13 +2613,13 @@ mod tests { ); test_sort_string_arrays( - vec![Some("def"), None, None, None], + vec![Some("def long string longer than 12"), None, None, None], Some(SortOptions { descending: false, nulls_first: false, }), Some(2), - vec![Some("def"), None], + vec![Some("def long string longer than 12"), None], ); } From 8f1c887cbef45b45f320fff4be28ffdf0b56ef94 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 27 Jun 2024 12:29:33 -0400 Subject: [PATCH 2/8] add bench for binary/binary view 
--- parquet/benches/arrow_reader.rs | 229 +++++++++++++++++++++++++++++--- 1 file changed, 209 insertions(+), 20 deletions(-) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index e1853d755271..927998ac2489 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -63,6 +63,8 @@ fn build_test_schema() -> SchemaDescPtr { optional BYTE_ARRAY element (UTF8); } } + REQUIRED BYTE_ARRAY mandatory_binary_leaf; + OPTIONAL BYTE_ARRAY optional_binary_leaf; } "; parse_message_type(message_type) @@ -71,8 +73,8 @@ fn build_test_schema() -> SchemaDescPtr { } // test data params -const NUM_ROW_GROUPS: usize = 1; -const PAGES_PER_GROUP: usize = 2; +const NUM_ROW_GROUPS: usize = 2; +const PAGES_PER_GROUP: usize = 4; const VALUES_PER_PAGE: usize = 10_000; const BATCH_SIZE: usize = 8192; const MAX_LIST_LEN: usize = 10; @@ -261,7 +263,7 @@ where InMemoryPageIterator::new(pages) } -fn build_plain_encoded_string_page_iterator( +fn build_plain_encoded_byte_array_page_iterator( column_desc: ColumnDescPtr, null_density: f32, ) -> impl PageIterator + Clone { @@ -496,13 +498,20 @@ fn create_decimal_by_bytes_reader( } } -fn create_string_byte_array_reader( +fn create_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, ) -> Box { make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() } +fn create_byte_view_array_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> Box { + make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap() +} + fn create_string_view_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, @@ -525,7 +534,7 @@ fn create_string_list_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, ) -> Box { - let items = create_string_byte_array_reader(page_iterator, column_desc); + let items = create_byte_array_reader(page_iterator, column_desc); 
let field = Field::new("item", DataType::Utf8, true); let data_type = DataType::List(Arc::new(field)); Box::new(ListArrayReader::::new(items, data_type, 2, 1, true)) @@ -845,6 +854,8 @@ fn add_benches(c: &mut Criterion) { let mandatory_int64_column_desc = schema.column(4); let optional_int64_column_desc = schema.column(5); let string_list_desc = schema.column(14); + let mandatory_binary_column_desc = schema.column(15); + let optional_binary_column_desc = schema.column(16); // primitive / int32 benchmarks // ============================= @@ -879,10 +890,10 @@ fn add_benches(c: &mut Criterion) { // string, plain encoded, no NULLs let plain_string_no_null_data = - build_plain_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0); + build_plain_encoded_byte_array_page_iterator(mandatory_string_column_desc.clone(), 0.0); group.bench_function("plain encoded, mandatory, no NULLs", |b| { b.iter(|| { - let array_reader = create_string_byte_array_reader( + let array_reader = create_byte_array_reader( plain_string_no_null_data.clone(), mandatory_string_column_desc.clone(), ); @@ -892,10 +903,10 @@ fn add_benches(c: &mut Criterion) { }); let plain_string_no_null_data = - build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0); + build_plain_encoded_byte_array_page_iterator(optional_string_column_desc.clone(), 0.0); group.bench_function("plain encoded, optional, no NULLs", |b| { b.iter(|| { - let array_reader = create_string_byte_array_reader( + let array_reader = create_byte_array_reader( plain_string_no_null_data.clone(), optional_string_column_desc.clone(), ); @@ -906,10 +917,10 @@ fn add_benches(c: &mut Criterion) { // string, plain encoded, half NULLs let plain_string_half_null_data = - build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5); + build_plain_encoded_byte_array_page_iterator(optional_string_column_desc.clone(), 0.5); group.bench_function("plain encoded, optional, half NULLs", |b| { 
b.iter(|| { - let array_reader = create_string_byte_array_reader( + let array_reader = create_byte_array_reader( plain_string_half_null_data.clone(), optional_string_column_desc.clone(), ); @@ -923,7 +934,7 @@ fn add_benches(c: &mut Criterion) { build_dictionary_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0); group.bench_function("dictionary encoded, mandatory, no NULLs", |b| { b.iter(|| { - let array_reader = create_string_byte_array_reader( + let array_reader = create_byte_array_reader( dictionary_string_no_null_data.clone(), mandatory_string_column_desc.clone(), ); @@ -936,7 +947,7 @@ fn add_benches(c: &mut Criterion) { build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0); group.bench_function("dictionary encoded, optional, no NULLs", |b| { b.iter(|| { - let array_reader = create_string_byte_array_reader( + let array_reader = create_byte_array_reader( dictionary_string_no_null_data.clone(), optional_string_column_desc.clone(), ); @@ -950,7 +961,7 @@ fn add_benches(c: &mut Criterion) { build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5); group.bench_function("dictionary encoded, optional, half NULLs", |b| { b.iter(|| { - let array_reader = create_string_byte_array_reader( + let array_reader = create_byte_array_reader( dictionary_string_half_null_data.clone(), optional_string_column_desc.clone(), ); @@ -961,6 +972,184 @@ fn add_benches(c: &mut Criterion) { group.finish(); + // binary benchmarks + //============================== + + let mut group = c.benchmark_group("arrow_array_reader/BinaryArray"); + + // byte array, plain encoded, no NULLs + let plain_byte_array_no_null_data = + build_plain_encoded_byte_array_page_iterator(mandatory_binary_column_desc.clone(), 0.0); + group.bench_function("plain encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_array_reader( + plain_byte_array_no_null_data.clone(), + 
mandatory_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let plain_byte_array_no_null_data = + build_plain_encoded_byte_array_page_iterator(optional_binary_column_desc.clone(), 0.0); + group.bench_function("plain encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_array_reader( + plain_byte_array_no_null_data.clone(), + optional_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // byte array, plain encoded, half NULLs + let plain_byte_array_half_null_data = + build_plain_encoded_byte_array_page_iterator(optional_binary_column_desc.clone(), 0.5); + group.bench_function("plain encoded, optional, half NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_array_reader( + plain_byte_array_half_null_data.clone(), + optional_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // byte array, dictionary encoded, no NULLs + let dictionary_byte_array_no_null_data = + build_dictionary_encoded_string_page_iterator(mandatory_binary_column_desc.clone(), 0.0); + group.bench_function("dictionary encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_array_reader( + dictionary_byte_array_no_null_data.clone(), + mandatory_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let dictionary_byte_array_no_null_data = + build_dictionary_encoded_string_page_iterator(optional_binary_column_desc.clone(), 0.0); + group.bench_function("dictionary encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_array_reader( + dictionary_byte_array_no_null_data.clone(), + optional_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + 
assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // byte array, dictionary encoded, half NULLs + let dictionary_byte_array_half_null_data = + build_dictionary_encoded_string_page_iterator(optional_binary_column_desc.clone(), 0.5); + group.bench_function("dictionary encoded, optional, half NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_array_reader( + dictionary_byte_array_half_null_data.clone(), + optional_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + group.finish(); + + // binary view benchmarks + //============================== + + let mut group = c.benchmark_group("arrow_array_reader/BinaryViewArray"); + + // binary view, plain encoded, no NULLs + let plain_byte_array_no_null_data = + build_plain_encoded_byte_array_page_iterator(mandatory_binary_column_desc.clone(), 0.0); + group.bench_function("plain encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_view_array_reader( + plain_byte_array_no_null_data.clone(), + mandatory_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let plain_byte_array_no_null_data = + build_plain_encoded_byte_array_page_iterator(optional_binary_column_desc.clone(), 0.0); + group.bench_function("plain encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_view_array_reader( + plain_byte_array_no_null_data.clone(), + optional_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // binary view, plain encoded, half NULLs + let plain_byte_array_half_null_data = + build_plain_encoded_byte_array_page_iterator(optional_binary_column_desc.clone(), 0.5); + group.bench_function("plain encoded, optional, half NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_view_array_reader( + 
plain_byte_array_half_null_data.clone(), + optional_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // binary view, dictionary encoded, no NULLs + let dictionary_byte_array_no_null_data = + build_dictionary_encoded_string_page_iterator(mandatory_binary_column_desc.clone(), 0.0); + group.bench_function("dictionary encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_view_array_reader( + dictionary_byte_array_no_null_data.clone(), + mandatory_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let dictionary_byte_array_no_null_data = + build_dictionary_encoded_string_page_iterator(optional_binary_column_desc.clone(), 0.0); + group.bench_function("dictionary encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_view_array_reader( + dictionary_byte_array_no_null_data.clone(), + optional_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // binary view, dictionary encoded, half NULLs + let dictionary_byte_array_half_null_data = + build_dictionary_encoded_string_page_iterator(optional_binary_column_desc.clone(), 0.5); + group.bench_function("dictionary encoded, optional, half NULLs", |b| { + b.iter(|| { + let array_reader = create_byte_view_array_reader( + dictionary_byte_array_half_null_data.clone(), + optional_binary_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + group.finish(); + // string dictionary benchmarks //============================== @@ -969,7 +1158,7 @@ fn add_benches(c: &mut Criterion) { group.bench_function("dictionary encoded, mandatory, no NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_dictionary_reader( - 
dictionary_string_no_null_data.clone(), + dictionary_byte_array_no_null_data.clone(), mandatory_string_column_desc.clone(), ); count = bench_array_reader(array_reader); @@ -980,7 +1169,7 @@ fn add_benches(c: &mut Criterion) { group.bench_function("dictionary encoded, optional, no NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_dictionary_reader( - dictionary_string_no_null_data.clone(), + dictionary_byte_array_no_null_data.clone(), optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); @@ -991,7 +1180,7 @@ fn add_benches(c: &mut Criterion) { group.bench_function("dictionary encoded, optional, half NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_dictionary_reader( - dictionary_string_half_null_data.clone(), + dictionary_byte_array_half_null_data.clone(), optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); @@ -1008,7 +1197,7 @@ fn add_benches(c: &mut Criterion) { // string, plain encoded, no NULLs let plain_string_no_null_data = - build_plain_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0); + build_plain_encoded_byte_array_page_iterator(mandatory_string_column_desc.clone(), 0.0); group.bench_function("plain encoded, mandatory, no NULLs", |b| { b.iter(|| { let array_reader = create_string_view_byte_array_reader( @@ -1021,7 +1210,7 @@ fn add_benches(c: &mut Criterion) { }); let plain_string_no_null_data = - build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0); + build_plain_encoded_byte_array_page_iterator(optional_string_column_desc.clone(), 0.0); group.bench_function("plain encoded, optional, no NULLs", |b| { b.iter(|| { let array_reader = create_string_view_byte_array_reader( @@ -1035,7 +1224,7 @@ fn add_benches(c: &mut Criterion) { // string, plain encoded, half NULLs let plain_string_half_null_data = - build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5); + 
build_plain_encoded_byte_array_page_iterator(optional_string_column_desc.clone(), 0.5); group.bench_function("plain encoded, optional, half NULLs", |b| { b.iter(|| { let array_reader = create_string_view_byte_array_reader( From 45d77529a63c9c6b4eeca7bd2eb1ad2ab1ba05c4 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 27 Jun 2024 15:04:01 -0400 Subject: [PATCH 3/8] add view buffer, prepare for byte_view_array reader --- parquet/src/arrow/buffer/mod.rs | 1 + parquet/src/arrow/buffer/view_buffer.rs | 190 ++++++++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 parquet/src/arrow/buffer/view_buffer.rs diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs index cbc795d94f57..e7566d9a505f 100644 --- a/parquet/src/arrow/buffer/mod.rs +++ b/parquet/src/arrow/buffer/mod.rs @@ -20,3 +20,4 @@ pub mod bit_util; pub mod dictionary_buffer; pub mod offset_buffer; +pub mod view_buffer; diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs new file mode 100644 index 000000000000..8b499fa9f133 --- /dev/null +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::arrow::record_reader::buffer::ValuesBuffer; +use arrow_array::{make_array, ArrayRef}; +use arrow_buffer::Buffer; +use arrow_data::{ArrayDataBuilder, ByteView}; +use arrow_schema::DataType as ArrowType; + +/// A buffer of view type byte arrays that can be converted into +/// `GenericByteViewArray` +#[derive(Debug, Default)] +pub struct ViewBuffer { + pub views: Vec, + pub buffers: Vec, +} + +impl ViewBuffer { + pub fn append_block(&mut self, block: Buffer) -> u32 { + let block_id = self.buffers.len() as u32; + self.buffers.push(block); + block_id + } + + /// # Safety + /// Similar to `GenericByteViewBuilder::append_view_unchecked`, this method is only safe when: + /// - `block` is a valid index, i.e., the return value of `append_block` + /// - `offset` and `offset + len` are valid indices into the buffer + /// - The `(offset, offset + len)` is valid value for the native type. + pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) { + let b = self.buffers.get_unchecked(block as usize); + let start = offset as usize; + let end = start.saturating_add(len as usize); + let b = b.get_unchecked(start..end); + + if len <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&len.to_le_bytes()); + view_buffer[4..4 + b.len()].copy_from_slice(b); + self.views.push(u128::from_le_bytes(view_buffer)); + } else { + let view = ByteView { + length: len, + prefix: u32::from_le_bytes(b[0..4].try_into().unwrap()), + buffer_index: block, + offset, + }; + self.views.push(view.into()); + } + } + + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` + #[allow(unused)] + pub fn into_array(self, null_buffer: Option, data_type: &ArrowType) -> ArrayRef { + let len = self.views.len(); + let views = Buffer::from_vec(self.views); + match data_type { + ArrowType::Utf8View => { + let builder = ArrayDataBuilder::new(ArrowType::Utf8View) + .len(len) + .add_buffer(views) + .add_buffers(self.buffers) + 
.null_bit_buffer(null_buffer); + // We have checked that the data is utf8 when building the buffer, so it is safe + let array = unsafe { builder.build_unchecked() }; + make_array(array) + } + ArrowType::BinaryView => { + let builder = ArrayDataBuilder::new(ArrowType::BinaryView) + .len(len) + .add_buffer(views) + .add_buffers(self.buffers) + .null_bit_buffer(null_buffer); + let array = unsafe { builder.build_unchecked() }; + make_array(array) + } + _ => panic!("Unsupported data type: {:?}", data_type), + } + } +} + +impl ValuesBuffer for ViewBuffer { + fn pad_nulls( + &mut self, + read_offset: usize, + values_read: usize, + levels_read: usize, + valid_mask: &[u8], + ) { + self.views + .pad_nulls(read_offset, values_read, levels_read, valid_mask); + } +} + +#[cfg(test)] +mod tests { + + use arrow_array::Array; + + use super::*; + + #[test] + fn test_view_buffer_empty() { + let buffer = ViewBuffer::default(); + let array = buffer.into_array(None, &ArrowType::Utf8View); + let strings = array + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(strings.len(), 0); + } + + #[test] + fn test_view_buffer_append_view() { + let mut buffer = ViewBuffer::default(); + let string_buffer = Buffer::from(&b"0123456789long string to test string view"[..]); + let block_id = buffer.append_block(string_buffer); + + unsafe { + buffer.append_view_unchecked(block_id, 0, 1); + buffer.append_view_unchecked(block_id, 1, 9); + buffer.append_view_unchecked(block_id, 10, 31); + } + + let array = buffer.into_array(None, &ArrowType::Utf8View); + let string_array = array + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + string_array.iter().collect::>(), + vec![ + Some("0"), + Some("123456789"), + Some("long string to test string view"), + ] + ); + } + + #[test] + fn test_view_buffer_pad_null() { + let mut buffer = ViewBuffer::default(); + let string_buffer = Buffer::from(&b"0123456789long string to test string view"[..]); + let block_id = buffer.append_block(string_buffer); + + 
unsafe { + buffer.append_view_unchecked(block_id, 0, 1); + buffer.append_view_unchecked(block_id, 1, 9); + buffer.append_view_unchecked(block_id, 10, 31); + } + + let valid = [true, false, false, true, false, false, true]; + let valid_mask = Buffer::from_iter(valid.iter().copied()); + + buffer.pad_nulls(1, 2, valid.len() - 1, valid_mask.as_slice()); + + let array = buffer.into_array(Some(valid_mask), &ArrowType::Utf8View); + let strings = array + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + Some("0"), + None, + None, + Some("123456789"), + None, + None, + Some("long string to test string view"), + ] + ); + } +} From 3e243ad84a107a94056e85e3fb7686d0dd203a99 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 27 Jun 2024 15:19:33 -0400 Subject: [PATCH 4/8] make clippy happy --- parquet/src/arrow/buffer/view_buffer.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 8b499fa9f133..d7e74c19f754 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -30,6 +30,7 @@ pub struct ViewBuffer { } impl ViewBuffer { + #[allow(unused)] pub fn append_block(&mut self, block: Buffer) -> u32 { let block_id = self.buffers.len() as u32; self.buffers.push(block); @@ -41,6 +42,7 @@ impl ViewBuffer { /// - `block` is a valid index, i.e., the return value of `append_block` /// - `offset` and `offset + len` are valid indices into the buffer /// - The `(offset, offset + len)` is valid value for the native type. 
+ #[allow(unused)] pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) { let b = self.buffers.get_unchecked(block as usize); let start = offset as usize; From 25ad3c209bcad2dbe9ee3109ceeb6ecf04840266 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 28 Jun 2024 11:14:12 -0400 Subject: [PATCH 5/8] reuse make_view_unchecked --- .../src/builder/generic_bytes_view_builder.rs | 39 ++++++++++++------- parquet/src/arrow/buffer/view_buffer.rs | 30 +++++--------- 2 files changed, 33 insertions(+), 36 deletions(-) diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs index 6ec34bf5a91c..2a4bc53cb8bd 100644 --- a/arrow-array/src/builder/generic_bytes_view_builder.rs +++ b/arrow-array/src/builder/generic_bytes_view_builder.rs @@ -128,21 +128,8 @@ impl GenericByteViewBuilder { let end = start.saturating_add(len as usize); let b = b.get_unchecked(start..end); - if len <= 12 { - let mut view_buffer = [0; 16]; - view_buffer[0..4].copy_from_slice(&len.to_le_bytes()); - view_buffer[4..4 + b.len()].copy_from_slice(b); - self.views_builder.append(u128::from_le_bytes(view_buffer)); - } else { - let view = ByteView { - length: len, - prefix: u32::from_le_bytes(b[0..4].try_into().unwrap()), - buffer_index: block, - offset, - }; - self.views_builder.append(view.into()); - } - + let view = make_view_unchecked(b, block, offset); + self.views_builder.append(view); self.null_buffer_builder.append_non_null(); } @@ -345,6 +332,28 @@ pub type StringViewBuilder = GenericByteViewBuilder; /// [`GenericByteViewBuilder::append_null`] as normal. 
pub type BinaryViewBuilder = GenericByteViewBuilder; +/// Create a view based on the given data, block id and offset +/// +/// # Safety +/// The caller must ensure that using `block_id` and `offset` will point to the correct `data` +pub unsafe fn make_view_unchecked(data: &[u8], block_id: u32, offset: u32) -> u128 { + let len = data.len() as u32; + if len <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&len.to_le_bytes()); + view_buffer[4..4 + data.len()].copy_from_slice(data); + u128::from_le_bytes(view_buffer) + } else { + let view = ByteView { + length: len, + prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), + buffer_index: block_id, + offset, + }; + view.into() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index d7e74c19f754..774c0dbae0c1 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -16,9 +16,9 @@ // under the License. use crate::arrow::record_reader::buffer::ValuesBuffer; -use arrow_array::{make_array, ArrayRef}; +use arrow_array::{builder::make_view_unchecked, make_array, ArrayRef}; use arrow_buffer::Buffer; -use arrow_data::{ArrayDataBuilder, ByteView}; +use arrow_data::ArrayDataBuilder; use arrow_schema::DataType as ArrowType; /// A buffer of view type byte arrays that can be converted into @@ -38,31 +38,19 @@ impl ViewBuffer { } /// # Safety - /// Similar to `GenericByteViewBuilder::append_view_unchecked`, this method is only safe when: + /// This method is only safe when: /// - `block` is a valid index, i.e., the return value of `append_block` /// - `offset` and `offset + len` are valid indices into the buffer /// - The `(offset, offset + len)` is valid value for the native type. 
#[allow(unused)] pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) { let b = self.buffers.get_unchecked(block as usize); - let start = offset as usize; - let end = start.saturating_add(len as usize); - let b = b.get_unchecked(start..end); - - if len <= 12 { - let mut view_buffer = [0; 16]; - view_buffer[0..4].copy_from_slice(&len.to_le_bytes()); - view_buffer[4..4 + b.len()].copy_from_slice(b); - self.views.push(u128::from_le_bytes(view_buffer)); - } else { - let view = ByteView { - length: len, - prefix: u32::from_le_bytes(b[0..4].try_into().unwrap()), - buffer_index: block, - offset, - }; - self.views.push(view.into()); - } + let end = offset.saturating_add(len); + let b = b.get_unchecked(offset as usize..end as usize); + + let view = make_view_unchecked(b, block, offset); + + self.views.push(view); } /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` From 002b73d2b4ef1d12fd864db3fdbe51f9b490682c Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 28 Jun 2024 15:48:45 -0400 Subject: [PATCH 6/8] Update parquet/src/arrow/buffer/view_buffer.rs Co-authored-by: Andrew Lamb --- parquet/src/arrow/buffer/view_buffer.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 774c0dbae0c1..166e63fdcb85 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -23,6 +23,9 @@ use arrow_schema::DataType as ArrowType; /// A buffer of view type byte arrays that can be converted into /// `GenericByteViewArray` +/// +/// Note this does not reuse `GenericByteViewBuilder` due to the need to call `pad_nulls` +/// and reuse the existing logic for Vec in the parquet crate #[derive(Debug, Default)] pub struct ViewBuffer { pub views: Vec, From 7e8ff6a5315eba9e5ed817a5bccb824f33dcb042 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 28 Jun 2024 15:54:13 -0400 Subject: [PATCH 7/8] 
update --- arrow-array/src/builder/generic_bytes_view_builder.rs | 5 +---- parquet/src/arrow/buffer/view_buffer.rs | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs index 2a4bc53cb8bd..99231cc043c9 100644 --- a/arrow-array/src/builder/generic_bytes_view_builder.rs +++ b/arrow-array/src/builder/generic_bytes_view_builder.rs @@ -333,10 +333,7 @@ pub type StringViewBuilder = GenericByteViewBuilder; pub type BinaryViewBuilder = GenericByteViewBuilder; /// Create a view based on the given data, block id and offset -/// -/// # Safety -/// The caller must ensure that using `block_id` and `offset` will point to the correct `data` -pub unsafe fn make_view_unchecked(data: &[u8], block_id: u32, offset: u32) -> u128 { +pub fn make_view_unchecked(data: &[u8], block_id: u32, offset: u32) -> u128 { let len = data.len() as u32; if len <= 12 { let mut view_buffer = [0; 16]; diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 166e63fdcb85..d769f15f5da1 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -24,7 +24,7 @@ use arrow_schema::DataType as ArrowType; /// A buffer of view type byte arrays that can be converted into /// `GenericByteViewArray` /// -/// Note this does not reuse `GenericByteViewBuilder` due to the need to call `pad_nulls` +/// Note this does not reuse `GenericByteViewBuilder` due to the need to call `pad_nulls` /// and reuse the existing logic for Vec in the parquet crate #[derive(Debug, Default)] pub struct ViewBuffer { From 5846ff081866fd05633cfc6127154f2606f0523c Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 28 Jun 2024 16:36:43 -0400 Subject: [PATCH 8/8] rename and inline --- arrow-array/src/builder/generic_bytes_view_builder.rs | 5 +++-- parquet/src/arrow/buffer/view_buffer.rs | 4 ++-- 2 files changed, 5 insertions(+), 4 
deletions(-) diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs index 99231cc043c9..2bcc5a3f302f 100644 --- a/arrow-array/src/builder/generic_bytes_view_builder.rs +++ b/arrow-array/src/builder/generic_bytes_view_builder.rs @@ -128,7 +128,7 @@ impl GenericByteViewBuilder { let end = start.saturating_add(len as usize); let b = b.get_unchecked(start..end); - let view = make_view_unchecked(b, block, offset); + let view = make_view(b, block, offset); self.views_builder.append(view); self.null_buffer_builder.append_non_null(); } @@ -333,7 +333,8 @@ pub type StringViewBuilder = GenericByteViewBuilder; pub type BinaryViewBuilder = GenericByteViewBuilder; /// Create a view based on the given data, block id and offset -pub fn make_view_unchecked(data: &[u8], block_id: u32, offset: u32) -> u128 { +#[inline(always)] +pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 { let len = data.len() as u32; if len <= 12 { let mut view_buffer = [0; 16]; diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index d769f15f5da1..01e7c4aad36b 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -16,7 +16,7 @@ // under the License. use crate::arrow::record_reader::buffer::ValuesBuffer; -use arrow_array::{builder::make_view_unchecked, make_array, ArrayRef}; +use arrow_array::{builder::make_view, make_array, ArrayRef}; use arrow_buffer::Buffer; use arrow_data::ArrayDataBuilder; use arrow_schema::DataType as ArrowType; @@ -51,7 +51,7 @@ impl ViewBuffer { let end = offset.saturating_add(len); let b = b.get_unchecked(offset as usize..end as usize); - let view = make_view_unchecked(b, block, offset); + let view = make_view(b, block, offset); self.views.push(view); }