From e4a11a9f3a7ef03bf7dbcba287cc03a47f796801 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Sat, 6 Jul 2024 11:28:34 +0200 Subject: [PATCH] check if parquet column has min/max set --- iceberg-rust/src/file_format/parquet.rs | 222 ++++++++++++------------ 1 file changed, 112 insertions(+), 110 deletions(-) diff --git a/iceberg-rust/src/file_format/parquet.rs b/iceberg-rust/src/file_format/parquet.rs index 81b17ade..3a423e4b 100644 --- a/iceberg-rust/src/file_format/parquet.rs +++ b/iceberg-rust/src/file_format/parquet.rs @@ -95,123 +95,125 @@ pub fn parquet_to_datafile( .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))? .field_type; - if let Type::Primitive(_) = &data_type { - let new = Value::try_from_bytes(statistics.min_bytes(), data_type)?; - match lower_bounds.entry(id) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - match (&entry, &new) { - (Value::Int(current), Value::Int(new_val)) => { - if *current > *new_val { - *entry = new - } - } - (Value::LongInt(current), Value::LongInt(new_val)) => { - if *current > *new_val { - *entry = new - } - } - (Value::Float(current), Value::Float(new_val)) => { - if *current > *new_val { - *entry = new - } + if statistics.has_min_max_set() { + if let Type::Primitive(_) = &data_type { + let new = Value::try_from_bytes(statistics.min_bytes(), data_type)?; + match lower_bounds.entry(id) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + match (&entry, &new) { + (Value::Int(current), Value::Int(new_val)) => { + if *current > *new_val { + *entry = new + } + } + (Value::LongInt(current), Value::LongInt(new_val)) => { + if *current > *new_val { + *entry = new + } + } + (Value::Float(current), Value::Float(new_val)) => { + if *current > *new_val { + *entry = new + } + } + (Value::Double(current), Value::Double(new_val)) => { + if *current > *new_val { + *entry = new + } + } + (Value::Date(current), Value::Date(new_val)) => { + if *current > *new_val { + *entry = new + } + } + (Value::Time(current), Value::Time(new_val)) => { + if *current > *new_val { + *entry = new + } + } + (Value::Timestamp(current), Value::Timestamp(new_val)) => { + if *current > *new_val { + *entry = new + } + } + (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => { + if *current > *new_val { + *entry = new + } + } + _ => (), } - (Value::Double(current), Value::Double(new_val)) => { - if *current > *new_val { - *entry = new - } - } - (Value::Date(current), Value::Date(new_val)) => { - if *current > *new_val { - *entry = new - } - } - (Value::Time(current), Value::Time(new_val)) => { - if *current > *new_val { - *entry = new - } - } - (Value::Timestamp(current), Value::Timestamp(new_val)) => { - if *current > *new_val { - *entry = new - } - } - (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => { - if *current > *new_val { - *entry = new - } - } - _ => (), + } + Entry::Vacant(entry) => { + entry.insert(new); } } - Entry::Vacant(entry) => { - entry.insert(new); - } - } - let new = Value::try_from_bytes(statistics.max_bytes(), data_type)?; - match upper_bounds.entry(id) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - match (&entry, &new) { - (Value::Int(current), Value::Int(new_val)) => { - if *current < *new_val { - *entry = new - } + let new = Value::try_from_bytes(statistics.max_bytes(), data_type)?; + match upper_bounds.entry(id) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + match (&entry, &new) { + (Value::Int(current), Value::Int(new_val)) => { + if *current < *new_val { + *entry = new + } + } + (Value::LongInt(current), Value::LongInt(new_val)) => { + if *current < *new_val { + *entry = new + } + } + (Value::Float(current), Value::Float(new_val)) => { + if *current < *new_val { + *entry = new + } + } + (Value::Double(current), Value::Double(new_val)) => { + if *current < *new_val { + *entry = new + } + } + (Value::Date(current), Value::Date(new_val)) => { + if *current < *new_val { + *entry = new + } + } + (Value::Time(current), Value::Time(new_val)) => { + if *current < *new_val { + *entry = new + } + } + (Value::Timestamp(current), Value::Timestamp(new_val)) => { + if *current < *new_val { + *entry = new + } + } + (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => { + if *current < *new_val { + *entry = new + } + } + _ => (), } - (Value::LongInt(current), Value::LongInt(new_val)) => { - if *current < *new_val { - *entry = new - } - } - (Value::Float(current), Value::Float(new_val)) => { - if *current < *new_val { - *entry = new - } - } - (Value::Double(current), Value::Double(new_val)) => { - if *current < *new_val { - *entry = new - } - } - (Value::Date(current), Value::Date(new_val)) => { - if *current < *new_val { - *entry = new - } - } - (Value::Time(current), Value::Time(new_val)) => { - if *current < *new_val { - *entry = new - } - } - (Value::Timestamp(current), Value::Timestamp(new_val)) => { - if *current < *new_val { - *entry = new - } - } - (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => { - if *current < *new_val { - *entry = new - } - } - _ => (), + } + Entry::Vacant(entry) => { + entry.insert(new); } } - Entry::Vacant(entry) => { - entry.insert(new); - } - } - if let Some(partition_value) = partition.get_mut(column_name) { - if partition_value.is_none() { - let transform = transforms - .get(column_name) - .ok_or_else(|| Error::InvalidFormat("transform".to_string()))?; - let min = Value::try_from_bytes(statistics.min_bytes(), data_type)? - .tranform(transform)?; - let max = Value::try_from_bytes(statistics.max_bytes(), data_type)? - .tranform(transform)?; - if min == max { - *partition_value = Some(min) + if let Some(partition_value) = partition.get_mut(column_name) { + if partition_value.is_none() { + let transform = transforms + .get(column_name) + .ok_or_else(|| Error::InvalidFormat("transform".to_string()))?; + let min = Value::try_from_bytes(statistics.min_bytes(), data_type)? + .tranform(transform)?; + let max = Value::try_from_bytes(statistics.max_bytes(), data_type)? + .tranform(transform)?; + if min == max { + *partition_value = Some(min) + } } } }