support full u32 and u64 roundtrip through parquet (#258)
* re-export arity kernels in `arrow::compute`

Seems logical, since all other kernels are already re-exported under this
flat hierarchy (a usage sketch follows this commit message).

* return file from `parquet::arrow::arrow_writer::tests::[one_column]_roundtrip`

* support full arrow u64 through parquet

- updates the arrow-to-parquet type mapping to use a reinterpret/overflow cast
  for u64<->i64, similar to what the C++ stack does (see the roundtrip sketch
  after this message)
- changes the statistics calculation to account for the fact that u64 values
  must be compared unsigned (as per the parquet spec)

Fixes #254.

* avoid copying array when reading u64 from parquet

* support full arrow u32 through parquet

This is identical to the solution we now have for u64.
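
Usage sketch (illustration only, not part of this commit's diff): with the arity
kernels re-exported, `unary` is reachable through the flat `arrow::compute` path,
which is how the writer change reinterprets unsigned arrays. Assumes arrow 4.x;
`reinterpret_u32` is a hypothetical helper name.

use arrow::array::{Int32Array, UInt32Array};
use arrow::compute::unary;
use arrow::datatypes::Int32Type;

fn reinterpret_u32(values: &UInt32Array) -> Int32Array {
    // overflowing cast: `(i32::MAX as u32)+1..=u32::MAX` maps to `i32::MIN..=-1`
    unary::<_, _, Int32Type>(values, |x| x as i32)
}

fn main() {
    let values = UInt32Array::from(vec![0, i32::MAX as u32, u32::MAX]);
    let reinterpreted = reinterpret_u32(&values);
    assert_eq!(reinterpreted.value(2), -1); // u32::MAX is stored as -1
}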
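
Roundtrip sketch (standalone illustration in plain Rust, no parquet involved):
the overflow/reinterpret cast between u64 and i64 is a bijection on bit patterns,
so values survive writing and reading unchanged, but the stored signed values do
not preserve unsigned order, which is why min/max statistics need an unsigned
comparison.

fn main() {
    let store = |x: u64| x as i64; // `(i64::MAX as u64)+1..=u64::MAX` -> `i64::MIN..=-1`
    let load = |x: i64| x as u64;  // exact inverse of `store`

    for &v in &[u64::MIN, i64::MAX as u64, (i64::MAX as u64) + 1, u64::MAX] {
        assert_eq!(load(store(v)), v); // lossless roundtrip for every value
    }

    // Unsigned order is correct for statistics (per the parquet spec)...
    assert!(u64::MAX > 1);
    // ...while comparing the stored i64 values would rank u64::MAX below 1.
    assert!(store(u64::MAX) < store(1));
}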
crepererum authored May 10, 2021
1 parent 8bd769b commit 2f5f58a
Showing 4 changed files with 193 additions and 38 deletions.
1 change: 1 addition & 0 deletions arrow/src/compute/mod.rs
@@ -23,6 +23,7 @@ mod util;

pub use self::kernels::aggregate::*;
pub use self::kernels::arithmetic::*;
pub use self::kernels::arity::*;
pub use self::kernels::boolean::*;
pub use self::kernels::cast::*;
pub use self::kernels::comparison::*;
30 changes: 24 additions & 6 deletions parquet/src/arrow/array_reader.rs
@@ -268,10 +268,29 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
}
}

let target_type = self.get_data_type().clone();
let arrow_data_type = match T::get_physical_type() {
PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE,
PhysicalType::INT32 => ArrowInt32Type::DATA_TYPE,
PhysicalType::INT64 => ArrowInt64Type::DATA_TYPE,
PhysicalType::INT32 => {
match target_type {
ArrowType::UInt32 => {
// follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
// `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
ArrowUInt32Type::DATA_TYPE
}
_ => ArrowInt32Type::DATA_TYPE,
}
}
PhysicalType::INT64 => {
match target_type {
ArrowType::UInt64 => {
// follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
// `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
ArrowUInt64Type::DATA_TYPE
}
_ => ArrowInt64Type::DATA_TYPE,
}
}
PhysicalType::FLOAT => ArrowFloat32Type::DATA_TYPE,
PhysicalType::DOUBLE => ArrowFloat64Type::DATA_TYPE,
PhysicalType::INT96
@@ -343,15 +362,14 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
// are datatypes which we must convert explicitly.
// These are:
// - date64: we should cast int32 to date32, then date32 to date64.
let target_type = self.get_data_type();
let array = match target_type {
ArrowType::Date64 => {
// this is cheap as it internally reinterprets the data
let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
arrow::compute::cast(&a, target_type)?
arrow::compute::cast(&a, &target_type)?
}
ArrowType::Decimal(p, s) => {
let mut builder = DecimalBuilder::new(array.len(), *p, *s);
let mut builder = DecimalBuilder::new(array.len(), p, s);
match array.data_type() {
ArrowType::Int32 => {
let values = array.as_any().downcast_ref::<Int32Array>().unwrap();
Expand Down Expand Up @@ -380,7 +398,7 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
}
Arc::new(builder.finish()) as ArrayRef
}
_ => arrow::compute::cast(&array, target_type)?,
_ => arrow::compute::cast(&array, &target_type)?,
};

// save definition and repetition buffers
141 changes: 124 additions & 17 deletions parquet/src/arrow/arrow_writer.rs
@@ -211,19 +211,45 @@ fn write_leaf(
let indices = levels.filter_array_indices();
let written = match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64 = column.data_type() {
let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
arrow::compute::cast(&array, &ArrowDataType::Int32)?
} else {
arrow::compute::cast(column, &ArrowDataType::Int32)?
let values = match column.data_type() {
ArrowDataType::Date64 => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64 = column.data_type() {
let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
arrow::compute::cast(&array, &ArrowDataType::Int32)?
} else {
arrow::compute::cast(column, &ArrowDataType::Int32)?
};
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get int32 array");
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
ArrowDataType::UInt32 => {
// follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
// `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
let array = column
.as_any()
.downcast_ref::<arrow_array::UInt32Array>()
.expect("Unable to get u32 array");
let array = arrow::compute::unary::<_, _, arrow::datatypes::Int32Type>(
array,
|x| x as i32,
);
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
_ => {
let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get i32 array");
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
};
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get int32 array");
typed.write_batch(
get_numeric_array_slice::<Int32Type, _>(&array, &indices).as_slice(),
values.as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
)?
@@ -248,6 +274,19 @@ fn write_leaf(
.expect("Unable to get i64 array");
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
}
ArrowDataType::UInt64 => {
// follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
// `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
let array = column
.as_any()
.downcast_ref::<arrow_array::UInt64Array>()
.expect("Unable to get u64 array");
let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>(
array,
|x| x as i64,
);
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
}
_ => {
let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
let array = array
@@ -498,16 +537,20 @@ fn get_fsb_array_slice(
mod tests {
use super::*;

use std::io::Seek;
use std::sync::Arc;
use std::{fs::File, io::Seek};

use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
use arrow::record_batch::RecordBatch;
use arrow::{array::*, buffer::Buffer};

use crate::arrow::{ArrowReader, ParquetFileArrowReader};
use crate::file::{reader::SerializedFileReader, writer::InMemoryWriteableCursor};
use crate::file::{
reader::{FileReader, SerializedFileReader},
statistics::Statistics,
writer::InMemoryWriteableCursor,
};
use crate::util::test_common::get_temp_file;

#[test]
@@ -956,7 +999,7 @@ mod tests {

const SMALL_SIZE: usize = 4;

fn roundtrip(filename: &str, expected_batch: RecordBatch) {
fn roundtrip(filename: &str, expected_batch: RecordBatch) -> File {
let file = get_temp_file(filename, &[]);

let mut writer = ArrowWriter::try_new(
@@ -968,7 +1011,7 @@
writer.write(&expected_batch).unwrap();
writer.close().unwrap();

let reader = SerializedFileReader::new(file).unwrap();
let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();

@@ -986,9 +1029,11 @@

assert_eq!(expected_data, actual_data);
}

file
}

fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) {
fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) -> File {
let schema = Schema::new(vec![Field::new(
"col",
values.data_type().clone(),
@@ -997,7 +1042,7 @@
let expected_batch =
RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();

roundtrip(filename, expected_batch);
roundtrip(filename, expected_batch)
}

fn values_required<A, I>(iter: I, filename: &str)
@@ -1449,4 +1494,66 @@ mod tests {
expected_batch,
);
}

#[test]
fn u32_min_max() {
// check values roundtrip through parquet
let values = Arc::new(UInt32Array::from_iter_values(vec![
u32::MIN,
u32::MIN + 1,
(i32::MAX as u32) - 1,
i32::MAX as u32,
(i32::MAX as u32) + 1,
u32::MAX - 1,
u32::MAX,
]));
let file = one_column_roundtrip("u32_min_max_single_column", values, false);

// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let row_group = metadata.row_group(0);
assert_eq!(row_group.num_columns(), 1);
let column = row_group.column(0);
let stats = column.statistics().unwrap();
assert!(stats.has_min_max_set());
if let Statistics::Int32(stats) = stats {
assert_eq!(*stats.min() as u32, u32::MIN);
assert_eq!(*stats.max() as u32, u32::MAX);
} else {
panic!("Statistics::Int32 missing")
}
}

#[test]
fn u64_min_max() {
// check values roundtrip through parquet
let values = Arc::new(UInt64Array::from_iter_values(vec![
u64::MIN,
u64::MIN + 1,
(i64::MAX as u64) - 1,
i64::MAX as u64,
(i64::MAX as u64) + 1,
u64::MAX - 1,
u64::MAX,
]));
let file = one_column_roundtrip("u64_min_max_single_column", values, false);

// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let row_group = metadata.row_group(0);
assert_eq!(row_group.num_columns(), 1);
let column = row_group.column(0);
let stats = column.statistics().unwrap();
assert!(stats.has_min_max_set());
if let Statistics::Int64(stats) = stats {
assert_eq!(*stats.min() as u64, u64::MIN);
assert_eq!(*stats.max() as u64, u64::MAX);
} else {
panic!("Statistics::Int64 missing")
}
}
}
59 changes: 44 additions & 15 deletions parquet/src/column/writer.rs
@@ -18,9 +18,10 @@
//! Contains column writer API.
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};

use crate::basic::{Compression, Encoding, PageType, Type};
use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::compression::{create_codec, Codec};
use crate::data_type::private::ParquetValueType;
use crate::data_type::AsBytes;
use crate::data_type::*;
use crate::encodings::{
@@ -300,10 +301,18 @@ impl<T: DataType> ColumnWriterImpl<T> {
// Process pre-calculated statistics
match (min, max) {
(Some(min), Some(max)) => {
if self.min_column_value.as_ref().map_or(true, |v| v > min) {
if self
.min_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(v, min))
{
self.min_column_value = Some(min.clone());
}
if self.max_column_value.as_ref().map_or(true, |v| v < max) {
if self
.max_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(max, v))
{
self.max_column_value = Some(max.clone());
}
}
@@ -925,31 +934,51 @@ impl<T: DataType> ColumnWriterImpl<T> {
fn update_page_min_max(&mut self, val: &T::T) {
// simple "isNaN" check that works for all types
if val == val {
if self.min_page_value.as_ref().map_or(true, |min| min > val) {
if self
.min_page_value
.as_ref()
.map_or(true, |min| self.compare_greater(min, val))
{
self.min_page_value = Some(val.clone());
}
if self.max_page_value.as_ref().map_or(true, |max| max < val) {
if self
.max_page_value
.as_ref()
.map_or(true, |max| self.compare_greater(val, max))
{
self.max_page_value = Some(val.clone());
}
}
}

fn update_column_min_max(&mut self) {
if self
.min_column_value
.as_ref()
.map_or(true, |min| min > self.min_page_value.as_ref().unwrap())
{
let update_min = self.min_column_value.as_ref().map_or(true, |min| {
let page_value = self.min_page_value.as_ref().unwrap();
self.compare_greater(min, page_value)
});
if update_min {
self.min_column_value = self.min_page_value.clone();
}
if self
.max_column_value
.as_ref()
.map_or(true, |max| max < self.max_page_value.as_ref().unwrap())
{

let update_max = self.max_column_value.as_ref().map_or(true, |max| {
let page_value = self.max_page_value.as_ref().unwrap();
self.compare_greater(page_value, max)
});
if update_max {
self.max_column_value = self.max_page_value.clone();
}
}

/// Evaluate `a > b` according to underlying logical type.
fn compare_greater(&self, a: &T::T, b: &T::T) -> bool {
if let Some(LogicalType::INTEGER(int_type)) = self.descr.logical_type() {
if !int_type.is_signed {
// need to compare unsigned
return a.as_u64().unwrap() > b.as_u64().unwrap();
}
}
a > b
}
}

// ----------------------------------------------------------------------
