From c34f07f7ffcd0dc665e1b42d915a35bfa80cdebb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 18 Aug 2022 18:45:40 +0100 Subject: [PATCH] Fix `MapArrayReader` (#2484) (#1699) (#1561) (#2500) * Fix MapArrayReader (#2484) (#1699) (#1561) * Fix comments * Review feedback --- parquet/src/arrow/array_reader/builder.rs | 1 + parquet/src/arrow/array_reader/map_array.rs | 281 +++++++++++--------- 2 files changed, 152 insertions(+), 130 deletions(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index e389158a1931..84e833ac45e1 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -88,6 +88,7 @@ fn build_map_reader( field.arrow_type.clone(), field.def_level, field.rep_level, + field.nullable, ))) } diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs index 3ba7f6960ec3..ad3d71c66e41 100644 --- a/parquet/src/arrow/array_reader/map_array.rs +++ b/parquet/src/arrow/array_reader/map_array.rs @@ -15,43 +15,67 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::array_reader::ArrayReader; -use crate::errors::ParquetError::ArrowError; -use crate::errors::{ParquetError, Result}; -use arrow::array::{Array, ArrayDataBuilder, ArrayRef, MapArray}; -use arrow::buffer::{Buffer, MutableBuffer}; +use crate::arrow::array_reader::{ArrayReader, ListArrayReader, StructArrayReader}; +use crate::errors::Result; +use arrow::array::{Array, ArrayRef, MapArray}; use arrow::datatypes::DataType as ArrowType; -use arrow::datatypes::ToByteSlice; -use arrow::util::bit_util; use std::any::Any; use std::sync::Arc; /// Implementation of a map array reader. 
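+/// A `MapArray` is logically (and physically) a list of two-field structs,
+/// so this reader delegates to a [`ListArrayReader`] wrapping a
+/// [`StructArrayReader`] of the keys and values, rather than stitching the
+/// two children together by hand.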
 pub struct MapArrayReader {
-    key_reader: Box<dyn ArrayReader>,
-    value_reader: Box<dyn ArrayReader>,
     data_type: ArrowType,
-    map_def_level: i16,
-    #[allow(unused)]
-    map_rep_level: i16,
+    reader: ListArrayReader<i32>,
 }
 
 impl MapArrayReader {
+    /// Creates a new [`MapArrayReader`] with a `def_level`, `rep_level` and `nullable`
+    /// as defined on [`ParquetField`][crate::arrow::schema::ParquetField]
     pub fn new(
         key_reader: Box<dyn ArrayReader>,
         value_reader: Box<dyn ArrayReader>,
         data_type: ArrowType,
         def_level: i16,
         rep_level: i16,
+        nullable: bool,
     ) -> Self {
-        Self {
-            key_reader,
-            value_reader,
-            data_type,
-            // These are the wrong way round https://github.com/apache/arrow-rs/issues/1699
-            map_def_level: rep_level,
-            map_rep_level: def_level,
-        }
+        let struct_def_level = match nullable {
+            true => def_level + 2,
+            false => def_level + 1,
+        };
+        let struct_rep_level = rep_level + 1;
+
+        let element = match &data_type {
+            ArrowType::Map(element, _) => match element.data_type() {
+                ArrowType::Struct(fields) if fields.len() == 2 => {
+                    // Parquet cannot represent nullability at this level (#1697)
+                    // and so encountering nullability here indicates some manner
+                    // of schema inconsistency / inference bug
+                    assert!(!element.is_nullable(), "map struct cannot be nullable");
+                    element
+                }
+                _ => unreachable!("expected struct with two fields"),
+            },
+            _ => unreachable!("expected map type"),
+        };
+
+        let struct_reader = StructArrayReader::new(
+            element.data_type().clone(),
+            vec![key_reader, value_reader],
+            struct_def_level,
+            struct_rep_level,
+            false,
+        );
+
+        let reader = ListArrayReader::new(
+            Box::new(struct_reader),
+            ArrowType::List(element.clone()),
+            def_level,
+            rep_level,
+            nullable,
+        );
+
+        Self { data_type, reader }
     }
 }
 
@@ -65,131 +89,128 @@ impl ArrayReader for MapArrayReader {
     }
 
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
-        let key_len = self.key_reader.read_records(batch_size)?;
-        let value_len = self.value_reader.read_records(batch_size)?;
-        // Check that key and value have the same lengths
-        if key_len != value_len {
-            return Err(general_err!(
-                "Map key and value should have the same lengths."
-            ));
-        }
-        Ok(key_len)
+        self.reader.read_records(batch_size)
     }
 
     fn consume_batch(&mut self) -> Result<ArrayRef> {
-        let key_array = self.key_reader.consume_batch()?;
-        let value_array = self.value_reader.consume_batch()?;
-
-        // Check that key and value have the same lengths
-        let key_length = key_array.len();
-        if key_length != value_array.len() {
-            return Err(general_err!(
-                "Map key and value should have the same lengths."
-            ));
-        }
-
-        let def_levels = self
-            .key_reader
-            .get_def_levels()
-            .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?;
-        let rep_levels = self
-            .key_reader
-            .get_rep_levels()
-            .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?;
-
-        if !((def_levels.len() == rep_levels.len()) && (rep_levels.len() == key_length)) {
-            return Err(ArrowError(
-                "Expected item_reader def_levels and rep_levels to be same length as batch".to_string(),
-            ));
-        }
-
-        let entry_data_type = if let ArrowType::Map(field, _) = &self.data_type {
-            field.data_type().clone()
-        } else {
-            return Err(ArrowError("Expected a map arrow type".to_string()));
-        };
-
-        let entry_data = ArrayDataBuilder::new(entry_data_type)
-            .len(key_length)
-            .add_child_data(key_array.into_data())
-            .add_child_data(value_array.into_data());
-        let entry_data = unsafe { entry_data.build_unchecked() };
-
-        let entry_len = rep_levels.iter().filter(|level| **level == 0).count();
-
-        // first item in each list has rep_level = 0, subsequent items have rep_level = 1
-        let mut offsets: Vec<i32> = Vec::new();
-        let mut cur_offset = 0;
-        def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
-            if *r == 0 || d == &self.map_def_level {
-                offsets.push(cur_offset);
-            }
-            if d > &self.map_def_level {
-                cur_offset += 1;
-            }
-        });
-        offsets.push(cur_offset);
-
-        let num_bytes = bit_util::ceil(offsets.len(), 8);
-        // TODO: A useful optimization is to use the null count to fill with
-        // 0 or null, to reduce individual bits set in a loop.
-        // To favour dense data, set every slot to true, then unset
-        let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
-        let null_slice = null_buf.as_slice_mut();
-        let mut list_index = 0;
-        for i in 0..rep_levels.len() {
-            // If the level is lower than empty, then the slot is null.
-            // When a list is non-nullable, its empty level = null level,
-            // so this automatically factors that in.
-            if rep_levels[i] == 0 && def_levels[i] < self.map_def_level {
-                // should be empty list
-                bit_util::unset_bit(null_slice, list_index);
-            }
-            if rep_levels[i] == 0 {
-                list_index += 1;
-            }
-        }
-        let value_offsets = Buffer::from(&offsets.to_byte_slice());
-
-        // Now we can build array data
-        let array_data = ArrayDataBuilder::new(self.data_type.clone())
-            .len(entry_len)
-            .add_buffer(value_offsets)
-            .null_bit_buffer(Some(null_buf.into()))
-            .add_child_data(entry_data);
-
-        let array_data = unsafe { array_data.build_unchecked() };
-
-        Ok(Arc::new(MapArray::from(array_data)))
+        // A MapArray is just a ListArray with a StructArray child,
+        // so we can simply alter the ArrayData's logical type
+        let array = self.reader.consume_batch()?;
+        let data = array.data().clone();
+        let builder = data.into_builder().data_type(self.data_type.clone());
+
+        // SAFETY - we can assume that ListArrayReader produces a valid ListArray
+        // of the expected type, and as such its output can be reinterpreted as
+        // a MapArray without validation
+        Ok(Arc::new(MapArray::from(unsafe {
+            builder.build_unchecked()
+        })))
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        let key_skipped = self.key_reader.skip_records(num_records)?;
-        let value_skipped = self.value_reader.skip_records(num_records)?;
-        if key_skipped != value_skipped {
-            return Err(general_err!(
-                "MapArrayReader out of sync, skipped {} keys and {} values",
-                key_skipped,
-                value_skipped
-            ));
-        }
-        Ok(key_skipped)
+        self.reader.skip_records(num_records)
    }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        // Children definition levels should describe the same parent structure,
-        // so return key_reader only
-        self.key_reader.get_def_levels()
+        self.reader.get_def_levels()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        // Children repetition levels should describe the same parent structure,
-        // so return key_reader only
-        self.key_reader.get_rep_levels()
+        self.reader.get_rep_levels()
     }
 }
 
 #[cfg(test)]
 mod tests {
-    //TODO: Add unit tests (#1561)
+    use super::*;
+    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+    use crate::arrow::ArrowWriter;
+    use arrow::array;
+    use arrow::array::{MapBuilder, PrimitiveBuilder, StringBuilder};
+    use arrow::datatypes::{Field, Int32Type, Schema};
+    use arrow::record_batch::RecordBatch;
+    use bytes::Bytes;
+
+    #[test]
+    // This test writes a parquet file with the following data:
+    // +--------------------------------------------------------+
+    // |map                                                     |
+    // +--------------------------------------------------------+
+    // |null                                                    |
+    // |null                                                    |
+    // |{three -> 3, four -> 4, five -> 5, six -> 6, seven -> 7}|
+    // +--------------------------------------------------------+
+    //
+    // It then attempts to read the data back and checks that the third record
+    // contains the expected values.
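+    //
+    // Note that because the map column is nullable, MapArrayReader::new reads
+    // the inner entries struct at def_level + 2 and rep_level + 1 relative to
+    // the map field itself.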
+    fn read_map_array_column() {
+        // Schema for single map of string to int32
+        let schema = Schema::new(vec![Field::new(
+            "map",
+            ArrowType::Map(
+                Box::new(Field::new(
+                    "entries",
+                    ArrowType::Struct(vec![
+                        Field::new("keys", ArrowType::Utf8, false),
+                        Field::new("values", ArrowType::Int32, true),
+                    ]),
+                    false,
+                )),
+                false, // Map field not sorted
+            ),
+            true,
+        )]);
+
+        // Create builders for map
+        let string_builder = StringBuilder::new(5);
+        let ints_builder: PrimitiveBuilder<Int32Type> = PrimitiveBuilder::new(1);
+        let mut map_builder = MapBuilder::new(None, string_builder, ints_builder);
+
+        // Add two null records and one record with five entries
+        map_builder.append(false).expect("adding null map entry");
+        map_builder.append(false).expect("adding null map entry");
+        map_builder.keys().append_value("three");
+        map_builder.keys().append_value("four");
+        map_builder.keys().append_value("five");
+        map_builder.keys().append_value("six");
+        map_builder.keys().append_value("seven");
+
+        map_builder.values().append_value(3);
+        map_builder.values().append_value(4);
+        map_builder.values().append_value(5);
+        map_builder.values().append_value(6);
+        map_builder.values().append_value(7);
+        map_builder.append(true).expect("adding map entry");
+
+        // Create record batch
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map_builder.finish())])
+                .expect("create record batch");
+
+        // Write record batch to file
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)
+            .expect("create file writer");
+        writer.write(&batch).expect("writing file");
+        writer.close().expect("close writer");
+
+        // Read file
+        let reader = Bytes::from(buffer);
+        let record_batch_reader =
+            ParquetRecordBatchReader::try_new(reader, 1024).unwrap();
+        for maybe_record_batch in record_batch_reader {
+            let record_batch = maybe_record_batch.expect("Getting current batch");
+            let col = record_batch.column(0);
+            assert!(col.is_null(0));
+            assert!(col.is_null(1));
+            let map_entry = array::as_map_array(col).value(2);
+            let struct_col = array::as_struct_array(&map_entry);
+            let key_col = array::as_string_array(struct_col.column(0)); // Key column
+            assert_eq!(key_col.value(0), "three");
+            assert_eq!(key_col.value(1), "four");
+            assert_eq!(key_col.value(2), "five");
+            assert_eq!(key_col.value(3), "six");
+            assert_eq!(key_col.value(4), "seven");
+        }
+    }
 }
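
A standalone sketch of the reinterpretation at the heart of the new
`consume_batch`, for readers following the patch (this helper is illustrative
and not part of the crate): a `MapArray` shares its physical layout with a
`ListArray` whose child is a non-nullable, two-field `StructArray`, so the
`ArrayData` of a valid list can be rebuilt with the map's logical type and
nothing else needs to change. The arrow calls used (`into_builder`,
`data_type`, `build_unchecked`) are the same ones the patch relies on.

    use arrow::array::{ArrayData, MapArray};
    use arrow::datatypes::DataType;

    /// Reinterpret the `ArrayData` of a `ListArray` as a `MapArray`.
    ///
    /// `list_data` must describe a list whose child is a non-nullable struct
    /// with exactly two fields (keys, values), the invariant asserted by
    /// `MapArrayReader::new` on construction.
    fn list_to_map(list_data: ArrayData, map_type: DataType) -> MapArray {
        // Swap only the logical data type; offsets, null buffer and the
        // struct child are reused unchanged.
        let builder = list_data.into_builder().data_type(map_type);
        // SAFETY: sound because the two types share the same physical layout,
        // per the invariant above.
        MapArray::from(unsafe { builder.build_unchecked() })
    }

The same observation is what lets everything else (`read_records`,
`skip_records` and the level accessors) forward straight to the inner
`ListArrayReader`.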