Skip to content

Commit

Permalink
fix parsing of variable-length decimal values in parquet statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Dec 24, 2024
1 parent f33628e commit 15b433c
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 2 deletions.
27 changes: 26 additions & 1 deletion crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use arrow_array::{
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
use bitvec::macros::internal::funty::Fundamental;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::data_type::ByteArray;
use parquet::file::statistics::Statistics;
use rust_decimal::prelude::ToPrimitive;
use uuid::Uuid;
Expand Down Expand Up @@ -680,6 +681,30 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send
}
}

/// Sign-extend a big-endian two's-complement byte array (at most 16 bytes,
/// as produced for Parquet decimal statistics) into a 16-byte buffer
/// suitable for `i128::from_be_bytes`.
///
/// Returns `ErrorKind::DataInvalid` when the input is longer than 16 bytes.
fn extend_to_i128_big_endian(array: ByteArray) -> Result<[u8; 16]> {
    let bytes = array.data();
    if bytes.len() > 16 {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            "fail to extend array with len > 16 to array with 16",
        ));
    }

    // Two's complement: a set MSB in the leading byte marks a negative value,
    // so the padding byte is 0xFF; otherwise pad with 0x00. An empty input
    // has no leading byte and decodes to zero.
    let fill = match bytes.first() {
        Some(&b) if b & 0x80 != 0 => 0xFF,
        _ => 0x00,
    };

    let mut extended = [fill; 16];
    extended[16 - bytes.len()..].copy_from_slice(bytes);
    Ok(extended)
}

macro_rules! get_parquet_stat_as_datum {
($limit_type:tt) => {
paste::paste! {
Expand Down Expand Up @@ -741,7 +766,7 @@ macro_rules! get_parquet_stat_as_datum {
};
Some(Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
PrimitiveLiteral::Int128(i128::from_be_bytes(extend_to_i128_big_endian(bytes.into())?)),
))
}
(
Expand Down
188 changes: 187 additions & 1 deletion crates/iceberg/src/writer/file_writer/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,15 +478,18 @@ mod tests {
use anyhow::Result;
use arrow_array::types::Int64Type;
use arrow_array::{
Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray,
Array, ArrayRef, BooleanArray, Decimal128Array, Int32Array, Int64Array, ListArray,
RecordBatch, StructArray,
};
use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef};
use arrow_select::concat::concat_batches;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use rust_decimal::Decimal;
use tempfile::TempDir;
use uuid::Uuid;

use super::*;
use crate::arrow::schema_to_arrow_schema;
use crate::io::FileIOBuilder;
use crate::spec::{PrimitiveLiteral, Struct, *};
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
Expand Down Expand Up @@ -1169,4 +1172,187 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_decimal_bound() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let file_io = FileIOBuilder::new_fs_io().build().unwrap();
    let location_gen =
        MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
    let file_name_gen =
        DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

    // Case 1: positive values 2.2 and 1.1 stored as Decimal(28, 10).
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![NestedField::optional(
                0,
                "decimal",
                Type::Primitive(PrimitiveType::Decimal {
                    precision: 28,
                    scale: 10,
                }),
            )
            .into()])
            .build()
            .unwrap(),
    );
    let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
    let mut writer = ParquetWriterBuilder::new(
        WriterProperties::builder().build(),
        schema.clone(),
        file_io.clone(),
        location_gen.clone(),
        file_name_gen.clone(),
    )
    .build()
    .await?;
    let decimal_col = Arc::new(
        Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
            .with_data_type(DataType::Decimal128(28, 10)),
    ) as ArrayRef;
    let batch = RecordBatch::try_new(arrow_schema.clone(), vec![decimal_col]).unwrap();
    writer.write(&batch).await?;
    let data_files = writer.close().await?;
    assert_eq!(data_files.len(), 1);
    let data_file = data_files
        .into_iter()
        .next()
        .unwrap()
        .content(crate::spec::DataContentType::Data)
        .partition(Struct::empty())
        .build()
        .unwrap();
    assert_eq!(
        data_file.upper_bounds().get(&0),
        Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
            .as_ref()
    );
    assert_eq!(
        data_file.lower_bounds().get(&0),
        Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
            .as_ref()
    );

    // Case 2: negative values -1.1 and -2.2 — the sign-extended statistics
    // must still order correctly (upper = -1.1, lower = -2.2).
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![NestedField::optional(
                0,
                "decimal",
                Type::Primitive(PrimitiveType::Decimal {
                    precision: 28,
                    scale: 10,
                }),
            )
            .into()])
            .build()
            .unwrap(),
    );
    let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
    let mut writer = ParquetWriterBuilder::new(
        WriterProperties::builder().build(),
        schema.clone(),
        file_io.clone(),
        location_gen.clone(),
        file_name_gen.clone(),
    )
    .build()
    .await?;
    let decimal_col = Arc::new(
        Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
            .with_data_type(DataType::Decimal128(28, 10)),
    ) as ArrayRef;
    let batch = RecordBatch::try_new(arrow_schema.clone(), vec![decimal_col]).unwrap();
    writer.write(&batch).await?;
    let data_files = writer.close().await?;
    assert_eq!(data_files.len(), 1);
    let data_file = data_files
        .into_iter()
        .next()
        .unwrap()
        .content(crate::spec::DataContentType::Data)
        .partition(Struct::empty())
        .build()
        .unwrap();
    assert_eq!(
        data_file.upper_bounds().get(&0),
        Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
            .as_ref()
    );
    assert_eq!(
        data_file.lower_bounds().get(&0),
        Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
            .as_ref()
    );

    // Case 3: the widest decimal, Decimal(38, 0) — 38-nines max/min values
    // that occupy the full i128 range of the statistics byte array.
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![NestedField::optional(
                0,
                "decimal",
                Type::Primitive(PrimitiveType::Decimal {
                    precision: 38,
                    scale: 0,
                }),
            )
            .into()])
            .build()
            .unwrap(),
    );
    let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
    let mut writer = ParquetWriterBuilder::new(
        WriterProperties::builder().build(),
        schema,
        file_io.clone(),
        location_gen,
        file_name_gen,
    )
    .build()
    .await?;
    let decimal_col = Arc::new(
        Decimal128Array::from(vec![
            Some(99999999999999999999999999999999999999_i128),
            Some(-99999999999999999999999999999999999999_i128),
        ])
        .with_data_type(DataType::Decimal128(38, 0)),
    ) as ArrayRef;
    let batch = RecordBatch::try_new(arrow_schema.clone(), vec![decimal_col]).unwrap();
    writer.write(&batch).await?;
    let data_files = writer.close().await?;
    assert_eq!(data_files.len(), 1);
    let data_file = data_files
        .into_iter()
        .next()
        .unwrap()
        .content(crate::spec::DataContentType::Data)
        .partition(Struct::empty())
        .build()
        .unwrap();
    assert_eq!(
        data_file.upper_bounds().get(&0),
        Some(Datum::new(
            PrimitiveType::Decimal {
                precision: 38,
                scale: 0
            },
            PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128)
        ))
        .as_ref()
    );
    assert_eq!(
        data_file.lower_bounds().get(&0),
        Some(Datum::new(
            PrimitiveType::Decimal {
                precision: 38,
                scale: 0
            },
            PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128)
        ))
        .as_ref()
    );

    Ok(())
}
}

0 comments on commit 15b433c

Please sign in to comment.