Replace ScalarBuffer in Parquet with Vec (#1849) (#5177) #5178

Merged · 1 commit · Dec 8, 2023
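This PR removes the parquet-internal `ScalarValue` bound and `ScalarBuffer` type in favour of plain `Vec<T>`, so the byte-array readers in the diffs below are generic only over arrow's `OffsetSizeTrait` (implemented by `i32` and `i64`). A minimal sketch of the resulting shape, with illustrative fields rather than the exact parquet-rs definitions:

```rust
use arrow_array::OffsetSizeTrait;

// Illustrative only: the real OffsetBuffer lives in
// parquet::arrow::buffer::offset_buffer and carries more state.
struct OffsetBuffer<I: OffsetSizeTrait> {
    offsets: Vec<I>, // previously backed by the removed ScalarBuffer<I>
    values: Vec<u8>,
}

// After this PR a reader needs only the OffsetSizeTrait bound
// (i32 for Utf8/Binary, i64 for LargeUtf8/LargeBinary); the extra
// `+ ScalarValue` bound seen in the removed lines below is gone.
fn num_offsets<I: OffsetSizeTrait>(buf: &OffsetBuffer<I>) -> usize {
    buf.offsets.len()
}

fn main() {
    let buf = OffsetBuffer::<i32> {
        offsets: vec![0, 3, 5],
        values: b"abcde".to_vec(),
    };
    assert_eq!(num_offsets(&buf), 3);
}
```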
66 changes: 26 additions & 40 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -19,7 +19,6 @@ use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
@@ -77,22 +76,19 @@ pub fn make_byte_array_reader(
}

/// An [`ArrayReader`] for variable length byte arrays
-struct ByteArrayReader<I: ScalarValue> {
+struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
rep_levels_buffer: Option<Buffer>,
record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
}

-impl<I: ScalarValue> ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ByteArrayReader<I> {
fn new(
pages: Box<dyn PageIterator>,
data_type: ArrowType,
-record_reader: GenericRecordReader<
-OffsetBuffer<I>,
-ByteArrayColumnValueDecoder<I>,
->,
+record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
) -> Self {
Self {
data_type,
@@ -104,7 +100,7 @@ impl<I: ScalarValue> ByteArrayReader<I> {
}
}

-impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
fn as_any(&self) -> &dyn Any {
self
}
@@ -167,15 +163,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
}

/// A [`ColumnValueDecoder`] for variable length byte arrays
-struct ByteArrayColumnValueDecoder<I: ScalarValue> {
+struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
dict: Option<OffsetBuffer<I>>,
decoder: Option<ByteArrayDecoder>,
validate_utf8: bool,
}

-impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
-for ByteArrayColumnValueDecoder<I>
-{
+impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
type Slice = OffsetBuffer<I>;

fn new(desc: &ColumnDescPtr) -> Self {
@@ -275,17 +269,15 @@ impl ByteArrayDecoder {
num_values,
validate_utf8,
)),
-Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
-ByteArrayDecoder::Dictionary(ByteArrayDecoderDictionary::new(
-data, num_levels, num_values,
-))
-}
+Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary(
+ByteArrayDecoderDictionary::new(data, num_levels, num_values),
+),
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
),
-Encoding::DELTA_BYTE_ARRAY => ByteArrayDecoder::DeltaByteArray(
-ByteArrayDecoderDelta::new(data, validate_utf8)?,
-),
+Encoding::DELTA_BYTE_ARRAY => {
+ByteArrayDecoder::DeltaByteArray(ByteArrayDecoderDelta::new(data, validate_utf8)?)
+}
_ => {
return Err(general_err!(
"unsupported encoding for byte array: {}",
@@ -298,7 +290,7 @@
}

/// Read up to `len` values to `out` with the optional dictionary
-pub fn read<I: OffsetSizeTrait + ScalarValue>(
+pub fn read<I: OffsetSizeTrait>(
&mut self,
out: &mut OffsetBuffer<I>,
len: usize,
@@ -307,8 +299,8 @@
match self {
ByteArrayDecoder::Plain(d) => d.read(out, len),
ByteArrayDecoder::Dictionary(d) => {
-let dict = dict
-.ok_or_else(|| general_err!("missing dictionary page for column"))?;
+let dict =
+dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;

d.read(out, dict, len)
}
@@ -318,16 +310,16 @@
}

/// Skip `len` values
-pub fn skip<I: OffsetSizeTrait + ScalarValue>(
+pub fn skip<I: OffsetSizeTrait>(
&mut self,
len: usize,
dict: Option<&OffsetBuffer<I>>,
) -> Result<usize> {
match self {
ByteArrayDecoder::Plain(d) => d.skip(len),
ByteArrayDecoder::Dictionary(d) => {
-let dict = dict
-.ok_or_else(|| general_err!("missing dictionary page for column"))?;
+let dict =
+dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;

d.skip(dict, len)
}
@@ -363,7 +355,7 @@ impl ByteArrayDecoderPlain {
}
}

-pub fn read<I: OffsetSizeTrait + ScalarValue>(
+pub fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
len: usize,
@@ -392,8 +384,7 @@ impl ByteArrayDecoderPlain {
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}
-let len_bytes: [u8; 4] =
-buf[self.offset..self.offset + 4].try_into().unwrap();
+let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
let len = u32::from_le_bytes(len_bytes);

let start_offset = self.offset + 4;
@@ -424,8 +415,7 @@ impl ByteArrayDecoderPlain {
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}
-let len_bytes: [u8; 4] =
-buf[self.offset..self.offset + 4].try_into().unwrap();
+let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
let len = u32::from_le_bytes(len_bytes) as usize;
skip += 1;
self.offset = self.offset + 4 + len;
@@ -462,7 +452,7 @@ impl ByteArrayDecoderDeltaLength {
})
}

-fn read<I: OffsetSizeTrait + ScalarValue>(
+fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
len: usize,
@@ -529,7 +519,7 @@ impl ByteArrayDecoderDelta {
})
}

-fn read<I: OffsetSizeTrait + ScalarValue>(
+fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
len: usize,
@@ -564,7 +554,7 @@ impl ByteArrayDecoderDictionary {
}
}

-fn read<I: OffsetSizeTrait + ScalarValue>(
+fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
dict: &OffsetBuffer<I>,
@@ -576,15 +566,11 @@
}

self.decoder.read(len, |keys| {
-output.extend_from_dictionary(
-keys,
-dict.offsets.as_slice(),
-dict.values.as_slice(),
-)
+output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice())
})
}

-fn skip<I: OffsetSizeTrait + ScalarValue>(
+fn skip<I: OffsetSizeTrait>(
&mut self,
dict: &OffsetBuffer<I>,
to_skip: usize,
45 changes: 16 additions & 29 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -27,10 +27,8 @@ use bytes::Bytes;

use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::buffer::{
-dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
-};
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer};
+use crate::arrow::record_reader::buffer::BufferQueue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
@@ -123,7 +121,7 @@ pub fn make_byte_array_dictionary_reader(
/// An [`ArrayReader`] for dictionary encoded variable length byte arrays
///
/// Will attempt to preserve any dictionary encoding present in the parquet data
-struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
@@ -133,16 +131,13 @@ struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {

impl<K, V> ByteArrayDictionaryReader<K, V>
where
-K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-V: ScalarValue + OffsetSizeTrait,
+K: FromBytes + Ord + ArrowNativeType,
+V: OffsetSizeTrait,
{
fn new(
pages: Box<dyn PageIterator>,
data_type: ArrowType,
-record_reader: GenericRecordReader<
-DictionaryBuffer<K, V>,
-DictionaryDecoder<K, V>,
->,
+record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
) -> Self {
Self {
data_type,
@@ -156,8 +151,8 @@ where

impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
where
-K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-V: ScalarValue + OffsetSizeTrait,
+K: FromBytes + Ord + ArrowNativeType,
+V: OffsetSizeTrait,
{
fn as_any(&self) -> &dyn Any {
self
@@ -226,16 +221,15 @@ struct DictionaryDecoder<K, V> {

impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
where
-K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-V: ScalarValue + OffsetSizeTrait,
+K: FromBytes + Ord + ArrowNativeType,
+V: OffsetSizeTrait,
{
type Slice = DictionaryBuffer<K, V>;

fn new(col: &ColumnDescPtr) -> Self {
let validate_utf8 = col.converted_type() == ConvertedType::UTF8;

-let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8)
-{
+let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) {
(true, true) => ArrowType::LargeUtf8,
(true, false) => ArrowType::LargeBinary,
(false, true) => ArrowType::Utf8,
@@ -274,8 +268,7 @@ where

let len = num_values as usize;
let mut buffer = OffsetBuffer::<V>::default();
-let mut decoder =
-ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
+let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
decoder.read(&mut buffer, usize::MAX)?;

let array = buffer.into_array(None, self.value_type.clone());
@@ -339,8 +332,8 @@ where
Some(keys) => {
// Happy path - can just copy keys
// Keys will be validated on conversion to arrow
-let keys_slice = keys.spare_capacity_mut(range.start + len);
-let len = decoder.get_batch(&mut keys_slice[range.start..])?;
+let keys_slice = keys.get_output_slice(len);
+let len = decoder.get_batch(keys_slice)?;

@tustvold (Contributor, Author) commented on Dec 6, 2023:

This was actually incorrect, but it didn't matter: spare_capacity_mut didn't update the length of the buffer directly, so this would just potentially allocate more space than necessary.
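To make the comment above concrete: the old `spare_capacity_mut(n)` reserved capacity without updating the buffer's recorded length, so passing `range.start + len` could at worst over-allocate. The replacement `get_output_slice(len)` extends the length first and returns the freshly added region to write into. A rough sketch of that pattern over a plain `Vec` (the free function below is a stand-in modeled on the PR's helper, not the exact parquet-rs signature):

```rust
/// Stand-in for the PR's helper: grow the buffer's *length* by `len`
/// and return the newly added region for a decoder to fill.
fn get_output_slice<T: Default + Copy>(buf: &mut Vec<T>, len: usize) -> &mut [T] {
    let start = buf.len();
    buf.resize(start + len, T::default()); // updates length, not just capacity
    &mut buf[start..]
}

fn main() {
    let mut keys: Vec<i32> = vec![1, 2];
    let out = get_output_slice(&mut keys, 3);
    out.copy_from_slice(&[10, 20, 30]); // a decoder's get_batch would write here
    assert_eq!(keys, [1, 2, 10, 20, 30]);
}
```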
*max_remaining_values -= len;
Ok(len)
}
@@ -360,11 +353,7 @@
let dict_offsets = dict_buffers[0].typed_data::<V>();
let dict_values = dict_buffers[1].as_slice();

-values.extend_from_dictionary(
-&keys[..len],
-dict_offsets,
-dict_values,
-)?;
+values.extend_from_dictionary(&keys[..len], dict_offsets, dict_values)?;
*max_remaining_values -= len;
Ok(len)
}
@@ -375,9 +364,7 @@

fn skip_values(&mut self, num_values: usize) -> Result<usize> {
match self.decoder.as_mut().expect("decoder set") {
-MaybeDictionaryDecoder::Fallback(decoder) => {
-decoder.skip::<V>(num_values, None)
-}
+MaybeDictionaryDecoder::Fallback(decoder) => decoder.skip::<V>(num_values, None),
MaybeDictionaryDecoder::Dict {
decoder,
max_remaining_values,