Skip to content

Commit

Permalink
using view_buffer instead of offset_buffer for view type parquet reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil committed Apr 10, 2024
1 parent 91f0b17 commit a76d47e
Show file tree
Hide file tree
Showing 8 changed files with 815 additions and 228 deletions.
5 changes: 5 additions & 0 deletions arrow-data/src/byte_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ impl ByteView {
| ((self.buffer_index as u128) << 64)
| ((self.offset as u128) << 96)
}

#[inline(always)]
pub fn as_bytes(self) -> [u8; 16] {
self.as_u128().to_le_bytes()
}
}

impl From<u128> for ByteView {
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;

use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
Expand Down
144 changes: 1 addition & 143 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,6 @@ pub fn make_byte_array_reader(
}
}

/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
pub fn make_byte_view_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Some(t) => t,
None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
_ => ArrowType::BinaryView,
},
};

match data_type {
ArrowType::BinaryView | ArrowType::Utf8View => {
let reader = GenericRecordReader::new(column_desc);
Ok(Box::new(ByteArrayReader::<i32>::new(
pages, data_type, reader,
)))
}

_ => Err(general_err!(
"invalid data type for byte array reader read to view type - {}",
data_type
)),
}
}

/// An [`ArrayReader`] for variable length byte arrays
struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
Expand Down Expand Up @@ -618,7 +588,7 @@ mod tests {
use super::*;
use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
use crate::arrow::record_reader::buffer::ValuesBuffer;
use arrow_array::{Array, StringArray, StringViewArray};
use arrow_array::{Array, StringArray};
use arrow_buffer::Buffer;

#[test]
Expand Down Expand Up @@ -676,64 +646,6 @@ mod tests {
}
}

#[test]
fn test_byte_array_string_view_decoder() {
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]);

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);

assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(
output.values.as_slice(),
"helloworldlarge payload over 12 bytesb".as_bytes()
);
assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 37, 38]);

assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View);
let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("world"),
None,
Some("large payload over 12 bytes"),
Some("b"),
None,
None,
]
);
}
}

#[test]
fn test_byte_array_decoder_skip() {
let (pages, encoded_dictionary) =
Expand Down Expand Up @@ -778,60 +690,6 @@ mod tests {
}
}

#[test]
fn test_byte_array_string_view_decoder_skip() {
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]);

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(
output.values.as_slice(),
"hellolarge payload over 12 bytes".as_bytes()
);
assert_eq!(output.offsets.as_slice(), &[0, 5, 32]);

assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View);
let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("large payload over 12 bytes"),
None,
None,
]
);
}
}

#[test]
fn test_byte_array_decoder_nulls() {
let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
Expand Down
Loading

0 comments on commit a76d47e

Please sign in to comment.