Skip to content

Commit

Permalink
Fix MapArrayReader (#2484) (#1699) (#1561) (#2500)
Browse files Browse the repository at this point in the history
* Fix MapArrayReader (#2484) (#1699) (#1561)

* Fix comments

* Review feedback
  • Loading branch information
tustvold authored Aug 18, 2022
1 parent f3afdd2 commit c34f07f
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 130 deletions.
1 change: 1 addition & 0 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ fn build_map_reader(
field.arrow_type.clone(),
field.def_level,
field.rep_level,
field.nullable,
)))
}

Expand Down
281 changes: 151 additions & 130 deletions parquet/src/arrow/array_reader/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,67 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::ArrayReader;
use crate::errors::ParquetError::ArrowError;
use crate::errors::{ParquetError, Result};
use arrow::array::{Array, ArrayDataBuilder, ArrayRef, MapArray};
use arrow::buffer::{Buffer, MutableBuffer};
use crate::arrow::array_reader::{ArrayReader, ListArrayReader, StructArrayReader};
use crate::errors::Result;
use arrow::array::{Array, ArrayRef, MapArray};
use arrow::datatypes::DataType as ArrowType;
use arrow::datatypes::ToByteSlice;
use arrow::util::bit_util;
use std::any::Any;
use std::sync::Arc;

/// Implementation of a map array reader.
pub struct MapArrayReader {
key_reader: Box<dyn ArrayReader>,
value_reader: Box<dyn ArrayReader>,
data_type: ArrowType,
map_def_level: i16,
#[allow(unused)]
map_rep_level: i16,
reader: ListArrayReader<i32>,
}

impl MapArrayReader {
/// Creates a new [`MapArrayReader`] with a `def_level`, `rep_level` and `nullable`
/// as defined on [`ParquetField`][crate::arrow::schema::ParquetField]
pub fn new(
key_reader: Box<dyn ArrayReader>,
value_reader: Box<dyn ArrayReader>,
data_type: ArrowType,
def_level: i16,
rep_level: i16,
nullable: bool,
) -> Self {
Self {
key_reader,
value_reader,
data_type,
// These are the wrong way round https://github.com/apache/arrow-rs/issues/1699
map_def_level: rep_level,
map_rep_level: def_level,
}
let struct_def_level = match nullable {
true => def_level + 2,
false => def_level + 1,
};
let struct_rep_level = rep_level + 1;

let element = match &data_type {
ArrowType::Map(element, _) => match element.data_type() {
ArrowType::Struct(fields) if fields.len() == 2 => {
// Parquet cannot represent nullability at this level (#1697)
// and so encountering nullability here indicates some manner
// of schema inconsistency / inference bug
assert!(!element.is_nullable(), "map struct cannot be nullable");
element
}
_ => unreachable!("expected struct with two fields"),
},
_ => unreachable!("expected map type"),
};

let struct_reader = StructArrayReader::new(
element.data_type().clone(),
vec![key_reader, value_reader],
struct_def_level,
struct_rep_level,
false,
);

let reader = ListArrayReader::new(
Box::new(struct_reader),
ArrowType::List(element.clone()),
def_level,
rep_level,
nullable,
);

Self { data_type, reader }
}
}

Expand All @@ -65,131 +89,128 @@ impl ArrayReader for MapArrayReader {
}

fn read_records(&mut self, batch_size: usize) -> Result<usize> {
let key_len = self.key_reader.read_records(batch_size)?;
let value_len = self.value_reader.read_records(batch_size)?;
// Check that key and value have the same lengths
if key_len != value_len {
return Err(general_err!(
"Map key and value should have the same lengths."
));
}
Ok(key_len)
self.reader.read_records(batch_size)
}

fn consume_batch(&mut self) -> Result<ArrayRef> {
let key_array = self.key_reader.consume_batch()?;
let value_array = self.value_reader.consume_batch()?;

// Check that key and value have the same lengths
let key_length = key_array.len();
if key_length != value_array.len() {
return Err(general_err!(
"Map key and value should have the same lengths."
));
}

let def_levels = self
.key_reader
.get_def_levels()
.ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?;
let rep_levels = self
.key_reader
.get_rep_levels()
.ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?;

if !((def_levels.len() == rep_levels.len()) && (rep_levels.len() == key_length)) {
return Err(ArrowError(
"Expected item_reader def_levels and rep_levels to be same length as batch".to_string(),
));
}

let entry_data_type = if let ArrowType::Map(field, _) = &self.data_type {
field.data_type().clone()
} else {
return Err(ArrowError("Expected a map arrow type".to_string()));
};

let entry_data = ArrayDataBuilder::new(entry_data_type)
.len(key_length)
.add_child_data(key_array.into_data())
.add_child_data(value_array.into_data());
let entry_data = unsafe { entry_data.build_unchecked() };

let entry_len = rep_levels.iter().filter(|level| **level == 0).count();

// first item in each list has rep_level = 0, subsequent items have rep_level = 1
let mut offsets: Vec<i32> = Vec::new();
let mut cur_offset = 0;
def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
if *r == 0 || d == &self.map_def_level {
offsets.push(cur_offset);
}
if d > &self.map_def_level {
cur_offset += 1;
}
});
offsets.push(cur_offset);

let num_bytes = bit_util::ceil(offsets.len(), 8);
// TODO: A useful optimization is to use the null count to fill with
// 0 or null, to reduce individual bits set in a loop.
// To favour dense data, set every slot to true, then unset
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
let null_slice = null_buf.as_slice_mut();
let mut list_index = 0;
for i in 0..rep_levels.len() {
// If the level is lower than empty, then the slot is null.
// When a list is non-nullable, its empty level = null level,
// so this automatically factors that in.
if rep_levels[i] == 0 && def_levels[i] < self.map_def_level {
// should be empty list
bit_util::unset_bit(null_slice, list_index);
}
if rep_levels[i] == 0 {
list_index += 1;
}
}
let value_offsets = Buffer::from(&offsets.to_byte_slice());

// Now we can build array data
let array_data = ArrayDataBuilder::new(self.data_type.clone())
.len(entry_len)
.add_buffer(value_offsets)
.null_bit_buffer(Some(null_buf.into()))
.add_child_data(entry_data);

let array_data = unsafe { array_data.build_unchecked() };

Ok(Arc::new(MapArray::from(array_data)))
// A MapArray is just a ListArray with a StructArray child
// we can therefore just alter the ArrayData
let array = self.reader.consume_batch().unwrap();
let data = array.data().clone();
let builder = data.into_builder().data_type(self.data_type.clone());

// SAFETY - we can assume that ListArrayReader produces valid ListArray
// of the expected type, and as such its output can be reinterpreted as
// a MapArray without validation
Ok(Arc::new(MapArray::from(unsafe {
builder.build_unchecked()
})))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let key_skipped = self.key_reader.skip_records(num_records)?;
let value_skipped = self.value_reader.skip_records(num_records)?;
if key_skipped != value_skipped {
return Err(general_err!(
"MapArrayReader out of sync, skipped {} keys and {} values",
key_skipped,
value_skipped
));
}
Ok(key_skipped)
self.reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
// Children definition levels should describe the same parent structure,
// so return key_reader only
self.key_reader.get_def_levels()
self.reader.get_def_levels()
}

fn get_rep_levels(&self) -> Option<&[i16]> {
// Children repetition levels should describe the same parent structure,
// so return key_reader only
self.key_reader.get_rep_levels()
self.reader.get_rep_levels()
}
}

#[cfg(test)]
mod tests {
    // TODO: Add more unit tests covering additional map shapes (#1561)
    use super::*;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use crate::arrow::ArrowWriter;
    use arrow::array;
    use arrow::array::{MapBuilder, PrimitiveBuilder, StringBuilder};
    use arrow::datatypes::{Field, Int32Type, Schema};
    use arrow::record_batch::RecordBatch;
    use bytes::Bytes;

    #[test]
    // Round-trip test: writes a parquet file with the following data:
    // +--------------------------------------------------------+
    // |map                                                     |
    // +--------------------------------------------------------+
    // |null                                                    |
    // |null                                                    |
    // |{three -> 3, four -> 4, five -> 5, six -> 6, seven -> 7}|
    // +--------------------------------------------------------+
    //
    // It then attempts to read the data back and checks that the third record
    // contains the expected values.
    fn read_map_array_column() {
        // Schema for single map of string to int32
        // (non-nullable keys, nullable values, map field itself nullable)
        let schema = Schema::new(vec![Field::new(
            "map",
            ArrowType::Map(
                Box::new(Field::new(
                    "entries",
                    ArrowType::Struct(vec![
                        Field::new("keys", ArrowType::Utf8, false),
                        Field::new("values", ArrowType::Int32, true),
                    ]),
                    false,
                )),
                false, // Map field not sorted
            ),
            true,
        )]);

        // Create builders for map
        let string_builder = StringBuilder::new(5);
        let ints_builder: PrimitiveBuilder<Int32Type> = PrimitiveBuilder::new(1);
        let mut map_builder = MapBuilder::new(None, string_builder, ints_builder);

        // Add two null records and one record with five entries
        map_builder.append(false).expect("adding null map entry");
        map_builder.append(false).expect("adding null map entry");
        map_builder.keys().append_value("three");
        map_builder.keys().append_value("four");
        map_builder.keys().append_value("five");
        map_builder.keys().append_value("six");
        map_builder.keys().append_value("seven");

        map_builder.values().append_value(3);
        map_builder.values().append_value(4);
        map_builder.values().append_value(5);
        map_builder.values().append_value(6);
        map_builder.values().append_value(7);
        // append(true) closes the current map entry, committing the five key/value pairs
        map_builder.append(true).expect("adding map entry");

        // Create record batch
        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map_builder.finish())])
                .expect("create record batch");

        // Write record batch to an in-memory parquet file
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)
            .expect("creat file writer");
        writer.write(&batch).expect("writing file");
        writer.close().expect("close writer");

        // Read the file back and verify null slots and the populated map's keys
        let reader = Bytes::from(buffer);
        let record_batch_reader =
            ParquetRecordBatchReader::try_new(reader, 1024).unwrap();
        for maybe_record_batch in record_batch_reader {
            let record_batch = maybe_record_batch.expect("Getting current batch");
            let col = record_batch.column(0);
            assert!(col.is_null(0));
            assert!(col.is_null(1));
            // value(2) yields the entries StructArray for the third (non-null) map
            let map_entry = array::as_map_array(col).value(2);
            let struct_col = array::as_struct_array(&map_entry);
            let key_col = array::as_string_array(struct_col.column(0)); // Key column
            assert_eq!(key_col.value(0), "three");
            assert_eq!(key_col.value(1), "four");
            assert_eq!(key_col.value(2), "five");
            assert_eq!(key_col.value(3), "six");
            assert_eq!(key_col.value(4), "seven");
        }
    }
}

0 comments on commit c34f07f

Please sign in to comment.