From de5fddb75932cdcfef8fe9ae2160dceb19541008 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Wed, 27 Mar 2024 00:41:33 +0800 Subject: [PATCH 1/5] using view_buffer instead of offset_buffer for view type parquet reader --- arrow-data/src/byte_view.rs | 5 + parquet/src/arrow/array_reader/builder.rs | 2 +- parquet/src/arrow/array_reader/byte_array.rs | 144 +--- .../src/arrow/array_reader/byte_view_array.rs | 646 ++++++++++++++++++ parquet/src/arrow/array_reader/mod.rs | 3 + parquet/src/arrow/buffer/mod.rs | 1 + parquet/src/arrow/buffer/view_buffer.rs | 145 ++++ testing | 2 +- 8 files changed, 803 insertions(+), 145 deletions(-) create mode 100644 parquet/src/arrow/array_reader/byte_view_array.rs create mode 100644 parquet/src/arrow/buffer/view_buffer.rs diff --git a/arrow-data/src/byte_view.rs b/arrow-data/src/byte_view.rs index b8b1731ac60..2cdabad3f71 100644 --- a/arrow-data/src/byte_view.rs +++ b/arrow-data/src/byte_view.rs @@ -39,6 +39,11 @@ impl ByteView { | ((self.buffer_index as u128) << 64) | ((self.offset as u128) << 96) } + + #[inline(always)] + pub fn as_bytes(self) -> [u8; 16] { + self.as_u128().to_le_bytes() + } } impl From for ByteView { diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 958594c9323..945f62526a7 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use arrow_schema::{DataType, Fields, SchemaBuilder}; -use crate::arrow::array_reader::byte_array::make_byte_view_array_reader; +use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; use crate::arrow::array_reader::{ diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index d0aa6f7b1eb..19086878c15 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -74,36 +74,6 @@ pub fn make_byte_array_reader( } } -/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. -pub fn make_byte_view_array_reader( - pages: Box, - column_desc: ColumnDescPtr, - arrow_type: Option, -) -> Result> { - // Check if Arrow type is specified, else create it from Parquet type - let data_type = match arrow_type { - Some(t) => t, - None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { - ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, - _ => ArrowType::BinaryView, - }, - }; - - match data_type { - ArrowType::BinaryView | ArrowType::Utf8View => { - let reader = GenericRecordReader::new(column_desc); - Ok(Box::new(ByteArrayReader::::new( - pages, data_type, reader, - ))) - } - - _ => Err(general_err!( - "invalid data type for byte array reader read to view type - {}", - data_type - )), - } -} - /// An [`ArrayReader`] for variable length byte arrays struct ByteArrayReader { data_type: ArrowType, @@ -618,7 +588,7 @@ mod tests { use super::*; use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; use crate::arrow::record_reader::buffer::ValuesBuffer; - use arrow_array::{Array, StringArray, StringViewArray}; + use arrow_array::{Array, StringArray}; use arrow_buffer::Buffer; #[test] @@ -676,64 +646,6 @@ mod tests { } } - #[test] - fn test_byte_array_string_view_decoder() { - let (pages, encoded_dictionary) = - byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]); - - let column_desc = utf8_column(); - let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = OffsetBuffer::::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!(output.values.as_slice(), "hello".as_bytes()); - assert_eq!(output.offsets.as_slice(), &[0, 5]); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - assert_eq!(output.values.as_slice(), "helloworld".as_bytes()); - assert_eq!(output.offsets.as_slice(), &[0, 5, 10]); - - assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); - assert_eq!( - output.values.as_slice(), - "helloworldlarge payload over 12 bytesb".as_bytes() - ); - assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 37, 38]); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("world"), - None, - Some("large payload over 12 bytes"), - Some("b"), - None, - None, - ] - ); - } - } - #[test] fn test_byte_array_decoder_skip() { let (pages, encoded_dictionary) = @@ -778,60 +690,6 @@ mod tests { } } - #[test] - fn test_byte_array_string_view_decoder_skip() { - let (pages, encoded_dictionary) = - byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); - - let column_desc = utf8_column(); - let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = OffsetBuffer::::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!(output.values.as_slice(), "hello".as_bytes()); - assert_eq!(output.offsets.as_slice(), &[0, 5]); - - assert_eq!(decoder.skip_values(1).unwrap(), 1); - assert_eq!(decoder.skip_values(1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - assert_eq!( - output.values.as_slice(), - "hellolarge payload over 12 bytes".as_bytes() - ); - assert_eq!(output.offsets.as_slice(), &[0, 5, 32]); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("large payload over 12 bytes"), - None, - None, - ] - ); - } - } - #[test] fn test_byte_array_decoder_nulls() { let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs new file mode 100644 index 00000000000..80b9a24cb9c --- /dev/null +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -0,0 +1,646 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; +use crate::arrow::buffer::view_buffer::ViewBuffer; +use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; +use crate::arrow::record_reader::GenericRecordReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::{ConvertedType, Encoding}; +use crate::column::page::PageIterator; +use crate::column::reader::decoder::ColumnValueDecoder; +use crate::data_type::Int32Type; +use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; +use crate::errors::{ParquetError, Result}; +use crate::schema::types::ColumnDescPtr; +use arrow_array::ArrayRef; +use arrow_schema::DataType as ArrowType; +use bytes::Bytes; +use std::any::Any; + +/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. +pub fn make_byte_view_array_reader( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, +) -> Result> { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { + ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, + _ => ArrowType::BinaryView, + }, + }; + + match data_type { + ArrowType::Utf8View | ArrowType::BinaryView => { + let reader = GenericRecordReader::new(column_desc); + Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader))) + } + _ => Err(general_err!( + "invalid data type for byte array reader - {}", + data_type + )), + } +} + +/// An [`ArrayReader`] for variable length byte arrays +struct ByteViewArrayReader { + data_type: ArrowType, + pages: Box, + def_levels_buffer: Option>, + rep_levels_buffer: Option>, + record_reader: GenericRecordReader, +} + +impl ByteViewArrayReader { + fn new( + pages: Box, + data_type: ArrowType, + record_reader: GenericRecordReader, + ) -> Self { + Self { + data_type, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + record_reader, + } + } +} + +impl ArrayReader for ByteViewArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result { + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) + } + + fn consume_batch(&mut self) -> Result { + let buffer = self.record_reader.consume_record_data(); + let null_buffer = self.record_reader.consume_bitmap_buffer(); + self.def_levels_buffer = self.record_reader.consume_def_levels(); + self.rep_levels_buffer = self.record_reader.consume_rep_levels(); + self.record_reader.reset(); + + let array: ArrayRef = buffer.into_array(null_buffer, self.data_type.clone()); + + Ok(array) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + skip_records(&mut self.record_reader, self.pages.as_mut(), num_records) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer.as_deref() + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer.as_deref() + } +} + +/// A [`ColumnValueDecoder`] for variable length byte arrays +struct ByteViewArrayColumnValueDecoder { + dict: Option, + decoder: Option, + validate_utf8: bool, +} + +impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { + type Buffer = ViewBuffer; + + fn new(desc: &ColumnDescPtr) -> Self { + let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; + Self { + dict: None, + decoder: None, + validate_utf8, + } + } + + fn set_dict( + &mut self, + buf: Bytes, + num_values: u32, + encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if !matches!( + encoding, + Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY + ) { + return Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )); + } + + let mut buffer = ViewBuffer::default(); + let mut decoder = ByteViewArrayDecoderPlain::new( + buf, + num_values as usize, + Some(num_values as usize), + self.validate_utf8, + ); + decoder.read(&mut buffer, usize::MAX)?; + self.dict = Some(buffer); + Ok(()) + } + + fn set_data( + &mut self, + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option, + ) -> Result<()> { + self.decoder = Some(ByteViewArrayDecoder::new( + encoding, + data, + num_levels, + num_values, + self.validate_utf8, + )?); + Ok(()) + } + + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.read(out, num_values, self.dict.as_ref()) + } + + fn skip_values(&mut self, num_values: usize) -> Result { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.skip(num_values, self.dict.as_ref()) + } +} + +/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] +pub enum ByteViewArrayDecoder { + Plain(ByteViewArrayDecoderPlain), + Dictionary(ByteViewArrayDecoderDictionary), + DeltaLength(ByteViewArrayDecoderDeltaLength), + DeltaByteArray(ByteViewArrayDecoderDelta), +} + +impl ByteViewArrayDecoder { + pub fn new( + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option, + validate_utf8: bool, + ) -> Result { + let decoder = match encoding { + Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new( + data, + num_levels, + num_values, + validate_utf8, + )), + Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { + ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( + data, num_levels, num_values, + )) + } + Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( + ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, + ), + Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray( + ByteViewArrayDecoderDelta::new(data, validate_utf8)?, + ), + _ => { + return Err(general_err!( + "unsupported encoding for byte array: {}", + encoding + )) + } + }; + + Ok(decoder) + } + + /// Read up to `len` values to `out` with the optional dictionary + pub fn read( + &mut self, + out: &mut ViewBuffer, + len: usize, + dict: Option<&ViewBuffer>, + ) -> Result { + match self { + ByteViewArrayDecoder::Plain(d) => d.read(out, len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.read(out, dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len), + } + } + + /// Skip `len` values + pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result { + match self { + ByteViewArrayDecoder::Plain(d) => d.skip(len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.skip(dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.skip(len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len), + } + } +} + +/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] +pub struct ByteViewArrayDecoderPlain { + buf: Bytes, + /// offset of buf, changed during read or skip. + offset: usize, + validate_utf8: bool, + + /// This is a maximum as the null count is not always known, e.g. value data from + /// a v1 data page + max_remaining_values: usize, +} + +impl ByteViewArrayDecoderPlain { + pub fn new( + buf: Bytes, + num_levels: usize, + num_values: Option, + validate_utf8: bool, + ) -> Self { + Self { + buf, + validate_utf8, + offset: 0, + max_remaining_values: num_values.unwrap_or(num_levels), + } + } + + pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let to_read = len.min(self.max_remaining_values); + + let remaining_bytes = self.buf.len() - self.offset; + if remaining_bytes == 0 { + return Ok(0); + } + + let mut read = 0; + + let buf = self.buf.as_ref(); + while self.offset < self.buf.len() && read != to_read { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte view array".into())); + } + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes); + + let start_offset = self.offset + 4; + let end_offset = start_offset + len as usize; + if end_offset > buf.len() { + return Err(ParquetError::EOF("eof decoding byte view array".into())); + } + + output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; + + self.offset = end_offset; + read += 1; + } + self.max_remaining_values -= to_read; + + Ok(to_read) + } + + pub fn skip(&mut self, to_skip: usize) -> Result { + let to_skip = to_skip.min(self.max_remaining_values); + let mut skip = 0; + let buf = self.buf.as_ref(); + + while self.offset < self.buf.len() && skip != to_skip { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes) as usize; + skip += 1; + self.offset = self.offset + 4 + len; + } + self.max_remaining_values -= skip; + Ok(skip) + } +} + +/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDeltaLength { + lengths: Vec, + data: Bytes, + length_offset: usize, + data_offset: usize, + validate_utf8: bool, +} + +impl ByteViewArrayDecoderDeltaLength { + fn new(data: Bytes, validate_utf8: bool) -> Result { + let mut len_decoder = DeltaBitPackDecoder::::new(); + len_decoder.set_data(data.clone(), 0)?; + let values = len_decoder.values_left(); + + let mut lengths = vec![0; values]; + len_decoder.get(&mut lengths)?; + + Ok(Self { + lengths, + data, + validate_utf8, + length_offset: 0, + data_offset: len_decoder.get_offset(), + }) + } + + fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let to_read = len.min(self.lengths.len() - self.length_offset); + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; + + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + + if self.data_offset + total_bytes > self.data.len() { + return Err(ParquetError::EOF( + "Insufficient delta length byte array bytes".to_string(), + )); + } + + let mut start_offset = self.data_offset; + for length in src_lengths { + let end_offset = start_offset + *length as usize; + output.try_push( + &self.data.as_ref()[start_offset..end_offset], + self.validate_utf8, + )?; + start_offset = end_offset; + } + + self.data_offset = start_offset; + self.length_offset += to_read; + + Ok(to_read) + } + + fn skip(&mut self, to_skip: usize) -> Result { + let remain_values = self.lengths.len() - self.length_offset; + let to_skip = remain_values.min(to_skip); + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip]; + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + + self.data_offset += total_bytes; + self.length_offset += to_skip; + Ok(to_skip) + } +} + +/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDelta { + decoder: DeltaByteArrayDecoder, + validate_utf8: bool, +} + +impl ByteViewArrayDecoderDelta { + fn new(data: Bytes, validate_utf8: bool) -> Result { + Ok(Self { + decoder: DeltaByteArrayDecoder::new(data)?, + validate_utf8, + }) + } + + fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let read = self + .decoder + .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; + + Ok(read) + } + + fn skip(&mut self, to_skip: usize) -> Result { + self.decoder.skip(to_skip) + } +} + +/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDictionary { + decoder: DictIndexDecoder, +} + +impl ByteViewArrayDecoderDictionary { + fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { + Self { + decoder: DictIndexDecoder::new(data, num_levels, num_values), + } + } + + fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { + // All data must be NULL + if dict.is_empty() { + return Ok(0); + } + + self.decoder.read(len, |keys| { + output.extend_from_dictionary(keys, &dict.views, &dict.buffers) + }) + } + + fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result { + // All data must be NULL + if dict.is_empty() { + return Ok(0); + } + + self.decoder.skip(to_skip) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; + use crate::arrow::record_reader::buffer::ValuesBuffer; + use arrow_array::{Array, StringViewArray}; + use arrow_buffer::Buffer; + + #[test] + fn test_byte_array_decoder() { + let (pages, encoded_dictionary) = byte_array_all_encodings(vec![ + "hello", + "world", + "here comes the snow", + "large payload over 12 bytes", + ]); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + let first = output.views.first().unwrap(); + let len = *first as u32; + + assert_eq!( + &first.to_le_bytes()[4..4 + len as usize], + "hello".as_bytes() + ); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); + + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("world"), + None, + Some("here comes the snow"), + Some("large payload over 12 bytes"), + None, + None, + ] + ); + } + } + + #[test] + fn test_byte_array_decoder_skip() { + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + let first = output.views.first().unwrap(); + let len = *first as u32; + + assert_eq!( + &first.to_le_bytes()[4..4 + len as usize], + "hello".as_bytes() + ); + + assert_eq!(decoder.skip_values(1).unwrap(), 1); + assert_eq!(decoder.skip_values(1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("large payload over 12 bytes"), + None, + None, + ] + ); + } + } + + #[test] + fn test_byte_array_decoder_nulls() { + let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + // test nulls read + for (encoding, page) in pages.clone() { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, None).unwrap(); + assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); + } + + // test nulls skip + for (encoding, page) in pages { + decoder.set_data(encoding, page, 4, None).unwrap(); + assert_eq!(decoder.skip_values(1024).unwrap(), 0); + } + } +} diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index 8febe41e688..d0510c79026 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -32,6 +32,7 @@ use crate::file::reader::{FilePageIterator, FileReader}; mod builder; mod byte_array; mod byte_array_dictionary; +mod byte_view_array; mod empty_array; mod fixed_len_byte_array; mod fixed_size_list_array; @@ -50,6 +51,8 @@ pub use byte_array::make_byte_array_reader; pub use byte_array::make_byte_view_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks +pub use byte_view_array::make_byte_view_array_reader; +#[allow(unused_imports)] // Only used for benchmarks pub use fixed_len_byte_array::make_fixed_len_byte_array_reader; pub use fixed_size_list_array::FixedSizeListArrayReader; pub use list_array::ListArrayReader; diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs index cbc795d94f5..544c8c88626 100644 --- a/parquet/src/arrow/buffer/mod.rs +++ b/parquet/src/arrow/buffer/mod.rs @@ -20,3 +20,4 @@ pub mod bit_util; pub mod dictionary_buffer; pub mod offset_buffer; +pub(crate) mod view_buffer; diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs new file mode 100644 index 00000000000..143066a3813 --- /dev/null +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::buffer::bit_util::iter_set_bits_rev; +use crate::arrow::record_reader::buffer::ValuesBuffer; +use crate::errors::{ParquetError, Result}; +use arrow_array::{make_array, ArrayRef}; +use arrow_buffer::{ArrowNativeType, Buffer}; +use arrow_data::{ArrayDataBuilder, ByteView}; +use arrow_schema::DataType as ArrowType; + +/// A buffer of variable-sized byte arrays that can be converted into +/// a corresponding [`ArrayRef`] +#[derive(Debug, Default)] +pub struct ViewBuffer { + pub views: Vec, + pub buffers: Vec, +} + +impl ViewBuffer { + /// Returns the number of byte arrays in this buffer + pub fn len(&self) -> usize { + self.views.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { + if validate_utf8 { + if let Some(&b) = data.first() { + // A valid code-point iff it does not start with 0b10xxxxxx + // Bit-magic taken from `std::str::is_char_boundary` + if (b as i8) < -0x40 { + return Err(ParquetError::General( + "encountered non UTF-8 data".to_string(), + )); + } + } + } + let length: u32 = data.len().try_into().unwrap(); + if length <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); + view_buffer[4..4 + length as usize].copy_from_slice(data); + self.views.push(u128::from_le_bytes(view_buffer)); + return Ok(()); + } + + let offset = self.buffers.len() as u32; + self.buffers.extend_from_slice(data); + + let view = ByteView { + length, + prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), + buffer_index: 0, + offset, + }; + self.views.push(view.into()); + Ok(()) + } + + /// Extends this buffer with a list of keys + pub fn extend_from_dictionary( + &mut self, + keys: &[K], + dict_views: &[u128], + dict_buffers: &[u8], + ) -> Result<()> { + for key in keys { + let index = key.as_usize(); + if index >= dict_views.len() { + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + dict_views.len() + )); + } + + let view = dict_views[index]; + let len = view as u32; + + // Dictionary values are verified when decoding dictionary page, so validate_utf8 is false here. + if len <= 12 { + self.views.push(view); + } else { + let offset = (view >> 96) as usize; + self.try_push(&dict_buffers[offset..offset + len as usize], false)? + }; + } + Ok(()) + } + + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` + pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { + let len = self.len(); + let array_data_builder = ArrayDataBuilder::new(data_type) + .len(len) + .add_buffer(Buffer::from_vec(self.views)) + .add_buffer(Buffer::from_vec(self.buffers)) + .null_bit_buffer(null_buffer); + + let data = match cfg!(debug_assertions) { + true => array_data_builder.build().unwrap(), + false => unsafe { array_data_builder.build_unchecked() }, + }; + + make_array(data) + } +} + +impl ValuesBuffer for ViewBuffer { + fn pad_nulls( + &mut self, + read_offset: usize, + values_read: usize, + levels_read: usize, + valid_mask: &[u8], + ) { + self.views.resize(read_offset + levels_read, 0); + + let values_range = read_offset..read_offset + values_read; + for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { + debug_assert!(level_pos >= value_pos); + if level_pos <= value_pos { + break; + } + self.views[level_pos] = self.views[value_pos]; + } + } +} diff --git a/testing b/testing index e270341fb5f..735ae7128d5 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit e270341fb5f3ff785410e6286cc42898e9d6a99c +Subproject commit 735ae7128d571398dd798d7ff004adebeb342883 From b0b22dfca2f7d59f49cb7bc353982a18565df358 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Sat, 13 Apr 2024 23:25:28 +0800 Subject: [PATCH 2/5] add check valid utf8 --- .../src/arrow/array_reader/byte_view_array.rs | 19 ++++++++- parquet/src/arrow/arrow_writer/mod.rs | 18 +++++++++ parquet/src/arrow/buffer/view_buffer.rs | 40 ++++++++++++++++--- 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 80b9a24cb9c..1c27a3b58b2 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -315,6 +315,8 @@ impl ByteViewArrayDecoderPlain { } pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let initial_values_length = output.views.len(); + let to_read = len.min(self.max_remaining_values); let remaining_bytes = self.buf.len() - self.offset; @@ -345,6 +347,10 @@ impl ByteViewArrayDecoderPlain { } self.max_remaining_values -= to_read; + if self.validate_utf8 { + output.check_valid_utf8(initial_values_length)?; + } + Ok(to_read) } @@ -395,6 +401,8 @@ impl ByteViewArrayDecoderDeltaLength { } fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let initial_values_length = output.views.len(); + let to_read = len.min(self.lengths.len() - self.length_offset); let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; @@ -420,6 +428,10 @@ impl ByteViewArrayDecoderDeltaLength { self.data_offset = start_offset; self.length_offset += to_read; + if self.validate_utf8 { + output.check_valid_utf8(initial_values_length)?; + } + Ok(to_read) } @@ -451,10 +463,15 @@ impl ByteViewArrayDecoderDelta { } fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let initial_values_length = output.views.len(); let read = self .decoder .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; + if self.validate_utf8 { + output.check_valid_utf8(initial_values_length)?; + } + Ok(read) } @@ -482,7 +499,7 @@ impl ByteViewArrayDecoderDictionary { } self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, &dict.views, &dict.buffers) + output.extend_from_dictionary(keys, &dict.views, &dict.buffer) }) } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 53287dec572..9f9a72c2155 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -2103,6 +2103,16 @@ mod tests { values_required::(many_vecs_iter); } + #[test] + fn binary_view_single_column() { + let one_vec: Vec = (0..SMALL_SIZE as u8).collect(); + let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect(); + let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice()); + + // BinaryArrays can't be built from Vec>, so only call `values_required` + values_required::(many_vecs_iter); + } + #[test] fn i32_column_bloom_filter() { let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); @@ -2187,6 +2197,14 @@ mod tests { required_and_optional::(raw_strs); } + #[test] + fn string_view_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect(); + let raw_strs = raw_values.iter().map(|s| s.as_str()); + + required_and_optional::(raw_strs); + } + #[test] fn large_string_single_column() { let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect(); diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 143066a3813..743ee6cf288 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -19,7 +19,7 @@ use crate::arrow::buffer::bit_util::iter_set_bits_rev; use crate::arrow::record_reader::buffer::ValuesBuffer; use crate::errors::{ParquetError, Result}; use arrow_array::{make_array, ArrayRef}; -use arrow_buffer::{ArrowNativeType, Buffer}; +use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice}; use arrow_data::{ArrayDataBuilder, ByteView}; use arrow_schema::DataType as ArrowType; @@ -28,7 +28,7 @@ use arrow_schema::DataType as ArrowType; #[derive(Debug, Default)] pub struct ViewBuffer { pub views: Vec, - pub buffers: Vec, + pub buffer: Vec, } impl ViewBuffer { @@ -41,6 +41,12 @@ impl ViewBuffer { self.len() == 0 } + /// If `validate_utf8` this verifies that the first character of `data` is + /// the start of a UTF-8 codepoint + /// + /// Note: This does not verify that the entirety of `data` is valid + /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after + /// all data has been written pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { if validate_utf8 { if let Some(&b) = data.first() { @@ -62,8 +68,8 @@ impl ViewBuffer { return Ok(()); } - let offset = self.buffers.len() as u32; - self.buffers.extend_from_slice(data); + let offset = self.buffer.len() as u32; + self.buffer.extend_from_slice(data); let view = ByteView { length, @@ -105,13 +111,37 @@ impl ViewBuffer { Ok(()) } + /// Validates that `&self.views[start_offset..]`'s all data are valid UTF-8 sequence + pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> { + let views_slice = &self.views.as_slice()[start_offset..]; + // check inlined view first + for view in views_slice { + if *view as u32 > 12 { + continue; + } + let len = *view as u32; + if let Err(e) = std::str::from_utf8(&view.to_byte_slice()[4..4 + len as usize]) { + return Err(general_err!("encountered non UTF-8 data: {}", e)); + } + } + let first_buffer = views_slice.iter().find(|view| (**view) as u32 > 12); + if first_buffer.is_none() { + return Ok(()); + } + let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize; + match std::str::from_utf8(&self.buffer[first_buffer_offset..]) { + Ok(_) => Ok(()), + Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + } + } + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { let len = self.len(); let array_data_builder = ArrayDataBuilder::new(data_type) .len(len) .add_buffer(Buffer::from_vec(self.views)) - .add_buffer(Buffer::from_vec(self.buffers)) + .add_buffer(Buffer::from_vec(self.buffer)) .null_bit_buffer(null_buffer); let data = match cfg!(debug_assertions) { From 54559b42d96796fcc1eec3d15f6ff33d03239c8e Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Mon, 15 Apr 2024 00:08:25 +0800 Subject: [PATCH 3/5] use different buffer for different encoding --- .../src/arrow/array_reader/byte_view_array.rs | 11 ++- parquet/src/arrow/buffer/view_buffer.rs | 75 ++++++++++++++++--- 2 files changed, 69 insertions(+), 17 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 1c27a3b58b2..156cb0c5e47 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -327,6 +327,7 @@ impl ByteViewArrayDecoderPlain { let mut read = 0; let buf = self.buf.as_ref(); + output.add_buffer(self.buf.clone()); while self.offset < self.buf.len() && read != to_read { if self.offset + 4 > buf.len() { return Err(ParquetError::EOF("eof decoding byte view array".into())); @@ -340,7 +341,7 @@ impl ByteViewArrayDecoderPlain { return Err(ParquetError::EOF("eof decoding byte view array".into())); } - output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; + output.try_push_with_offset(start_offset, end_offset)?; self.offset = end_offset; read += 1; @@ -416,12 +417,10 @@ impl ByteViewArrayDecoderDeltaLength { } let mut start_offset = self.data_offset; + output.add_buffer(self.data.clone()); for length in src_lengths { let end_offset = start_offset + *length as usize; - output.try_push( - &self.data.as_ref()[start_offset..end_offset], - self.validate_utf8, - )?; + output.try_push_with_offset(start_offset, end_offset)?; start_offset = end_offset; } @@ -499,7 +498,7 @@ impl ByteViewArrayDecoderDictionary { } self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, &dict.views, &dict.buffer) + output.extend_from_dictionary(keys, &dict.views, dict.plain_buffer.as_ref().unwrap()) }) } diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 743ee6cf288..784a67c2e1c 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -22,13 +22,22 @@ use arrow_array::{make_array, ArrayRef}; use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice}; use arrow_data::{ArrayDataBuilder, ByteView}; use arrow_schema::DataType as ArrowType; +use bytes::Bytes; /// A buffer of variable-sized byte arrays that can be converted into /// a corresponding [`ArrayRef`] #[derive(Debug, Default)] pub struct ViewBuffer { pub views: Vec, - pub buffer: Vec, + /// If encoding in (`PLAIN`, `DELTA_LENGTH_BYTE_ARRAY`), we use `plain_buffer` + /// to hold the page data without copy. + pub plain_buffer: Option, + /// If encoding is `DELTA_BYTE_ARRAY`, we use `delta_buffer` to build data buffer + /// since this encoding's page data not hold full data. + /// + /// If encoding in (`PLAIN_DICTIONARY`, `RLE_DICTIONARY`), we need these two buffers + /// cause these encoding first build dict then use dict to read data. + pub delta_buffer: Vec, } impl ViewBuffer { @@ -41,6 +50,36 @@ impl ViewBuffer { self.len() == 0 } + /// add entire page buf to [`ViewBuffer`], avoid copy data. + pub fn add_buffer(&mut self, buf: Bytes) { + if self.plain_buffer.is_none() { + self.plain_buffer = Some(buf); + } + } + + /// Push data to [`ViewBuffer`], since we already hold full data through [`Self::add_buffer`], + /// we only need to slice the data to build the view. + pub fn try_push_with_offset(&mut self, start_offset: usize, end_offset: usize) -> Result<()> { + let data = &self.plain_buffer.as_ref().unwrap()[start_offset..end_offset]; + let length: u32 = (end_offset - start_offset) as u32; + if length <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); + view_buffer[4..4 + length as usize].copy_from_slice(data); + self.views.push(u128::from_le_bytes(view_buffer)); + return Ok(()); + } + + let view = ByteView { + length, + prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), + buffer_index: 0, + offset: start_offset as u32, + }; + self.views.push(view.into()); + Ok(()) + } + /// If `validate_utf8` this verifies that the first character of `data` is /// the start of a UTF-8 codepoint /// @@ -68,8 +107,8 @@ impl ViewBuffer { return Ok(()); } - let offset = self.buffer.len() as u32; - self.buffer.extend_from_slice(data); + let offset = self.delta_buffer.len() as u32; + self.delta_buffer.extend_from_slice(data); let view = ByteView { length, @@ -129,20 +168,34 @@ impl ViewBuffer { return Ok(()); } let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize; - match std::str::from_utf8(&self.buffer[first_buffer_offset..]) { - Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + if self.plain_buffer.is_none() { + match std::str::from_utf8(&self.delta_buffer[first_buffer_offset..]) { + Ok(_) => Ok(()), + Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + } + } else { + match std::str::from_utf8(&self.plain_buffer.as_ref().unwrap()[first_buffer_offset..]) { + Ok(_) => Ok(()), + Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + } } } /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { let len = self.len(); - let array_data_builder = ArrayDataBuilder::new(data_type) - .len(len) - .add_buffer(Buffer::from_vec(self.views)) - .add_buffer(Buffer::from_vec(self.buffer)) - .null_bit_buffer(null_buffer); + let array_data_builder = { + let builder = ArrayDataBuilder::new(data_type) + .len(len) + .add_buffer(Buffer::from_vec(self.views)) + .null_bit_buffer(null_buffer); + + if self.plain_buffer.is_none() { + builder.add_buffer(Buffer::from_vec(self.delta_buffer)) + } else { + builder.add_buffer(self.plain_buffer.unwrap().into()) + } + }; let data = match cfg!(debug_assertions) { true => array_data_builder.build().unwrap(), From fd99ce21127bd1c6ed1eb77df36b1c7aef45186e Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Mon, 15 Apr 2024 01:26:45 +0800 Subject: [PATCH 4/5] Revert "use different buffer for different encoding" This reverts commit 71f51aa10079cdadfe7a0f5026563eedd7f76bd9. --- .../src/arrow/array_reader/byte_view_array.rs | 11 +-- parquet/src/arrow/array_reader/mod.rs | 2 - parquet/src/arrow/arrow_reader/mod.rs | 2 +- parquet/src/arrow/buffer/view_buffer.rs | 75 +++---------------- 4 files changed, 18 insertions(+), 72 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 156cb0c5e47..1c27a3b58b2 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -327,7 +327,6 @@ impl ByteViewArrayDecoderPlain { let mut read = 0; let buf = self.buf.as_ref(); - output.add_buffer(self.buf.clone()); while self.offset < self.buf.len() && read != to_read { if self.offset + 4 > buf.len() { return Err(ParquetError::EOF("eof decoding byte view array".into())); @@ -341,7 +340,7 @@ impl ByteViewArrayDecoderPlain { return Err(ParquetError::EOF("eof decoding byte view array".into())); } - output.try_push_with_offset(start_offset, end_offset)?; + output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; self.offset = end_offset; read += 1; @@ -417,10 +416,12 @@ impl ByteViewArrayDecoderDeltaLength { } let mut start_offset = self.data_offset; - output.add_buffer(self.data.clone()); for length in src_lengths { let end_offset = start_offset + *length as usize; - output.try_push_with_offset(start_offset, end_offset)?; + output.try_push( + &self.data.as_ref()[start_offset..end_offset], + self.validate_utf8, + )?; start_offset = end_offset; } @@ -498,7 +499,7 @@ impl ByteViewArrayDecoderDictionary { } self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, &dict.views, dict.plain_buffer.as_ref().unwrap()) + output.extend_from_dictionary(keys, &dict.views, &dict.buffer) }) } diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index d0510c79026..fef1e8722f8 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -47,8 +47,6 @@ mod test_util; pub use builder::build_array_reader; pub use byte_array::make_byte_array_reader; -#[allow(unused_imports)] // Only used for benchmarks -pub use byte_array::make_byte_view_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks pub use byte_view_array::make_byte_view_array_reader; diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 6b95324bee3..e5b0e7d0573 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -2308,7 +2308,7 @@ mod tests { ), ( invalid_utf8_later_char::(), - "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 6", + "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 3", ), ]; for (array, expected_error) in cases { diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 784a67c2e1c..743ee6cf288 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -22,22 +22,13 @@ use arrow_array::{make_array, ArrayRef}; use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice}; use arrow_data::{ArrayDataBuilder, ByteView}; use arrow_schema::DataType as ArrowType; -use bytes::Bytes; /// A buffer of variable-sized byte arrays that can be converted into /// a corresponding [`ArrayRef`] #[derive(Debug, Default)] pub struct ViewBuffer { pub views: Vec, - /// If encoding in (`PLAIN`, `DELTA_LENGTH_BYTE_ARRAY`), we use `plain_buffer` - /// to hold the page data without copy. - pub plain_buffer: Option, - /// If encoding is `DELTA_BYTE_ARRAY`, we use `delta_buffer` to build data buffer - /// since this encoding's page data not hold full data. - /// - /// If encoding in (`PLAIN_DICTIONARY`, `RLE_DICTIONARY`), we need these two buffers - /// cause these encoding first build dict then use dict to read data. - pub delta_buffer: Vec, + pub buffer: Vec, } impl ViewBuffer { @@ -50,36 +41,6 @@ impl ViewBuffer { self.len() == 0 } - /// add entire page buf to [`ViewBuffer`], avoid copy data. - pub fn add_buffer(&mut self, buf: Bytes) { - if self.plain_buffer.is_none() { - self.plain_buffer = Some(buf); - } - } - - /// Push data to [`ViewBuffer`], since we already hold full data through [`Self::add_buffer`], - /// we only need to slice the data to build the view. - pub fn try_push_with_offset(&mut self, start_offset: usize, end_offset: usize) -> Result<()> { - let data = &self.plain_buffer.as_ref().unwrap()[start_offset..end_offset]; - let length: u32 = (end_offset - start_offset) as u32; - if length <= 12 { - let mut view_buffer = [0; 16]; - view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); - view_buffer[4..4 + length as usize].copy_from_slice(data); - self.views.push(u128::from_le_bytes(view_buffer)); - return Ok(()); - } - - let view = ByteView { - length, - prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), - buffer_index: 0, - offset: start_offset as u32, - }; - self.views.push(view.into()); - Ok(()) - } - /// If `validate_utf8` this verifies that the first character of `data` is /// the start of a UTF-8 codepoint /// @@ -107,8 +68,8 @@ impl ViewBuffer { return Ok(()); } - let offset = self.delta_buffer.len() as u32; - self.delta_buffer.extend_from_slice(data); + let offset = self.buffer.len() as u32; + self.buffer.extend_from_slice(data); let view = ByteView { length, @@ -168,34 +129,20 @@ impl ViewBuffer { return Ok(()); } let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize; - if self.plain_buffer.is_none() { - match std::str::from_utf8(&self.delta_buffer[first_buffer_offset..]) { - Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), - } - } else { - match std::str::from_utf8(&self.plain_buffer.as_ref().unwrap()[first_buffer_offset..]) { - Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), - } + match std::str::from_utf8(&self.buffer[first_buffer_offset..]) { + Ok(_) => Ok(()), + Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), } } /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { let len = self.len(); - let array_data_builder = { - let builder = ArrayDataBuilder::new(data_type) - .len(len) - .add_buffer(Buffer::from_vec(self.views)) - .null_bit_buffer(null_buffer); - - if self.plain_buffer.is_none() { - builder.add_buffer(Buffer::from_vec(self.delta_buffer)) - } else { - builder.add_buffer(self.plain_buffer.unwrap().into()) - } - }; + let array_data_builder = ArrayDataBuilder::new(data_type) + .len(len) + .add_buffer(Buffer::from_vec(self.views)) + .add_buffer(Buffer::from_vec(self.buffer)) + .null_bit_buffer(null_buffer); let data = match cfg!(debug_assertions) { true => array_data_builder.build().unwrap(), From 4dd2e154026fbe0d977a62b4496029aa52e361b3 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Sat, 15 Jun 2024 10:44:23 +0800 Subject: [PATCH 5/5] rebase master --- parquet/src/arrow/array_reader/builder.rs | 2 +- parquet/src/arrow/array_reader/byte_array.rs | 144 +++- .../src/arrow/array_reader/byte_view_array.rs | 663 ------------------ parquet/src/arrow/array_reader/mod.rs | 5 +- parquet/src/arrow/arrow_reader/mod.rs | 2 +- parquet/src/arrow/buffer/mod.rs | 1 - parquet/src/arrow/buffer/view_buffer.rs | 175 ----- 7 files changed, 147 insertions(+), 845 deletions(-) delete mode 100644 parquet/src/arrow/array_reader/byte_view_array.rs delete mode 100644 parquet/src/arrow/buffer/view_buffer.rs diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 945f62526a7..958594c9323 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use arrow_schema::{DataType, Fields, SchemaBuilder}; -use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader; +use crate::arrow::array_reader::byte_array::make_byte_view_array_reader; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; use crate::arrow::array_reader::{ diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 19086878c15..d0aa6f7b1eb 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -74,6 +74,36 @@ pub fn make_byte_array_reader( } } +/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. +pub fn make_byte_view_array_reader( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, +) -> Result> { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { + ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, + _ => ArrowType::BinaryView, + }, + }; + + match data_type { + ArrowType::BinaryView | ArrowType::Utf8View => { + let reader = GenericRecordReader::new(column_desc); + Ok(Box::new(ByteArrayReader::::new( + pages, data_type, reader, + ))) + } + + _ => Err(general_err!( + "invalid data type for byte array reader read to view type - {}", + data_type + )), + } +} + /// An [`ArrayReader`] for variable length byte arrays struct ByteArrayReader { data_type: ArrowType, @@ -588,7 +618,7 @@ mod tests { use super::*; use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; use crate::arrow::record_reader::buffer::ValuesBuffer; - use arrow_array::{Array, StringArray}; + use arrow_array::{Array, StringArray, StringViewArray}; use arrow_buffer::Buffer; #[test] @@ -646,6 +676,64 @@ mod tests { } } + #[test] + fn test_byte_array_string_view_decoder() { + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]); + + let column_desc = utf8_column(); + let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = OffsetBuffer::::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(output.values.as_slice(), "hello".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5]); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + assert_eq!(output.values.as_slice(), "helloworld".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5, 10]); + + assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); + assert_eq!( + output.values.as_slice(), + "helloworldlarge payload over 12 bytesb".as_bytes() + ); + assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 37, 38]); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("world"), + None, + Some("large payload over 12 bytes"), + Some("b"), + None, + None, + ] + ); + } + } + #[test] fn test_byte_array_decoder_skip() { let (pages, encoded_dictionary) = @@ -690,6 +778,60 @@ mod tests { } } + #[test] + fn test_byte_array_string_view_decoder_skip() { + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); + + let column_desc = utf8_column(); + let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = OffsetBuffer::::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(output.values.as_slice(), "hello".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5]); + + assert_eq!(decoder.skip_values(1).unwrap(), 1); + assert_eq!(decoder.skip_values(1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + assert_eq!( + output.values.as_slice(), + "hellolarge payload over 12 bytes".as_bytes() + ); + assert_eq!(output.offsets.as_slice(), &[0, 5, 32]); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("large payload over 12 bytes"), + None, + None, + ] + ); + } + } + #[test] fn test_byte_array_decoder_nulls() { let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs deleted file mode 100644 index 1c27a3b58b2..00000000000 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ /dev/null @@ -1,663 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; -use crate::arrow::buffer::view_buffer::ViewBuffer; -use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; -use crate::arrow::record_reader::GenericRecordReader; -use crate::arrow::schema::parquet_to_arrow_field; -use crate::basic::{ConvertedType, Encoding}; -use crate::column::page::PageIterator; -use crate::column::reader::decoder::ColumnValueDecoder; -use crate::data_type::Int32Type; -use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; -use crate::errors::{ParquetError, Result}; -use crate::schema::types::ColumnDescPtr; -use arrow_array::ArrayRef; -use arrow_schema::DataType as ArrowType; -use bytes::Bytes; -use std::any::Any; - -/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. -pub fn make_byte_view_array_reader( - pages: Box, - column_desc: ColumnDescPtr, - arrow_type: Option, -) -> Result> { - // Check if Arrow type is specified, else create it from Parquet type - let data_type = match arrow_type { - Some(t) => t, - None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { - ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, - _ => ArrowType::BinaryView, - }, - }; - - match data_type { - ArrowType::Utf8View | ArrowType::BinaryView => { - let reader = GenericRecordReader::new(column_desc); - Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader))) - } - _ => Err(general_err!( - "invalid data type for byte array reader - {}", - data_type - )), - } -} - -/// An [`ArrayReader`] for variable length byte arrays -struct ByteViewArrayReader { - data_type: ArrowType, - pages: Box, - def_levels_buffer: Option>, - rep_levels_buffer: Option>, - record_reader: GenericRecordReader, -} - -impl ByteViewArrayReader { - fn new( - pages: Box, - data_type: ArrowType, - record_reader: GenericRecordReader, - ) -> Self { - Self { - data_type, - pages, - def_levels_buffer: None, - rep_levels_buffer: None, - record_reader, - } - } -} - -impl ArrayReader for ByteViewArrayReader { - fn as_any(&self) -> &dyn Any { - self - } - - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - fn read_records(&mut self, batch_size: usize) -> Result { - read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) - } - - fn consume_batch(&mut self) -> Result { - let buffer = self.record_reader.consume_record_data(); - let null_buffer = self.record_reader.consume_bitmap_buffer(); - self.def_levels_buffer = self.record_reader.consume_def_levels(); - self.rep_levels_buffer = self.record_reader.consume_rep_levels(); - self.record_reader.reset(); - - let array: ArrayRef = buffer.into_array(null_buffer, self.data_type.clone()); - - Ok(array) - } - - fn skip_records(&mut self, num_records: usize) -> Result { - skip_records(&mut self.record_reader, self.pages.as_mut(), num_records) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_deref() - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_deref() - } -} - -/// A [`ColumnValueDecoder`] for variable length byte arrays -struct ByteViewArrayColumnValueDecoder { - dict: Option, - decoder: Option, - validate_utf8: bool, -} - -impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { - type Buffer = ViewBuffer; - - fn new(desc: &ColumnDescPtr) -> Self { - let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; - Self { - dict: None, - decoder: None, - validate_utf8, - } - } - - fn set_dict( - &mut self, - buf: Bytes, - num_values: u32, - encoding: Encoding, - _is_sorted: bool, - ) -> Result<()> { - if !matches!( - encoding, - Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY - ) { - return Err(nyi_err!( - "Invalid/Unsupported encoding type for dictionary: {}", - encoding - )); - } - - let mut buffer = ViewBuffer::default(); - let mut decoder = ByteViewArrayDecoderPlain::new( - buf, - num_values as usize, - Some(num_values as usize), - self.validate_utf8, - ); - decoder.read(&mut buffer, usize::MAX)?; - self.dict = Some(buffer); - Ok(()) - } - - fn set_data( - &mut self, - encoding: Encoding, - data: Bytes, - num_levels: usize, - num_values: Option, - ) -> Result<()> { - self.decoder = Some(ByteViewArrayDecoder::new( - encoding, - data, - num_levels, - num_values, - self.validate_utf8, - )?); - Ok(()) - } - - fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { - let decoder = self - .decoder - .as_mut() - .ok_or_else(|| general_err!("no decoder set"))?; - - decoder.read(out, num_values, self.dict.as_ref()) - } - - fn skip_values(&mut self, num_values: usize) -> Result { - let decoder = self - .decoder - .as_mut() - .ok_or_else(|| general_err!("no decoder set"))?; - - decoder.skip(num_values, self.dict.as_ref()) - } -} - -/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] -pub enum ByteViewArrayDecoder { - Plain(ByteViewArrayDecoderPlain), - Dictionary(ByteViewArrayDecoderDictionary), - DeltaLength(ByteViewArrayDecoderDeltaLength), - DeltaByteArray(ByteViewArrayDecoderDelta), -} - -impl ByteViewArrayDecoder { - pub fn new( - encoding: Encoding, - data: Bytes, - num_levels: usize, - num_values: Option, - validate_utf8: bool, - ) -> Result { - let decoder = match encoding { - Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new( - data, - num_levels, - num_values, - validate_utf8, - )), - Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { - ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( - data, num_levels, num_values, - )) - } - Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( - ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, - ), - Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray( - ByteViewArrayDecoderDelta::new(data, validate_utf8)?, - ), - _ => { - return Err(general_err!( - "unsupported encoding for byte array: {}", - encoding - )) - } - }; - - Ok(decoder) - } - - /// Read up to `len` values to `out` with the optional dictionary - pub fn read( - &mut self, - out: &mut ViewBuffer, - len: usize, - dict: Option<&ViewBuffer>, - ) -> Result { - match self { - ByteViewArrayDecoder::Plain(d) => d.read(out, len), - ByteViewArrayDecoder::Dictionary(d) => { - let dict = - dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; - - d.read(out, dict, len) - } - ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len), - ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len), - } - } - - /// Skip `len` values - pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result { - match self { - ByteViewArrayDecoder::Plain(d) => d.skip(len), - ByteViewArrayDecoder::Dictionary(d) => { - let dict = - dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; - - d.skip(dict, len) - } - ByteViewArrayDecoder::DeltaLength(d) => d.skip(len), - ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len), - } - } -} - -/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] -pub struct ByteViewArrayDecoderPlain { - buf: Bytes, - /// offset of buf, changed during read or skip. - offset: usize, - validate_utf8: bool, - - /// This is a maximum as the null count is not always known, e.g. value data from - /// a v1 data page - max_remaining_values: usize, -} - -impl ByteViewArrayDecoderPlain { - pub fn new( - buf: Bytes, - num_levels: usize, - num_values: Option, - validate_utf8: bool, - ) -> Self { - Self { - buf, - validate_utf8, - offset: 0, - max_remaining_values: num_values.unwrap_or(num_levels), - } - } - - pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - let initial_values_length = output.views.len(); - - let to_read = len.min(self.max_remaining_values); - - let remaining_bytes = self.buf.len() - self.offset; - if remaining_bytes == 0 { - return Ok(0); - } - - let mut read = 0; - - let buf = self.buf.as_ref(); - while self.offset < self.buf.len() && read != to_read { - if self.offset + 4 > buf.len() { - return Err(ParquetError::EOF("eof decoding byte view array".into())); - } - let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); - let len = u32::from_le_bytes(len_bytes); - - let start_offset = self.offset + 4; - let end_offset = start_offset + len as usize; - if end_offset > buf.len() { - return Err(ParquetError::EOF("eof decoding byte view array".into())); - } - - output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; - - self.offset = end_offset; - read += 1; - } - self.max_remaining_values -= to_read; - - if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; - } - - Ok(to_read) - } - - pub fn skip(&mut self, to_skip: usize) -> Result { - let to_skip = to_skip.min(self.max_remaining_values); - let mut skip = 0; - let buf = self.buf.as_ref(); - - while self.offset < self.buf.len() && skip != to_skip { - if self.offset + 4 > buf.len() { - return Err(ParquetError::EOF("eof decoding byte array".into())); - } - let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); - let len = u32::from_le_bytes(len_bytes) as usize; - skip += 1; - self.offset = self.offset + 4 + len; - } - self.max_remaining_values -= skip; - Ok(skip) - } -} - -/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`] -pub struct ByteViewArrayDecoderDeltaLength { - lengths: Vec, - data: Bytes, - length_offset: usize, - data_offset: usize, - validate_utf8: bool, -} - -impl ByteViewArrayDecoderDeltaLength { - fn new(data: Bytes, validate_utf8: bool) -> Result { - let mut len_decoder = DeltaBitPackDecoder::::new(); - len_decoder.set_data(data.clone(), 0)?; - let values = len_decoder.values_left(); - - let mut lengths = vec![0; values]; - len_decoder.get(&mut lengths)?; - - Ok(Self { - lengths, - data, - validate_utf8, - length_offset: 0, - data_offset: len_decoder.get_offset(), - }) - } - - fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - let initial_values_length = output.views.len(); - - let to_read = len.min(self.lengths.len() - self.length_offset); - - let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; - - let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); - - if self.data_offset + total_bytes > self.data.len() { - return Err(ParquetError::EOF( - "Insufficient delta length byte array bytes".to_string(), - )); - } - - let mut start_offset = self.data_offset; - for length in src_lengths { - let end_offset = start_offset + *length as usize; - output.try_push( - &self.data.as_ref()[start_offset..end_offset], - self.validate_utf8, - )?; - start_offset = end_offset; - } - - self.data_offset = start_offset; - self.length_offset += to_read; - - if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; - } - - Ok(to_read) - } - - fn skip(&mut self, to_skip: usize) -> Result { - let remain_values = self.lengths.len() - self.length_offset; - let to_skip = remain_values.min(to_skip); - - let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip]; - let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); - - self.data_offset += total_bytes; - self.length_offset += to_skip; - Ok(to_skip) - } -} - -/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`] -pub struct ByteViewArrayDecoderDelta { - decoder: DeltaByteArrayDecoder, - validate_utf8: bool, -} - -impl ByteViewArrayDecoderDelta { - fn new(data: Bytes, validate_utf8: bool) -> Result { - Ok(Self { - decoder: DeltaByteArrayDecoder::new(data)?, - validate_utf8, - }) - } - - fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - let initial_values_length = output.views.len(); - let read = self - .decoder - .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; - - if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; - } - - Ok(read) - } - - fn skip(&mut self, to_skip: usize) -> Result { - self.decoder.skip(to_skip) - } -} - -/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`ViewBuffer`] -pub struct ByteViewArrayDecoderDictionary { - decoder: DictIndexDecoder, -} - -impl ByteViewArrayDecoderDictionary { - fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { - Self { - decoder: DictIndexDecoder::new(data, num_levels, num_values), - } - } - - fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { - // All data must be NULL - if dict.is_empty() { - return Ok(0); - } - - self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, &dict.views, &dict.buffer) - }) - } - - fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result { - // All data must be NULL - if dict.is_empty() { - return Ok(0); - } - - self.decoder.skip(to_skip) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; - use crate::arrow::record_reader::buffer::ValuesBuffer; - use arrow_array::{Array, StringViewArray}; - use arrow_buffer::Buffer; - - #[test] - fn test_byte_array_decoder() { - let (pages, encoded_dictionary) = byte_array_all_encodings(vec![ - "hello", - "world", - "here comes the snow", - "large payload over 12 bytes", - ]); - - let column_desc = utf8_column(); - let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = ViewBuffer::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - let first = output.views.first().unwrap(); - let len = *first as u32; - - assert_eq!( - &first.to_le_bytes()[4..4 + len as usize], - "hello".as_bytes() - ); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); - - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("world"), - None, - Some("here comes the snow"), - Some("large payload over 12 bytes"), - None, - None, - ] - ); - } - } - - #[test] - fn test_byte_array_decoder_skip() { - let (pages, encoded_dictionary) = - byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); - - let column_desc = utf8_column(); - let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = ViewBuffer::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - let first = output.views.first().unwrap(); - let len = *first as u32; - - assert_eq!( - &first.to_le_bytes()[4..4 + len as usize], - "hello".as_bytes() - ); - - assert_eq!(decoder.skip_values(1).unwrap(), 1); - assert_eq!(decoder.skip_values(1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("large payload over 12 bytes"), - None, - None, - ] - ); - } - } - - #[test] - fn test_byte_array_decoder_nulls() { - let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); - - let column_desc = utf8_column(); - let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - // test nulls read - for (encoding, page) in pages.clone() { - let mut output = ViewBuffer::default(); - decoder.set_data(encoding, page, 4, None).unwrap(); - assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); - } - - // test nulls skip - for (encoding, page) in pages { - decoder.set_data(encoding, page, 4, None).unwrap(); - assert_eq!(decoder.skip_values(1024).unwrap(), 0); - } - } -} diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index fef1e8722f8..8febe41e688 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -32,7 +32,6 @@ use crate::file::reader::{FilePageIterator, FileReader}; mod builder; mod byte_array; mod byte_array_dictionary; -mod byte_view_array; mod empty_array; mod fixed_len_byte_array; mod fixed_size_list_array; @@ -47,9 +46,9 @@ mod test_util; pub use builder::build_array_reader; pub use byte_array::make_byte_array_reader; -pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks -pub use byte_view_array::make_byte_view_array_reader; +pub use byte_array::make_byte_view_array_reader; +pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks pub use fixed_len_byte_array::make_fixed_len_byte_array_reader; pub use fixed_size_list_array::FixedSizeListArrayReader; diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index e5b0e7d0573..6b95324bee3 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -2308,7 +2308,7 @@ mod tests { ), ( invalid_utf8_later_char::(), - "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 3", + "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 6", ), ]; for (array, expected_error) in cases { diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs index 544c8c88626..cbc795d94f5 100644 --- a/parquet/src/arrow/buffer/mod.rs +++ b/parquet/src/arrow/buffer/mod.rs @@ -20,4 +20,3 @@ pub mod bit_util; pub mod dictionary_buffer; pub mod offset_buffer; -pub(crate) mod view_buffer; diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs deleted file mode 100644 index 743ee6cf288..00000000000 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ /dev/null @@ -1,175 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::arrow::buffer::bit_util::iter_set_bits_rev; -use crate::arrow::record_reader::buffer::ValuesBuffer; -use crate::errors::{ParquetError, Result}; -use arrow_array::{make_array, ArrayRef}; -use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice}; -use arrow_data::{ArrayDataBuilder, ByteView}; -use arrow_schema::DataType as ArrowType; - -/// A buffer of variable-sized byte arrays that can be converted into -/// a corresponding [`ArrayRef`] -#[derive(Debug, Default)] -pub struct ViewBuffer { - pub views: Vec, - pub buffer: Vec, -} - -impl ViewBuffer { - /// Returns the number of byte arrays in this buffer - pub fn len(&self) -> usize { - self.views.len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// If `validate_utf8` this verifies that the first character of `data` is - /// the start of a UTF-8 codepoint - /// - /// Note: This does not verify that the entirety of `data` is valid - /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after - /// all data has been written - pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { - if validate_utf8 { - if let Some(&b) = data.first() { - // A valid code-point iff it does not start with 0b10xxxxxx - // Bit-magic taken from `std::str::is_char_boundary` - if (b as i8) < -0x40 { - return Err(ParquetError::General( - "encountered non UTF-8 data".to_string(), - )); - } - } - } - let length: u32 = data.len().try_into().unwrap(); - if length <= 12 { - let mut view_buffer = [0; 16]; - view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); - view_buffer[4..4 + length as usize].copy_from_slice(data); - self.views.push(u128::from_le_bytes(view_buffer)); - return Ok(()); - } - - let offset = self.buffer.len() as u32; - self.buffer.extend_from_slice(data); - - let view = ByteView { - length, - prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), - buffer_index: 0, - offset, - }; - self.views.push(view.into()); - Ok(()) - } - - /// Extends this buffer with a list of keys - pub fn extend_from_dictionary( - &mut self, - keys: &[K], - dict_views: &[u128], - dict_buffers: &[u8], - ) -> Result<()> { - for key in keys { - let index = key.as_usize(); - if index >= dict_views.len() { - return Err(general_err!( - "dictionary key beyond bounds of dictionary: 0..{}", - dict_views.len() - )); - } - - let view = dict_views[index]; - let len = view as u32; - - // Dictionary values are verified when decoding dictionary page, so validate_utf8 is false here. - if len <= 12 { - self.views.push(view); - } else { - let offset = (view >> 96) as usize; - self.try_push(&dict_buffers[offset..offset + len as usize], false)? - }; - } - Ok(()) - } - - /// Validates that `&self.views[start_offset..]`'s all data are valid UTF-8 sequence - pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> { - let views_slice = &self.views.as_slice()[start_offset..]; - // check inlined view first - for view in views_slice { - if *view as u32 > 12 { - continue; - } - let len = *view as u32; - if let Err(e) = std::str::from_utf8(&view.to_byte_slice()[4..4 + len as usize]) { - return Err(general_err!("encountered non UTF-8 data: {}", e)); - } - } - let first_buffer = views_slice.iter().find(|view| (**view) as u32 > 12); - if first_buffer.is_none() { - return Ok(()); - } - let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize; - match std::str::from_utf8(&self.buffer[first_buffer_offset..]) { - Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), - } - } - - /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` - pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { - let len = self.len(); - let array_data_builder = ArrayDataBuilder::new(data_type) - .len(len) - .add_buffer(Buffer::from_vec(self.views)) - .add_buffer(Buffer::from_vec(self.buffer)) - .null_bit_buffer(null_buffer); - - let data = match cfg!(debug_assertions) { - true => array_data_builder.build().unwrap(), - false => unsafe { array_data_builder.build_unchecked() }, - }; - - make_array(data) - } -} - -impl ValuesBuffer for ViewBuffer { - fn pad_nulls( - &mut self, - read_offset: usize, - values_read: usize, - levels_read: usize, - valid_mask: &[u8], - ) { - self.views.resize(read_offset + levels_read, 0); - - let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { - debug_assert!(level_pos >= value_pos); - if level_pos <= value_pos { - break; - } - self.views[level_pos] = self.views[value_pos]; - } - } -}