Skip to content

Commit

Permalink
filter out delete files for table statistics
Browse files: browse the repository at this point in the history
  • Loading branch information
JanKaul committed Jul 27, 2024
1 parent 0d5e8de commit 8966821
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions datafusion_iceberg/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use datafusion::{
physical_plan::{ColumnStatistics, Statistics},
scalar::ScalarValue,
};
use iceberg_rust::spec::{manifest::ManifestEntry, schema::Schema, values::Value};
use iceberg_rust::spec::{
manifest::{Content, ManifestEntry},
schema::Schema,
values::Value,
};
use iceberg_rust::{catalog::tabular::Tabular, table::Table};

use crate::error::Error;
Expand Down Expand Up @@ -35,43 +39,46 @@ pub(crate) async fn table_statistics(
.unwrap_or_else(|| table.current_schema(None).unwrap().clone());
let manifests = table.manifests(snapshot_range.0, snapshot_range.1).await?;
let datafiles = table.datafiles(&manifests, None).await?;
Ok(datafiles.iter().fold(
Statistics {
num_rows: Precision::Exact(0),
total_byte_size: Precision::Exact(0),
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
distinct_count: Precision::Absent
};
schema.fields().len()
],
},
|acc, manifest| {
let column_stats = column_statistics(&schema, manifest);
Ok(datafiles
.iter()
.filter(|manifest| matches!(manifest.data_file().content(), Content::Data))
.fold(
Statistics {
num_rows: acc.num_rows.add(&Precision::Exact(
*manifest.data_file().record_count() as usize
)),
total_byte_size: acc.total_byte_size.add(&Precision::Exact(
*manifest.data_file().file_size_in_bytes() as usize,
)),
column_statistics: acc
.column_statistics
.into_iter()
.zip(column_stats)
.map(|(acc, x)| ColumnStatistics {
null_count: acc.null_count.add(&x.null_count),
max_value: acc.max_value.max(&x.max_value),
min_value: acc.min_value.min(&x.min_value),
distinct_count: acc.distinct_count.add(&x.distinct_count),
})
.collect(),
}
},
))
num_rows: Precision::Exact(0),
total_byte_size: Precision::Exact(0),
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
distinct_count: Precision::Absent
};
schema.fields().len()
],
},
|acc, manifest| {
let column_stats = column_statistics(&schema, manifest);
Statistics {
num_rows: acc.num_rows.add(&Precision::Exact(
*manifest.data_file().record_count() as usize,
)),
total_byte_size: acc.total_byte_size.add(&Precision::Exact(
*manifest.data_file().file_size_in_bytes() as usize,
)),
column_statistics: acc
.column_statistics
.into_iter()
.zip(column_stats)
.map(|(acc, x)| ColumnStatistics {
null_count: acc.null_count.add(&x.null_count),
max_value: acc.max_value.max(&x.max_value),
min_value: acc.min_value.min(&x.min_value),
distinct_count: acc.distinct_count.add(&x.distinct_count),
})
.collect(),
}
},
))
}

fn column_statistics<'a>(
Expand Down

0 comments on commit 8966821

Please sign in to comment.