diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 9f3a6809d9d0..10c3e6379d05 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -400,7 +400,23 @@ where /// ``` pub type BinaryViewArray = GenericByteViewArray; -/// A [`GenericByteViewArray`] that stores uf8 data +impl BinaryViewArray { + /// Convert the [`BinaryViewArray`] to [`StringViewArray`] + /// If items not utf8 data, validate will fail and error returned. + pub fn to_stringview(&self) -> Result { + StringViewType::validate(self.views(), self.data_buffers())?; + unsafe { Ok(self.to_stringview_unchecked()) } + } + + /// Convert the [`BinaryViewArray`] to [`StringViewArray`] + /// # Safety + /// Caller is responsible for ensuring that items in array are utf8 data. + pub unsafe fn to_stringview_unchecked(&self) -> StringViewArray { + StringViewArray::new_unchecked(self.views.clone(), self.buffers.clone(), self.nulls.clone()) + } +} + +/// A [`GenericByteViewArray`] that stores utf8 data /// /// # Example /// ``` diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index 51e54215ea7f..b98c0e4b20a3 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -248,8 +248,10 @@ pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> Dat } crate::Type::Binary => DataType::Binary, crate::Type::LargeBinary => DataType::LargeBinary, + crate::Type::BinaryView => DataType::BinaryView, crate::Type::Utf8 => DataType::Utf8, crate::Type::LargeUtf8 => DataType::LargeUtf8, + crate::Type::Utf8View => DataType::Utf8View, crate::Type::FixedSizeBinary => { let fsb = field.type_as_fixed_size_binary().unwrap(); DataType::FixedSizeBinary(fsb.byteWidth()) @@ -548,7 +550,11 @@ pub(crate) fn get_fb_field_type<'a>( .as_union_value(), children: Some(fbb.create_vector(&empty_fields[..])), }, - BinaryView | Utf8View => unimplemented!("unimplemented"), + BinaryView => FBFieldType { + type_type: crate::Type::BinaryView, + type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(), + children: Some(fbb.create_vector(&empty_fields[..])), + }, Utf8 => FBFieldType { type_type: crate::Type::Utf8, type_: crate::Utf8Builder::new(fbb).finish().as_union_value(), @@ -568,6 +574,11 @@ pub(crate) fn get_fb_field_type<'a>( children: Some(fbb.create_vector(&empty_fields[..])), } } + Utf8View => FBFieldType { + type_type: crate::Type::Utf8View, + type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(), + children: Some(fbb.create_vector(&empty_fields[..])), + }, Date32 => { let mut builder = crate::DateBuilder::new(fbb); builder.add_unit(crate::DateUnit::DAY); @@ -921,7 +932,9 @@ mod tests { true, ), Field::new("utf8", DataType::Utf8, false), + Field::new("utf8_view", DataType::Utf8View, false), Field::new("binary", DataType::Binary, false), + Field::new("binary_view", DataType::BinaryView, false), Field::new_list("list[u8]", Field::new("item", DataType::UInt8, false), true), Field::new_fixed_size_list( "fixed_size_list[u8]", diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index bb3f403358ee..945f62526a7e 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use arrow_schema::{DataType, Fields, SchemaBuilder}; +use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; use crate::arrow::array_reader::{ @@ -29,9 +30,7 @@ use crate::arrow::array_reader::{ use crate::arrow::schema::{ParquetField, ParquetFieldType}; use crate::arrow::ProjectionMask; use crate::basic::Type as PhysicalType; -use crate::data_type::{ - BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type, -}; +use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type}; use crate::errors::{ParquetError, Result}; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; @@ -55,17 +54,13 @@ fn build_reader( row_groups: &dyn RowGroups, ) -> Result>> { match field.field_type { - ParquetFieldType::Primitive { .. } => { - build_primitive_reader(field, mask, row_groups) - } + ParquetFieldType::Primitive { .. } => build_primitive_reader(field, mask, row_groups), ParquetFieldType::Group { .. } => match &field.arrow_type { DataType::Map(_, _) => build_map_reader(field, mask, row_groups), DataType::Struct(_) => build_struct_reader(field, mask, row_groups), DataType::List(_) => build_list_reader(field, mask, false, row_groups), DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups), - DataType::FixedSizeList(_, _) => { - build_fixed_size_list_reader(field, mask, row_groups) - } + DataType::FixedSizeList(_, _) => build_fixed_size_list_reader(field, mask, row_groups), d => unimplemented!("reading group type {} not implemented", d), }, } @@ -140,9 +135,9 @@ fn build_list_reader( DataType::List(f) => { DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type))) } - DataType::LargeList(f) => DataType::LargeList(Arc::new( - f.as_ref().clone().with_data_type(item_type), - )), + DataType::LargeList(f) => { + DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type))) + } _ => unreachable!(), }; @@ -289,6 +284,9 @@ fn build_primitive_reader( Some(DataType::Dictionary(_, _)) => { make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)? } + Some(DataType::Utf8View | DataType::BinaryView) => { + make_byte_view_array_reader(page_iterator, column_desc, arrow_type)? + } _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?, }, PhysicalType::FIXED_LEN_BYTE_ARRAY => { @@ -347,8 +345,7 @@ mod tests { #[test] fn test_create_array_reader() { let file = get_test_file("nulls.snappy.parquet"); - let file_reader: Arc = - Arc::new(SerializedFileReader::new(file).unwrap()); + let file_reader: Arc = Arc::new(SerializedFileReader::new(file).unwrap()); let file_metadata = file_reader.metadata().file_metadata(); let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]); @@ -359,8 +356,7 @@ mod tests { ) .unwrap(); - let array_reader = - build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap(); + let array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap(); // Create arrow types let arrow_type = DataType::Struct(Fields::from(vec![Field::new( diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs new file mode 100644 index 000000000000..972fbdb20552 --- /dev/null +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -0,0 +1,638 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; +use crate::arrow::buffer::view_buffer::ViewBuffer; +use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; +use crate::arrow::record_reader::GenericRecordReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::{ConvertedType, Encoding}; +use crate::column::page::PageIterator; +use crate::column::reader::decoder::ColumnValueDecoder; +use crate::data_type::Int32Type; +use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; +use crate::errors::{ParquetError, Result}; +use crate::schema::types::ColumnDescPtr; +use arrow_array::ArrayRef; +use arrow_schema::DataType as ArrowType; +use bytes::Bytes; +use std::any::Any; + +pub fn make_byte_view_array_reader( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, +) -> Result> { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { + ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, + _ => ArrowType::BinaryView, + }, + }; + + match data_type { + ArrowType::Utf8View | ArrowType::BinaryView => { + let reader = GenericRecordReader::new(column_desc); + Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader))) + } + _ => Err(general_err!( + "invalid data type for byte array reader - {}", + data_type + )), + } +} + +/// An [`ArrayReader`] for variable length byte arrays +struct ByteViewArrayReader { + data_type: ArrowType, + pages: Box, + def_levels_buffer: Option>, + rep_levels_buffer: Option>, + record_reader: GenericRecordReader, +} + +impl ByteViewArrayReader { + fn new( + pages: Box, + data_type: ArrowType, + record_reader: GenericRecordReader, + ) -> Self { + Self { + data_type, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + record_reader, + } + } +} + +impl ArrayReader for ByteViewArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result { + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) + } + + fn consume_batch(&mut self) -> Result { + let buffer = self.record_reader.consume_record_data(); + let null_buffer = self.record_reader.consume_bitmap_buffer(); + self.def_levels_buffer = self.record_reader.consume_def_levels(); + self.rep_levels_buffer = self.record_reader.consume_rep_levels(); + self.record_reader.reset(); + + let array: ArrayRef = buffer.into_array(null_buffer, self.data_type.clone()); + + Ok(array) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + skip_records(&mut self.record_reader, self.pages.as_mut(), num_records) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer.as_deref() + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer.as_deref() + } +} + +/// A [`ColumnValueDecoder`] for variable length byte arrays +struct ByteViewArrayColumnValueDecoder { + dict: Option, + decoder: Option, + validate_utf8: bool, +} + +impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { + type Buffer = ViewBuffer; + + fn new(desc: &ColumnDescPtr) -> Self { + let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; + Self { + dict: None, + decoder: None, + validate_utf8, + } + } + + fn set_dict( + &mut self, + buf: Bytes, + num_values: u32, + encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if !matches!( + encoding, + Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY + ) { + return Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )); + } + + let mut buffer = ViewBuffer::default(); + let mut decoder = ByteViewArrayDecoderPlain::new( + buf, + num_values as usize, + Some(num_values as usize), + self.validate_utf8, + ); + decoder.read(&mut buffer, usize::MAX)?; + self.dict = Some(buffer); + Ok(()) + } + + fn set_data( + &mut self, + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option, + ) -> Result<()> { + self.decoder = Some(ByteViewArrayDecoder::new( + encoding, + data, + num_levels, + num_values, + self.validate_utf8, + )?); + Ok(()) + } + + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.read(out, num_values, self.dict.as_ref()) + } + + fn skip_values(&mut self, num_values: usize) -> Result { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.skip(num_values, self.dict.as_ref()) + } +} + +/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] +pub enum ByteViewArrayDecoder { + Plain(ByteViewArrayDecoderPlain), + Dictionary(ByteViewArrayDecoderDictionary), + DeltaLength(ByteViewArrayDecoderDeltaLength), + DeltaByteArray(ByteViewArrayDecoderDelta), +} + +impl ByteViewArrayDecoder { + pub fn new( + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option, + validate_utf8: bool, + ) -> Result { + let decoder = match encoding { + Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new( + data, + num_levels, + num_values, + validate_utf8, + )), + Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { + ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( + data, num_levels, num_values, + )) + } + Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( + ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, + ), + Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray( + ByteViewArrayDecoderDelta::new(data, validate_utf8)?, + ), + _ => { + return Err(general_err!( + "unsupported encoding for byte array: {}", + encoding + )) + } + }; + + Ok(decoder) + } + + /// Read up to `len` values to `out` with the optional dictionary + pub fn read( + &mut self, + out: &mut ViewBuffer, + len: usize, + dict: Option<&ViewBuffer>, + ) -> Result { + match self { + ByteViewArrayDecoder::Plain(d) => d.read(out, len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.read(out, dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len), + } + } + + /// Skip `len` values + pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result { + match self { + ByteViewArrayDecoder::Plain(d) => d.skip(len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.skip(dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.skip(len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len), + } + } +} + +/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] +pub struct ByteViewArrayDecoderPlain { + buf: Bytes, + offset: usize, + validate_utf8: bool, + + /// This is a maximum as the null count is not always known, e.g. value data from + /// a v1 data page + max_remaining_values: usize, +} + +impl ByteViewArrayDecoderPlain { + pub fn new( + buf: Bytes, + num_levels: usize, + num_values: Option, + validate_utf8: bool, + ) -> Self { + Self { + buf, + validate_utf8, + offset: 0, + max_remaining_values: num_values.unwrap_or(num_levels), + } + } + + pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let to_read = len.min(self.max_remaining_values); + + let remaining_bytes = self.buf.len() - self.offset; + if remaining_bytes == 0 { + return Ok(0); + } + + let mut read = 0; + + let buf = self.buf.as_ref(); + while self.offset < self.buf.len() && read != to_read { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte view array".into())); + } + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes); + + let start_offset = self.offset + 4; + let end_offset = start_offset + len as usize; + if end_offset > buf.len() { + return Err(ParquetError::EOF("eof decoding byte view array".into())); + } + + output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; + + self.offset = end_offset; + read += 1; + } + self.max_remaining_values -= to_read; + + Ok(to_read) + } + + pub fn skip(&mut self, to_skip: usize) -> Result { + let to_skip = to_skip.min(self.max_remaining_values); + let mut skip = 0; + let buf = self.buf.as_ref(); + + while self.offset < self.buf.len() && skip != to_skip { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes) as usize; + skip += 1; + self.offset = self.offset + 4 + len; + } + self.max_remaining_values -= skip; + Ok(skip) + } +} + +/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDeltaLength { + lengths: Vec, + data: Bytes, + length_offset: usize, + data_offset: usize, + validate_utf8: bool, +} + +impl ByteViewArrayDecoderDeltaLength { + fn new(data: Bytes, validate_utf8: bool) -> Result { + let mut len_decoder = DeltaBitPackDecoder::::new(); + len_decoder.set_data(data.clone(), 0)?; + let values = len_decoder.values_left(); + + let mut lengths = vec![0; values]; + len_decoder.get(&mut lengths)?; + + Ok(Self { + lengths, + data, + validate_utf8, + length_offset: 0, + data_offset: len_decoder.get_offset(), + }) + } + + fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let to_read = len.min(self.lengths.len() - self.length_offset); + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; + + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + + if self.data_offset + total_bytes > self.data.len() { + return Err(ParquetError::EOF( + "Insufficient delta length byte array bytes".to_string(), + )); + } + + let mut start_offset = self.data_offset; + for length in src_lengths { + let end_offset = start_offset + *length as usize; + output.try_push( + &self.data.as_ref()[start_offset..end_offset], + self.validate_utf8, + )?; + start_offset = end_offset; + } + + self.data_offset = start_offset; + self.length_offset += to_read; + + Ok(to_read) + } + + fn skip(&mut self, to_skip: usize) -> Result { + let remain_values = self.lengths.len() - self.length_offset; + let to_skip = remain_values.min(to_skip); + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip]; + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + + self.data_offset += total_bytes; + self.length_offset += to_skip; + Ok(to_skip) + } +} + +/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDelta { + decoder: DeltaByteArrayDecoder, + validate_utf8: bool, +} + +impl ByteViewArrayDecoderDelta { + fn new(data: Bytes, validate_utf8: bool) -> Result { + Ok(Self { + decoder: DeltaByteArrayDecoder::new(data)?, + validate_utf8, + }) + } + + fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let read = self + .decoder + .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; + + Ok(read) + } + + fn skip(&mut self, to_skip: usize) -> Result { + self.decoder.skip(to_skip) + } +} + +/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDictionary { + decoder: DictIndexDecoder, +} + +impl ByteViewArrayDecoderDictionary { + fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { + Self { + decoder: DictIndexDecoder::new(data, num_levels, num_values), + } + } + + fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { + // All data must be NULL + if dict.is_empty() { + return Ok(0); + } + + self.decoder.read(len, |keys| { + output.extend_from_dictionary(keys, &dict.values) + }) + } + + fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result { + // All data must be NULL + if dict.is_empty() { + return Ok(0); + } + + self.decoder.skip(to_skip) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; + use crate::arrow::record_reader::buffer::ValuesBuffer; + use arrow_array::{Array, StringViewArray}; + use arrow_buffer::Buffer; + + #[test] + fn test_byte_array_decoder() { + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!( + output.values.first().unwrap().as_ref().unwrap(), + "hello".as_bytes() + ); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); + + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("world"), + None, + Some("a"), + Some("large payload over 12 bytes"), + None, + None, + ] + ); + } + } + + #[test] + fn test_byte_array_decoder_skip() { + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!( + output.values.first().unwrap().as_ref().unwrap(), + "hello".as_bytes() + ); + + assert_eq!(decoder.skip_values(1).unwrap(), 1); + assert_eq!(decoder.skip_values(1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + assert_eq!( + output.values.get(1).unwrap().as_ref().unwrap(), + "large payload over 12 bytes".as_bytes() + ); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("large payload over 12 bytes"), + None, + None, + ] + ); + } + } + + #[test] + fn test_byte_array_decoder_nulls() { + let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + // test nulls read + for (encoding, page) in pages.clone() { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, None).unwrap(); + assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); + } + + // test nulls skip + for (encoding, page) in pages { + decoder.set_data(encoding, page, 4, None).unwrap(); + assert_eq!(decoder.skip_values(1024).unwrap(), 0); + } + } +} diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index 4ae0f5669e87..8a65aa96a296 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -41,6 +41,7 @@ mod null_array; mod primitive_array; mod struct_array; +mod byte_view_array; #[cfg(test)] mod test_util; diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 61933b24178e..07f23f5b17c6 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -27,8 +27,8 @@ use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; use arrow_array::{ - Array, ArrayAccessor, BinaryArray, DictionaryArray, LargeBinaryArray, LargeStringArray, - StringArray, + Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, LargeBinaryArray, + LargeStringArray, StringArray, StringViewArray, }; use arrow_schema::DataType; @@ -66,12 +66,16 @@ macro_rules! downcast_op { DataType::LargeUtf8 => { $op($array.as_any().downcast_ref::().unwrap()$(, $arg)*) } + DataType::Utf8View => $op($array.as_any().downcast_ref::().unwrap()$(, $arg)*), DataType::Binary => { $op($array.as_any().downcast_ref::().unwrap()$(, $arg)*) } DataType::LargeBinary => { $op($array.as_any().downcast_ref::().unwrap()$(, $arg)*) } + DataType::BinaryView => { + $op($array.as_any().downcast_ref::().unwrap()$(, $arg)*) + } DataType::Dictionary(key, value) => match value.as_ref() { DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*), DataType::LargeUtf8 => { diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index 955896010b85..c50e612b2473 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -75,6 +75,7 @@ fn is_leaf(data_type: &DataType) -> bool { | DataType::Float32 | DataType::Float64 | DataType::Utf8 + | DataType::Utf8View | DataType::LargeUtf8 | DataType::Timestamp(_, _) | DataType::Date32 @@ -85,6 +86,7 @@ fn is_leaf(data_type: &DataType) -> bool { | DataType::Interval(_) | DataType::Binary | DataType::LargeBinary + | DataType::BinaryView | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) | DataType::FixedSizeBinary(_) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 7206e6a6907d..d42db0d9cbc1 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -696,7 +696,9 @@ fn get_arrow_column_writer( ArrowDataType::LargeBinary | ArrowDataType::Binary | ArrowDataType::Utf8 - | ArrowDataType::LargeUtf8 => { + | ArrowDataType::LargeUtf8 + | ArrowDataType::BinaryView + | ArrowDataType::Utf8View => { out.push(bytes(leaves.next().unwrap())) } ArrowDataType::List(f) @@ -720,6 +722,9 @@ fn get_arrow_column_writer( ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => { out.push(bytes(leaves.next().unwrap())) } + ArrowDataType::Utf8View | ArrowDataType::BinaryView => { + out.push(bytes(leaves.next().unwrap())) + } _ => { out.push(col(leaves.next().unwrap())) } @@ -1222,6 +1227,31 @@ mod tests { roundtrip(batch, Some(SMALL_SIZE / 2)); } + #[test] + fn arrow_writer_binary_view() { + let string_field = Field::new("a", DataType::Utf8View, false); + let binary_field = Field::new("b", DataType::BinaryView, false); + let schema = Schema::new(vec![string_field, binary_field]); + + let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"]; + let raw_binary_values = vec![ + b"foo".to_vec(), + b"bar".to_vec(), + b"large payload over 12 bytes".to_vec(), + b"lulu".to_vec(), + ]; + + let string_view_values = StringViewArray::from(raw_string_values); + let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values); + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(string_view_values), Arc::new(binary_view_values)], + ) + .unwrap(); + + roundtrip(batch, Some(SMALL_SIZE / 2)); + } + fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch { let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false); let schema = Schema::new(vec![decimal_field]); diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs index cbc795d94f57..e7566d9a505f 100644 --- a/parquet/src/arrow/buffer/mod.rs +++ b/parquet/src/arrow/buffer/mod.rs @@ -20,3 +20,4 @@ pub mod bit_util; pub mod dictionary_buffer; pub mod offset_buffer; +pub mod view_buffer; diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs new file mode 100644 index 000000000000..635cd8b4c35a --- /dev/null +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::buffer::bit_util::iter_set_bits_rev; +use crate::arrow::record_reader::buffer::ValuesBuffer; +use crate::errors::{ParquetError, Result}; +use arrow_array::builder::GenericByteViewBuilder; +use arrow_array::types::BinaryViewType; +use arrow_array::ArrayRef; +use arrow_buffer::{ArrowNativeType, Buffer}; +use arrow_schema::DataType as ArrowType; +use std::sync::Arc; + +/// A buffer of variable-sized byte arrays that can be converted into +/// a corresponding [`ArrayRef`] +#[derive(Debug, Default)] +pub struct ViewBuffer { + pub values: Vec>>, +} + +impl ViewBuffer { + /// Returns the number of byte arrays in this buffer + pub fn len(&self) -> usize { + self.values.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { + if validate_utf8 { + if let Some(&b) = data.first() { + // A valid code-point iff it does not start with 0b10xxxxxx + // Bit-magic taken from `std::str::is_char_boundary` + if (b as i8) < -0x40 { + return Err(ParquetError::General( + "encountered non UTF-8 data".to_string(), + )); + } + } + } + self.values.push(Some(data.to_vec())); + Ok(()) + } + + /// Extends this buffer with a list of keys + pub fn extend_from_dictionary( + &mut self, + keys: &[K], + dict: &Vec>>, + ) -> Result<()> { + for key in keys { + let index = key.as_usize(); + if index + 1 > dict.len() { + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + dict.len() + )); + } + + let value = dict.get(index).unwrap(); + + // Dictionary values are verified when decoding dictionary page + self.try_push(value.as_ref().unwrap(), false)?; + } + Ok(()) + } + + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` + pub fn into_array(self, _null_buffer: Option, data_type: ArrowType) -> ArrayRef { + let mut builder = + GenericByteViewBuilder::::with_capacity(self.values.len()); + self.values + .into_iter() + .for_each(|v| builder.append_option(v)); + + match data_type { + ArrowType::BinaryView => Arc::new(builder.finish()), + ArrowType::Utf8View => Arc::new(builder.finish().to_stringview().unwrap()), + _ => unreachable!(), + } + } +} + +impl ValuesBuffer for ViewBuffer { + fn pad_nulls( + &mut self, + read_offset: usize, + values_read: usize, + levels_read: usize, + valid_mask: &[u8], + ) { + self.values.resize(read_offset + levels_read, None); + + let values_range = read_offset..read_offset + values_read; + for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { + debug_assert!(level_pos >= value_pos); + if level_pos <= value_pos { + break; + } + self.values[level_pos] = self.values[value_pos].take(); + } + } +} diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 300a21c4f133..e50016fd5b10 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -464,9 +464,10 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_length(*length) .build() } - DataType::BinaryView | DataType::Utf8View => { - unimplemented!("BinaryView/Utf8View not implemented") - } + DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) + .with_repetition(repetition) + .with_id(id) + .build(), DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => { // Decimal precision determines the Parquet physical type to use. // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal @@ -499,6 +500,11 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_id(id) .build() } + DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) + .with_logical_type(Some(LogicalType::String)) + .with_repetition(repetition) + .with_id(id) + .build(), DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => { Type::group_type_builder(name) .with_fields(vec![Arc::new( diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs index fdc744831a25..17dd7862f3dc 100644 --- a/parquet/src/arrow/schema/primitive.rs +++ b/parquet/src/arrow/schema/primitive.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::basic::{ - ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType, -}; +use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType}; use crate::errors::{ParquetError, Result}; use crate::schema::types::{BasicTypeInfo, Type}; use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION}; @@ -59,6 +57,10 @@ fn apply_hint(parquet: DataType, hint: DataType) -> DataType { (DataType::Utf8, DataType::LargeUtf8) => hint, (DataType::Binary, DataType::LargeBinary) => hint, + // Determine view type + (DataType::Utf8, DataType::Utf8View) => hint, + (DataType::Binary, DataType::BinaryView) => hint, + // Determine interval time unit (#1666) (DataType::Interval(_), DataType::Interval(_)) => hint, @@ -158,9 +160,7 @@ fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result Ok(DataType::UInt32), _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)), }, - (Some(LogicalType::Decimal { scale, precision }), _) => { - decimal_128_type(scale, precision) - } + (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision), (Some(LogicalType::Date), _) => Ok(DataType::Date32), (Some(LogicalType::Time { unit, .. }), _) => match unit { ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)), @@ -237,9 +237,7 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result { - decimal_128_type(scale, precision) - } + (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision), (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision), (logical, converted) => Err(arrow_err!( "Unable to convert parquet INT64 logical type {:?} or converted type {}",