Skip to content

Commit

Permalink
check if column is only nulls
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Jul 6, 2024
1 parent 93bdf79 commit 47780d6
Showing 1 changed file with 116 additions and 110 deletions.
226 changes: 116 additions & 110 deletions iceberg-rust/src/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@ pub fn parquet_to_datafile(
.or_insert(row_group.num_rows());

if let Some(statistics) = column.statistics() {
let mut only_nulls = false;
null_value_counts
.entry(id)
.and_modify(|x| *x += statistics.null_count() as i64)
.or_insert(statistics.null_count() as i64);
if let Some(distinct_count) = statistics.distinct_count() {
if statistics.null_count() > 0 && distinct_count == 1 {
only_nulls = true
}
distinct_counts
.entry(id)
.and_modify(|x| *x += distinct_count as i64)
Expand All @@ -95,123 +99,125 @@ pub fn parquet_to_datafile(
.ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
.field_type;

if let Type::Primitive(_) = &data_type {
let new = Value::try_from_bytes(statistics.min_bytes(), data_type)?;
match lower_bounds.entry(id) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
match (&entry, &new) {
(Value::Int(current), Value::Int(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::LongInt(current), Value::LongInt(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Float(current), Value::Float(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Double(current), Value::Double(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Date(current), Value::Date(new_val)) => {
if *current > *new_val {
*entry = new
}
if !only_nulls {
if let Type::Primitive(_) = &data_type {
let new = Value::try_from_bytes(statistics.min_bytes(), data_type)?;
match lower_bounds.entry(id) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
match (&entry, &new) {
(Value::Int(current), Value::Int(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::LongInt(current), Value::LongInt(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Float(current), Value::Float(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Double(current), Value::Double(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Date(current), Value::Date(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Time(current), Value::Time(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Timestamp(current), Value::Timestamp(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
if *current > *new_val {
*entry = new
}
}
_ => (),
}
(Value::Time(current), Value::Time(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::Timestamp(current), Value::Timestamp(new_val)) => {
if *current > *new_val {
*entry = new
}
}
(Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
if *current > *new_val {
*entry = new
}
}
_ => (),
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
let new = Value::try_from_bytes(statistics.max_bytes(), data_type)?;
match upper_bounds.entry(id) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
match (&entry, &new) {
(Value::Int(current), Value::Int(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::LongInt(current), Value::LongInt(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Float(current), Value::Float(new_val)) => {
if *current < *new_val {
*entry = new
}
let new = Value::try_from_bytes(statistics.max_bytes(), data_type)?;
match upper_bounds.entry(id) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
match (&entry, &new) {
(Value::Int(current), Value::Int(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::LongInt(current), Value::LongInt(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Float(current), Value::Float(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Double(current), Value::Double(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Date(current), Value::Date(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Time(current), Value::Time(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Timestamp(current), Value::Timestamp(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
if *current < *new_val {
*entry = new
}
}
_ => (),
}
(Value::Double(current), Value::Double(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Date(current), Value::Date(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Time(current), Value::Time(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::Timestamp(current), Value::Timestamp(new_val)) => {
if *current < *new_val {
*entry = new
}
}
(Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
if *current < *new_val {
*entry = new
}
}
_ => (),
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}

if let Some(partition_value) = partition.get_mut(column_name) {
if partition_value.is_none() {
let transform = transforms
.get(column_name)
.ok_or_else(|| Error::InvalidFormat("transform".to_string()))?;
let min = Value::try_from_bytes(statistics.min_bytes(), data_type)?
.tranform(transform)?;
let max = Value::try_from_bytes(statistics.max_bytes(), data_type)?
.tranform(transform)?;
if min == max {
*partition_value = Some(min)
if let Some(partition_value) = partition.get_mut(column_name) {
if partition_value.is_none() {
let transform = transforms
.get(column_name)
.ok_or_else(|| Error::InvalidFormat("transform".to_string()))?;
let min = Value::try_from_bytes(statistics.min_bytes(), data_type)?
.tranform(transform)?;
let max = Value::try_from_bytes(statistics.max_bytes(), data_type)?
.tranform(transform)?;
if min == max {
*partition_value = Some(min)
}
}
}
}
Expand Down

0 comments on commit 47780d6

Please sign in to comment.