From 15b433c4883b3c037d278bcc6615f6a9374b3133 Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Tue, 24 Dec 2024 19:45:23 +0800
Subject: [PATCH] fix: parse variable-length decimals in parquet statistics

Parquet may encode a decimal column's min/max statistics in fewer than
16 big-endian two's-complement bytes. The old code handed those bytes
straight to i128::from_be_bytes via try_into, which fails for any
buffer shorter than 16 bytes. Sign-extend the buffer to 16 bytes before
converting, and cover the new path with bound tests for positive,
negative, and precision-38 decimals.

---
 crates/iceberg/src/arrow/schema.rs            |  27 ++-
 .../src/writer/file_writer/parquet_writer.rs  | 188 +++++++++++++++++-
 2 files changed, 213 insertions(+), 2 deletions(-)

diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index 91dfe85e9..c182ec079 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -30,6 +30,7 @@ use arrow_array::{
 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
 use bitvec::macros::internal::funty::Fundamental;
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use parquet::data_type::ByteArray;
 use parquet::file::statistics::Statistics;
 use rust_decimal::prelude::ToPrimitive;
 use uuid::Uuid;
@@ -680,6 +681,30 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send + Sync>> {
     }
 }
 
+fn extend_to_i128_big_endian(array: ByteArray) -> Result<[u8; 16]> {
+    if array.len() > 16 {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "cannot sign-extend a byte array longer than 16 bytes to an i128",
+        ));
+    }
+
+    // Check the sign bit: if the first byte's MSB is 1, the value is negative.
+    let is_negative = array.data().first().map_or(false, |&b| b & 0x80 != 0);
+
+    // Create a 16-byte buffer filled with the sign-extension value.
+    let mut extended = if is_negative {
+        [0xFF; 16] // fill with 0xFF for negative numbers
+    } else {
+        [0x00; 16] // fill with 0x00 for non-negative numbers
+    };
+
+    let start = 16 - array.len();
+    extended[start..].copy_from_slice(array.data());
+
+    Ok(extended)
+}
+
 macro_rules! get_parquet_stat_as_datum {
     ($limit_type:tt) => {
         paste::paste! {
@@ -741,7 +766,7 @@ macro_rules! get_parquet_stat_as_datum {
                 };
                 Some(Datum::new(
                     primitive_type.clone(),
-                    PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
+                    PrimitiveLiteral::Int128(i128::from_be_bytes(extend_to_i128_big_endian(bytes.into())?)),
                 ))
             }
             (
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 596228f7c..7710a45b7 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -478,15 +478,18 @@ mod tests {
     use anyhow::Result;
     use arrow_array::types::Int64Type;
     use arrow_array::{
-        Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray,
+        Array, ArrayRef, BooleanArray, Decimal128Array, Int32Array, Int64Array, ListArray,
+        RecordBatch, StructArray,
     };
     use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef};
     use arrow_select::concat::concat_batches;
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use rust_decimal::Decimal;
     use tempfile::TempDir;
     use uuid::Uuid;
 
     use super::*;
+    use crate::arrow::schema_to_arrow_schema;
     use crate::io::FileIOBuilder;
     use crate::spec::{PrimitiveLiteral, Struct, *};
     use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
@@ -1169,4 +1172,187 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_decimal_bound() -> Result<()> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // test 1.1 and 2.2
+        let schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![NestedField::optional(
+                    0,
"decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + loccation_gen.clone(), + file_name_gen.clone(), + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![Some(22000000000), Some(11000000000)]) + .with_data_type(DataType::Decimal128(28, 10)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + + // test -1.1 and -2.2 + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + loccation_gen.clone(), + file_name_gen.clone(), + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)]) + .with_data_type(DataType::Decimal128(28, 10)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + + // test max and min for scale 38 + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: 0, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema, + file_io.clone(), + loccation_gen, + file_name_gen, + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![ + Some(99999999999999999999999999999999999999_i128), + Some(-99999999999999999999999999999999999999_i128), + ]) + .with_data_type(DataType::Decimal128(38, 0)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + 
.content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::new( + PrimitiveType::Decimal { + precision: 38, + scale: 0 + }, + PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128) + )) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::new( + PrimitiveType::Decimal { + precision: 38, + scale: 0 + }, + PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128) + )) + .as_ref() + ); + + Ok(()) + } }
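Note on the sign-extension rule (a minimal, self-contained sketch, not part
of the patch): sign_extend_be below is a hypothetical stand-in for the
patch's extend_to_i128_big_endian that takes a plain &[u8] instead of
parquet's ByteArray; the logic is otherwise the same. Parquet may store a
decimal bound in only as many bytes as the value needs, so the high bytes
must be filled with 0xFF when the source's sign bit is set and 0x00
otherwise before the buffer can be read with i128::from_be_bytes.

    fn sign_extend_be(bytes: &[u8]) -> Option<[u8; 16]> {
        if bytes.len() > 16 {
            return None; // wider than an i128
        }
        // In two's complement, the MSB of the first byte carries the sign.
        let negative = bytes.first().map_or(false, |&b| b & 0x80 != 0);
        let mut buf = if negative { [0xFF; 16] } else { [0x00; 16] };
        buf[16 - bytes.len()..].copy_from_slice(bytes);
        Some(buf)
    }

    fn main() {
        // -1.1 at scale 10 is -11_000_000_000. Its 8-byte big-endian
        // encoding starts with three 0xFF sign bytes; dropping them leaves
        // a 5-byte value whose top bit is still set, mimicking how Parquet
        // can store this bound in fewer than 16 bytes.
        let full = (-11_000_000_000_i64).to_be_bytes();
        let wide = sign_extend_be(&full[3..]).unwrap();
        assert_eq!(i128::from_be_bytes(wide), -11_000_000_000_i128);

        // A non-negative value takes the zero-fill path.
        let full = 22_000_000_000_i64.to_be_bytes();
        let wide = sign_extend_be(&full[3..]).unwrap();
        assert_eq!(i128::from_be_bytes(wide), 22_000_000_000_i128);
    }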