Skip to content

Commit

Permalink
Check whether the Parquet column statistics have min/max set
Browse files: browse the repository at this point in the history
  • Loading branch information
JanKaul committed Jul 6, 2024
1 parent 93bdf79 commit e4a11a9
Showing 1 changed file with 112 additions and 110 deletions.
222 changes: 112 additions & 110 deletions iceberg-rust/src/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,123 +95,125 @@ pub fn parquet_to_datafile(
.ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
.field_type;

if let Type::Primitive(_) = &data_type {
let new = Value::try_from_bytes(statistics.min_bytes(), data_type)?;
match lower_bounds.entry(id) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
match (&entry, &new) {
(Value::Int(current), Value::Int(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::LongInt(current), Value::LongInt(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Float(current), Value::Float(new_val)) => {
if *current > *new_val {
*entry = new
}
if statistics.has_min_max_set() {
if let Type::Primitive(_) = &data_type {
let new = Value::try_from_bytes(statistics.min_bytes(), data_type)?;
match lower_bounds.entry(id) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
match (&entry, &new) {
(Value::Int(current), Value::Int(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::LongInt(current), Value::LongInt(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Float(current), Value::Float(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Double(current), Value::Double(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Date(current), Value::Date(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Time(current), Value::Time(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Timestamp(current), Value::Timestamp(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
if *current > *new_val {
*entry = new
}
}
_ => (),
}
(Value::Double(current), Value::Double(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Date(current), Value::Date(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Time(current), Value::Time(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Timestamp(current), Value::Timestamp(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
if *current > *new_val {
*entry = new
}
}
_ => (),
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
let new = Value::try_from_bytes(statistics.max_bytes(), data_type)?;
match upper_bounds.entry(id) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
match (&entry, &new) {
(Value::Int(current), Value::Int(new_val)) => {
if *current < *new_val {
*entry = new
}
let new = Value::try_from_bytes(statistics.max_bytes(), data_type)?;
match upper_bounds.entry(id) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
match (&entry, &new) {
(Value::Int(current), Value::Int(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::LongInt(current), Value::LongInt(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Float(current), Value::Float(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Double(current), Value::Double(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Date(current), Value::Date(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Time(current), Value::Time(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Timestamp(current), Value::Timestamp(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
if *current < *new_val {
*entry = new
}
}
_ => (),
}
(Value::LongInt(current), Value::LongInt(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Float(current), Value::Float(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Double(current), Value::Double(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Date(current), Value::Date(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Time(current), Value::Time(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Timestamp(current), Value::Timestamp(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
if *current < *new_val {
*entry = new
}
}
_ => (),
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}

if let Some(partition_value) = partition.get_mut(column_name) {
if partition_value.is_none() {
let transform = transforms
.get(column_name)
.ok_or_else(|| Error::InvalidFormat("transform".to_string()))?;
let min = Value::try_from_bytes(statistics.min_bytes(), data_type)?
.tranform(transform)?;
let max = Value::try_from_bytes(statistics.max_bytes(), data_type)?
.tranform(transform)?;
if min == max {
*partition_value = Some(min)
if let Some(partition_value) = partition.get_mut(column_name) {
if partition_value.is_none() {
let transform = transforms
.get(column_name)
.ok_or_else(|| Error::InvalidFormat("transform".to_string()))?;
let min = Value::try_from_bytes(statistics.min_bytes(), data_type)?
.tranform(transform)?;
let max = Value::try_from_bytes(statistics.max_bytes(), data_type)?
.tranform(transform)?;
if min == max {
*partition_value = Some(min)
}
}
}
}
Expand Down

0 comments on commit e4a11a9

Please sign in to comment.