From 5730aa0afed4be5f1447507ff000c008e267aa87 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Sat, 15 Jun 2024 10:44:23 +0800 Subject: [PATCH] rebase master --- parquet/src/arrow/array_reader/builder.rs | 2 +- parquet/src/arrow/array_reader/byte_array.rs | 143 +++- .../src/arrow/array_reader/byte_view_array.rs | 663 ------------------ parquet/src/arrow/array_reader/mod.rs | 3 - parquet/src/arrow/arrow_reader/mod.rs | 2 +- parquet/src/arrow/buffer/mod.rs | 1 - parquet/src/arrow/buffer/view_buffer.rs | 175 ----- 7 files changed, 144 insertions(+), 845 deletions(-) delete mode 100644 parquet/src/arrow/array_reader/byte_view_array.rs delete mode 100644 parquet/src/arrow/buffer/view_buffer.rs diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 945f62526a7..958594c9323 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use arrow_schema::{DataType, Fields, SchemaBuilder}; -use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader; +use crate::arrow::array_reader::byte_array::make_byte_view_array_reader; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; use crate::arrow::array_reader::{ diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 19086878c15..a165af3e5dc 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -74,6 +74,35 @@ pub fn make_byte_array_reader( } } +/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. +pub fn make_byte_view_array_reader( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, +) -> Result> { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { + ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, + _ => ArrowType::BinaryView, + }, + }; + + match data_type { + ArrowType::Utf8View | ArrowType::BinaryView => { + let reader = GenericRecordReader::new(column_desc); + Ok(Box::new(ByteArrayReader::::new( + pages, data_type, reader, + ))) + } + _ => Err(general_err!( + "invalid data type for byte array reader - {}", + data_type + )), + } +} + /// An [`ArrayReader`] for variable length byte arrays struct ByteArrayReader { data_type: ArrowType, @@ -588,7 +617,7 @@ mod tests { use super::*; use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; use crate::arrow::record_reader::buffer::ValuesBuffer; - use arrow_array::{Array, StringArray}; + use arrow_array::{Array, StringArray, StringViewArray}; use arrow_buffer::Buffer; #[test] @@ -714,4 +743,116 @@ mod tests { assert_eq!(decoder.skip_values(1024).unwrap(), 0); } } + + #[test] + fn test_byte_array_string_view_decoder() { + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]); + + let column_desc = utf8_column(); + let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = OffsetBuffer::::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(output.values.as_slice(), "hello".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5]); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + assert_eq!(output.values.as_slice(), "helloworld".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5, 10]); + + assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); + assert_eq!( + output.values.as_slice(), + "helloworldlarge payload over 12 bytesb".as_bytes() + ); + assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 37, 38]); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("world"), + None, + Some("large payload over 12 bytes"), + Some("b"), + None, + None, + ] + ); + } + } + + #[test] + fn test_byte_array_string_view_decoder_skip() { + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); + + let column_desc = utf8_column(); + let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = OffsetBuffer::::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(output.values.as_slice(), "hello".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5]); + + assert_eq!(decoder.skip_values(1).unwrap(), 1); + assert_eq!(decoder.skip_values(1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + assert_eq!( + output.values.as_slice(), + "hellolarge payload over 12 bytes".as_bytes() + ); + assert_eq!(output.offsets.as_slice(), &[0, 5, 32]); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("large payload over 12 bytes"), + None, + None, + ] + ); + } + } } diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs deleted file mode 100644 index 1c27a3b58b2..00000000000 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ /dev/null @@ -1,663 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; -use crate::arrow::buffer::view_buffer::ViewBuffer; -use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; -use crate::arrow::record_reader::GenericRecordReader; -use crate::arrow::schema::parquet_to_arrow_field; -use crate::basic::{ConvertedType, Encoding}; -use crate::column::page::PageIterator; -use crate::column::reader::decoder::ColumnValueDecoder; -use crate::data_type::Int32Type; -use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; -use crate::errors::{ParquetError, Result}; -use crate::schema::types::ColumnDescPtr; -use arrow_array::ArrayRef; -use arrow_schema::DataType as ArrowType; -use bytes::Bytes; -use std::any::Any; - -/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. -pub fn make_byte_view_array_reader( - pages: Box, - column_desc: ColumnDescPtr, - arrow_type: Option, -) -> Result> { - // Check if Arrow type is specified, else create it from Parquet type - let data_type = match arrow_type { - Some(t) => t, - None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { - ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, - _ => ArrowType::BinaryView, - }, - }; - - match data_type { - ArrowType::Utf8View | ArrowType::BinaryView => { - let reader = GenericRecordReader::new(column_desc); - Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader))) - } - _ => Err(general_err!( - "invalid data type for byte array reader - {}", - data_type - )), - } -} - -/// An [`ArrayReader`] for variable length byte arrays -struct ByteViewArrayReader { - data_type: ArrowType, - pages: Box, - def_levels_buffer: Option>, - rep_levels_buffer: Option>, - record_reader: GenericRecordReader, -} - -impl ByteViewArrayReader { - fn new( - pages: Box, - data_type: ArrowType, - record_reader: GenericRecordReader, - ) -> Self { - Self { - data_type, - pages, - def_levels_buffer: None, - rep_levels_buffer: None, - record_reader, - } - } -} - -impl ArrayReader for ByteViewArrayReader { - fn as_any(&self) -> &dyn Any { - self - } - - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - fn read_records(&mut self, batch_size: usize) -> Result { - read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) - } - - fn consume_batch(&mut self) -> Result { - let buffer = self.record_reader.consume_record_data(); - let null_buffer = self.record_reader.consume_bitmap_buffer(); - self.def_levels_buffer = self.record_reader.consume_def_levels(); - self.rep_levels_buffer = self.record_reader.consume_rep_levels(); - self.record_reader.reset(); - - let array: ArrayRef = buffer.into_array(null_buffer, self.data_type.clone()); - - Ok(array) - } - - fn skip_records(&mut self, num_records: usize) -> Result { - skip_records(&mut self.record_reader, self.pages.as_mut(), num_records) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_deref() - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_deref() - } -} - -/// A [`ColumnValueDecoder`] for variable length byte arrays -struct ByteViewArrayColumnValueDecoder { - dict: Option, - decoder: Option, - validate_utf8: bool, -} - -impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { - type Buffer = ViewBuffer; - - fn new(desc: &ColumnDescPtr) -> Self { - let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; - Self { - dict: None, - decoder: None, - validate_utf8, - } - } - - fn set_dict( - &mut self, - buf: Bytes, - num_values: u32, - encoding: Encoding, - _is_sorted: bool, - ) -> Result<()> { - if !matches!( - encoding, - Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY - ) { - return Err(nyi_err!( - "Invalid/Unsupported encoding type for dictionary: {}", - encoding - )); - } - - let mut buffer = ViewBuffer::default(); - let mut decoder = ByteViewArrayDecoderPlain::new( - buf, - num_values as usize, - Some(num_values as usize), - self.validate_utf8, - ); - decoder.read(&mut buffer, usize::MAX)?; - self.dict = Some(buffer); - Ok(()) - } - - fn set_data( - &mut self, - encoding: Encoding, - data: Bytes, - num_levels: usize, - num_values: Option, - ) -> Result<()> { - self.decoder = Some(ByteViewArrayDecoder::new( - encoding, - data, - num_levels, - num_values, - self.validate_utf8, - )?); - Ok(()) - } - - fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { - let decoder = self - .decoder - .as_mut() - .ok_or_else(|| general_err!("no decoder set"))?; - - decoder.read(out, num_values, self.dict.as_ref()) - } - - fn skip_values(&mut self, num_values: usize) -> Result { - let decoder = self - .decoder - .as_mut() - .ok_or_else(|| general_err!("no decoder set"))?; - - decoder.skip(num_values, self.dict.as_ref()) - } -} - -/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] -pub enum ByteViewArrayDecoder { - Plain(ByteViewArrayDecoderPlain), - Dictionary(ByteViewArrayDecoderDictionary), - DeltaLength(ByteViewArrayDecoderDeltaLength), - DeltaByteArray(ByteViewArrayDecoderDelta), -} - -impl ByteViewArrayDecoder { - pub fn new( - encoding: Encoding, - data: Bytes, - num_levels: usize, - num_values: Option, - validate_utf8: bool, - ) -> Result { - let decoder = match encoding { - Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new( - data, - num_levels, - num_values, - validate_utf8, - )), - Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { - ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( - data, num_levels, num_values, - )) - } - Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( - ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, - ), - Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray( - ByteViewArrayDecoderDelta::new(data, validate_utf8)?, - ), - _ => { - return Err(general_err!( - "unsupported encoding for byte array: {}", - encoding - )) - } - }; - - Ok(decoder) - } - - /// Read up to `len` values to `out` with the optional dictionary - pub fn read( - &mut self, - out: &mut ViewBuffer, - len: usize, - dict: Option<&ViewBuffer>, - ) -> Result { - match self { - ByteViewArrayDecoder::Plain(d) => d.read(out, len), - ByteViewArrayDecoder::Dictionary(d) => { - let dict = - dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; - - d.read(out, dict, len) - } - ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len), - ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len), - } - } - - /// Skip `len` values - pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result { - match self { - ByteViewArrayDecoder::Plain(d) => d.skip(len), - ByteViewArrayDecoder::Dictionary(d) => { - let dict = - dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; - - d.skip(dict, len) - } - ByteViewArrayDecoder::DeltaLength(d) => d.skip(len), - ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len), - } - } -} - -/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] -pub struct ByteViewArrayDecoderPlain { - buf: Bytes, - /// offset of buf, changed during read or skip. - offset: usize, - validate_utf8: bool, - - /// This is a maximum as the null count is not always known, e.g. value data from - /// a v1 data page - max_remaining_values: usize, -} - -impl ByteViewArrayDecoderPlain { - pub fn new( - buf: Bytes, - num_levels: usize, - num_values: Option, - validate_utf8: bool, - ) -> Self { - Self { - buf, - validate_utf8, - offset: 0, - max_remaining_values: num_values.unwrap_or(num_levels), - } - } - - pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - let initial_values_length = output.views.len(); - - let to_read = len.min(self.max_remaining_values); - - let remaining_bytes = self.buf.len() - self.offset; - if remaining_bytes == 0 { - return Ok(0); - } - - let mut read = 0; - - let buf = self.buf.as_ref(); - while self.offset < self.buf.len() && read != to_read { - if self.offset + 4 > buf.len() { - return Err(ParquetError::EOF("eof decoding byte view array".into())); - } - let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); - let len = u32::from_le_bytes(len_bytes); - - let start_offset = self.offset + 4; - let end_offset = start_offset + len as usize; - if end_offset > buf.len() { - return Err(ParquetError::EOF("eof decoding byte view array".into())); - } - - output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; - - self.offset = end_offset; - read += 1; - } - self.max_remaining_values -= to_read; - - if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; - } - - Ok(to_read) - } - - pub fn skip(&mut self, to_skip: usize) -> Result { - let to_skip = to_skip.min(self.max_remaining_values); - let mut skip = 0; - let buf = self.buf.as_ref(); - - while self.offset < self.buf.len() && skip != to_skip { - if self.offset + 4 > buf.len() { - return Err(ParquetError::EOF("eof decoding byte array".into())); - } - let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); - let len = u32::from_le_bytes(len_bytes) as usize; - skip += 1; - self.offset = self.offset + 4 + len; - } - self.max_remaining_values -= skip; - Ok(skip) - } -} - -/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`] -pub struct ByteViewArrayDecoderDeltaLength { - lengths: Vec, - data: Bytes, - length_offset: usize, - data_offset: usize, - validate_utf8: bool, -} - -impl ByteViewArrayDecoderDeltaLength { - fn new(data: Bytes, validate_utf8: bool) -> Result { - let mut len_decoder = DeltaBitPackDecoder::::new(); - len_decoder.set_data(data.clone(), 0)?; - let values = len_decoder.values_left(); - - let mut lengths = vec![0; values]; - len_decoder.get(&mut lengths)?; - - Ok(Self { - lengths, - data, - validate_utf8, - length_offset: 0, - data_offset: len_decoder.get_offset(), - }) - } - - fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - let initial_values_length = output.views.len(); - - let to_read = len.min(self.lengths.len() - self.length_offset); - - let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; - - let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); - - if self.data_offset + total_bytes > self.data.len() { - return Err(ParquetError::EOF( - "Insufficient delta length byte array bytes".to_string(), - )); - } - - let mut start_offset = self.data_offset; - for length in src_lengths { - let end_offset = start_offset + *length as usize; - output.try_push( - &self.data.as_ref()[start_offset..end_offset], - self.validate_utf8, - )?; - start_offset = end_offset; - } - - self.data_offset = start_offset; - self.length_offset += to_read; - - if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; - } - - Ok(to_read) - } - - fn skip(&mut self, to_skip: usize) -> Result { - let remain_values = self.lengths.len() - self.length_offset; - let to_skip = remain_values.min(to_skip); - - let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip]; - let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); - - self.data_offset += total_bytes; - self.length_offset += to_skip; - Ok(to_skip) - } -} - -/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`] -pub struct ByteViewArrayDecoderDelta { - decoder: DeltaByteArrayDecoder, - validate_utf8: bool, -} - -impl ByteViewArrayDecoderDelta { - fn new(data: Bytes, validate_utf8: bool) -> Result { - Ok(Self { - decoder: DeltaByteArrayDecoder::new(data)?, - validate_utf8, - }) - } - - fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - let initial_values_length = output.views.len(); - let read = self - .decoder - .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; - - if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; - } - - Ok(read) - } - - fn skip(&mut self, to_skip: usize) -> Result { - self.decoder.skip(to_skip) - } -} - -/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`ViewBuffer`] -pub struct ByteViewArrayDecoderDictionary { - decoder: DictIndexDecoder, -} - -impl ByteViewArrayDecoderDictionary { - fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { - Self { - decoder: DictIndexDecoder::new(data, num_levels, num_values), - } - } - - fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { - // All data must be NULL - if dict.is_empty() { - return Ok(0); - } - - self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, &dict.views, &dict.buffer) - }) - } - - fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result { - // All data must be NULL - if dict.is_empty() { - return Ok(0); - } - - self.decoder.skip(to_skip) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; - use crate::arrow::record_reader::buffer::ValuesBuffer; - use arrow_array::{Array, StringViewArray}; - use arrow_buffer::Buffer; - - #[test] - fn test_byte_array_decoder() { - let (pages, encoded_dictionary) = byte_array_all_encodings(vec![ - "hello", - "world", - "here comes the snow", - "large payload over 12 bytes", - ]); - - let column_desc = utf8_column(); - let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = ViewBuffer::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - let first = output.views.first().unwrap(); - let len = *first as u32; - - assert_eq!( - &first.to_le_bytes()[4..4 + len as usize], - "hello".as_bytes() - ); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); - - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("world"), - None, - Some("here comes the snow"), - Some("large payload over 12 bytes"), - None, - None, - ] - ); - } - } - - #[test] - fn test_byte_array_decoder_skip() { - let (pages, encoded_dictionary) = - byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); - - let column_desc = utf8_column(); - let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = ViewBuffer::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - let first = output.views.first().unwrap(); - let len = *first as u32; - - assert_eq!( - &first.to_le_bytes()[4..4 + len as usize], - "hello".as_bytes() - ); - - assert_eq!(decoder.skip_values(1).unwrap(), 1); - assert_eq!(decoder.skip_values(1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("large payload over 12 bytes"), - None, - None, - ] - ); - } - } - - #[test] - fn test_byte_array_decoder_nulls() { - let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); - - let column_desc = utf8_column(); - let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - // test nulls read - for (encoding, page) in pages.clone() { - let mut output = ViewBuffer::default(); - decoder.set_data(encoding, page, 4, None).unwrap(); - assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); - } - - // test nulls skip - for (encoding, page) in pages { - decoder.set_data(encoding, page, 4, None).unwrap(); - assert_eq!(decoder.skip_values(1024).unwrap(), 0); - } - } -} diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index fef1e8722f8..4ae0f5669e8 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -32,7 +32,6 @@ use crate::file::reader::{FilePageIterator, FileReader}; mod builder; mod byte_array; mod byte_array_dictionary; -mod byte_view_array; mod empty_array; mod fixed_len_byte_array; mod fixed_size_list_array; @@ -49,8 +48,6 @@ pub use builder::build_array_reader; pub use byte_array::make_byte_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks -pub use byte_view_array::make_byte_view_array_reader; -#[allow(unused_imports)] // Only used for benchmarks pub use fixed_len_byte_array::make_fixed_len_byte_array_reader; pub use fixed_size_list_array::FixedSizeListArrayReader; pub use list_array::ListArrayReader; diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index e5b0e7d0573..6b95324bee3 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -2308,7 +2308,7 @@ mod tests { ), ( invalid_utf8_later_char::(), - "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 3", + "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 6", ), ]; for (array, expected_error) in cases { diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs index 544c8c88626..cbc795d94f5 100644 --- a/parquet/src/arrow/buffer/mod.rs +++ b/parquet/src/arrow/buffer/mod.rs @@ -20,4 +20,3 @@ pub mod bit_util; pub mod dictionary_buffer; pub mod offset_buffer; -pub(crate) mod view_buffer; diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs deleted file mode 100644 index 743ee6cf288..00000000000 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ /dev/null @@ -1,175 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::arrow::buffer::bit_util::iter_set_bits_rev; -use crate::arrow::record_reader::buffer::ValuesBuffer; -use crate::errors::{ParquetError, Result}; -use arrow_array::{make_array, ArrayRef}; -use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice}; -use arrow_data::{ArrayDataBuilder, ByteView}; -use arrow_schema::DataType as ArrowType; - -/// A buffer of variable-sized byte arrays that can be converted into -/// a corresponding [`ArrayRef`] -#[derive(Debug, Default)] -pub struct ViewBuffer { - pub views: Vec, - pub buffer: Vec, -} - -impl ViewBuffer { - /// Returns the number of byte arrays in this buffer - pub fn len(&self) -> usize { - self.views.len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// If `validate_utf8` this verifies that the first character of `data` is - /// the start of a UTF-8 codepoint - /// - /// Note: This does not verify that the entirety of `data` is valid - /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after - /// all data has been written - pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { - if validate_utf8 { - if let Some(&b) = data.first() { - // A valid code-point iff it does not start with 0b10xxxxxx - // Bit-magic taken from `std::str::is_char_boundary` - if (b as i8) < -0x40 { - return Err(ParquetError::General( - "encountered non UTF-8 data".to_string(), - )); - } - } - } - let length: u32 = data.len().try_into().unwrap(); - if length <= 12 { - let mut view_buffer = [0; 16]; - view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); - view_buffer[4..4 + length as usize].copy_from_slice(data); - self.views.push(u128::from_le_bytes(view_buffer)); - return Ok(()); - } - - let offset = self.buffer.len() as u32; - self.buffer.extend_from_slice(data); - - let view = ByteView { - length, - prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), - buffer_index: 0, - offset, - }; - self.views.push(view.into()); - Ok(()) - } - - /// Extends this buffer with a list of keys - pub fn extend_from_dictionary( - &mut self, - keys: &[K], - dict_views: &[u128], - dict_buffers: &[u8], - ) -> Result<()> { - for key in keys { - let index = key.as_usize(); - if index >= dict_views.len() { - return Err(general_err!( - "dictionary key beyond bounds of dictionary: 0..{}", - dict_views.len() - )); - } - - let view = dict_views[index]; - let len = view as u32; - - // Dictionary values are verified when decoding dictionary page, so validate_utf8 is false here. - if len <= 12 { - self.views.push(view); - } else { - let offset = (view >> 96) as usize; - self.try_push(&dict_buffers[offset..offset + len as usize], false)? - }; - } - Ok(()) - } - - /// Validates that `&self.views[start_offset..]`'s all data are valid UTF-8 sequence - pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> { - let views_slice = &self.views.as_slice()[start_offset..]; - // check inlined view first - for view in views_slice { - if *view as u32 > 12 { - continue; - } - let len = *view as u32; - if let Err(e) = std::str::from_utf8(&view.to_byte_slice()[4..4 + len as usize]) { - return Err(general_err!("encountered non UTF-8 data: {}", e)); - } - } - let first_buffer = views_slice.iter().find(|view| (**view) as u32 > 12); - if first_buffer.is_none() { - return Ok(()); - } - let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize; - match std::str::from_utf8(&self.buffer[first_buffer_offset..]) { - Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), - } - } - - /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` - pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { - let len = self.len(); - let array_data_builder = ArrayDataBuilder::new(data_type) - .len(len) - .add_buffer(Buffer::from_vec(self.views)) - .add_buffer(Buffer::from_vec(self.buffer)) - .null_bit_buffer(null_buffer); - - let data = match cfg!(debug_assertions) { - true => array_data_builder.build().unwrap(), - false => unsafe { array_data_builder.build_unchecked() }, - }; - - make_array(data) - } -} - -impl ValuesBuffer for ViewBuffer { - fn pad_nulls( - &mut self, - read_offset: usize, - values_read: usize, - levels_read: usize, - valid_mask: &[u8], - ) { - self.views.resize(read_offset + levels_read, 0); - - let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { - debug_assert!(level_pos >= value_pos); - if level_pos <= value_pos { - break; - } - self.views[level_pos] = self.views[value_pos]; - } - } -}