feat(datafusion): Expose DataFusion statistics on an IcebergTableScan #880
Open

gruuya wants to merge 5 commits into apache:main from gruuya:datafusion-statistics
+287 −27
Commits (changes shown from 4 of 5 commits):
80b8d8c Expose DataFusion statistics on an IcebergTableScan
bae11ea Default to unknown statistics upon encountering an error
1738308 Skip positional/equality delete files when computing stats
98bdd8a Move statistics computation to IcebergTableProvider constructor methods
979d07a Make statistics computation opt-in for IcebergTableProvider
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::catalog::TableProvider;
use datafusion::common::stats::Precision;
use datafusion::common::{ColumnStatistics, ScalarValue, Statistics};
use iceberg::{Catalog, Result, TableIdent};
use iceberg_datafusion::IcebergTableProvider;
use iceberg_integration_tests::set_test_fixture;

#[tokio::test]
async fn test_statistics() -> Result<()> {
    let fixture = set_test_fixture("datafusion_statistics").await;

    let catalog = fixture.rest_catalog;

    let table = catalog
        .load_table(
            &TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"])
                .unwrap(),
        )
        .await
        .unwrap();

    let stats = IcebergTableProvider::try_new_from_table(table)
        .await?
        .statistics();

    assert_eq!(
        stats,
        Some(Statistics {
            num_rows: Precision::Inexact(12),
            total_byte_size: Precision::Absent,
            column_statistics: vec![
                ColumnStatistics {
                    null_count: Precision::Inexact(0),
                    max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))),
                    min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))),
                    distinct_count: Precision::Absent,
                },
                ColumnStatistics {
                    null_count: Precision::Inexact(0),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(12))),
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    distinct_count: Precision::Absent,
                },
                ColumnStatistics {
                    null_count: Precision::Inexact(0),
                    max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))),
                    min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))),
                    distinct_count: Precision::Absent,
                },
            ],
        })
    );

    Ok(())
}
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use datafusion::common::stats::Precision;
use datafusion::common::{ColumnStatistics, Statistics};
use iceberg::spec::{DataContentType, ManifestStatus};
use iceberg::table::Table;
use iceberg::Result;

use crate::physical_plan::expr_to_predicate::datum_to_scalar_value;

// Compute DataFusion table statistics for a given table/snapshot
pub async fn compute_statistics(table: &Table, snapshot_id: Option<i64>) -> Result<Statistics> {
    let file_io = table.file_io();
    let metadata = table.metadata();
    let snapshot = table.snapshot(snapshot_id)?;

    let mut num_rows = 0;
    let mut lower_bounds = HashMap::new();
    let mut upper_bounds = HashMap::new();
    let mut null_counts = HashMap::new();

    let manifest_list = snapshot.load_manifest_list(file_io, metadata).await?;

    // For each existing/added manifest in the snapshot aggregate the row count, as well as null
    // count and min/max values.
    for manifest_file in manifest_list.entries() {
        let manifest = manifest_file.load_manifest(file_io).await?;
        manifest.entries().iter().for_each(|manifest_entry| {
            // Gather stats only for non-deleted data files
            if manifest_entry.status() != ManifestStatus::Deleted {
                let data_file = manifest_entry.data_file();
                if data_file.content_type() == DataContentType::Data {
                    num_rows += data_file.record_count();
                    data_file.lower_bounds().iter().for_each(|(col_id, min)| {
                        lower_bounds
                            .entry(*col_id)
                            .and_modify(|col_min| {
                                if min < col_min {
                                    *col_min = min.clone()
                                }
                            })
                            .or_insert(min.clone());
                    });
                    data_file.upper_bounds().iter().for_each(|(col_id, max)| {
                        upper_bounds
                            .entry(*col_id)
                            .and_modify(|col_max| {
                                if max > col_max {
                                    *col_max = max.clone()
                                }
                            })
                            .or_insert(max.clone());
                    });
                    data_file
                        .null_value_counts()
                        .iter()
                        .for_each(|(col_id, null_count)| {
                            null_counts
                                .entry(*col_id)
                                .and_modify(|col_null_count| *col_null_count += *null_count)
                                .or_insert(*null_count);
                        });
                }
            }
        })
    }

    // Construct the DataFusion `Statistics` object, leaving any missing info as `Precision::Absent`
    let schema = snapshot.schema(metadata)?;
    let col_stats = schema
        .as_struct()
        .fields()
        .iter()
        .map(|field| ColumnStatistics {
            null_count: null_counts
                .get(&field.id)
                .map(|nc| Precision::Inexact(*nc as usize))
                .unwrap_or(Precision::Absent),
            max_value: upper_bounds
                .get(&field.id)
                .and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact))
                .unwrap_or(Precision::Absent),
            min_value: lower_bounds
                .get(&field.id)
                .and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact))
                .unwrap_or(Precision::Absent),
            distinct_count: Precision::Absent, // will be picked up after #417
        })
        .collect();

    Ok(Statistics {
        num_rows: Precision::Inexact(num_rows as usize),
        total_byte_size: Precision::Absent,
        column_statistics: col_stats,
    })
}
Reviewer:
There are two problems with this approach:

1. The statistics are computed as part of the scan, so the (potentially expensive) manifest traversal is paid on every query; it should happen only once.
2. The stats aggregated from manifest entries may be misleading, e.g. they don't account for delete files.
Also, Iceberg has table-level statistics: https://iceberg.apache.org/spec/#table-statistics. Currently these only contain the ndv (number of distinct values) for each column. Should we consider reading these table statistics?
cc @Fokko @Xuanwo @sdd what do you think?
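
(For illustration, a minimal sketch of how per-column ndv from table-level statistics could be folded into DataFusion's `ColumnStatistics`; the `ndvs` map and `apply_ndv` helper are hypothetical, since iceberg-rust does not yet read Puffin table statistics:)

```rust
use std::collections::HashMap;

use datafusion::common::stats::Precision;
use datafusion::common::ColumnStatistics;

/// Fold per-column ndv (as it might be read from the snapshot's
/// table-statistics metadata) into already-computed column stats.
/// `ndvs` maps Iceberg field id -> distinct count; how it gets populated
/// is exactly the open question above.
fn apply_ndv(stats: &mut [ColumnStatistics], field_ids: &[i32], ndvs: &HashMap<i32, u64>) {
    for (col_stats, field_id) in stats.iter_mut().zip(field_ids) {
        if let Some(ndv) = ndvs.get(field_id) {
            // ndv from table statistics is sketch-based, hence Inexact.
            col_stats.distinct_count = Precision::Inexact(*ndv as usize);
        }
    }
}
```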
Author (gruuya):
Thanks for the feedback, I greatly appreciate it.

Regarding 1, I agree completely (and think it should be done during table instantiation); I mainly wanted to get some validation on the general approach first. Perhaps it might also be an optional call via something like `IcebergTableProvider::with_statistics`, chained after one of the existing construction methods (see the sketch below).

As for point 2, is it not sufficient that I aggregate stats only for `manifest_entry.status() != ManifestStatus::Deleted` below? Put another way, is it possible for `ManifestStatus::Existing | ManifestStatus::Added` entries to contain misleading stats?

Finally, while I think exposing the spec (Puffin) statistics should definitely be implemented, they do not seem to be always available (writing them may be opt-in for some external writers such as pyiceberg/Spark?), so the best course of action for starters is to gather the stats from the manifest entries by default.
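
(A rough sketch of what such an opt-in chained constructor could look like; the `table`, `snapshot_id`, and `statistics` fields are assumed internals of `IcebergTableProvider`, not its actual layout:)

```rust
impl IcebergTableProvider {
    /// Hypothetical opt-in: compute table statistics once, after
    /// construction, caching them on the provider so that
    /// `TableProvider::statistics()` can return them without re-reading
    /// manifests on every scan.
    pub async fn with_statistics(mut self) -> Self {
        // Default to unknown statistics if the computation fails.
        self.statistics = compute_statistics(&self.table, self.snapshot_id)
            .await
            .ok();
        self
    }
}

// Usage, chained after an existing constructor:
// let provider = IcebergTableProvider::try_new_from_table(table)
//     .await?
//     .with_statistics()
//     .await;
```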
Reviewer:
I think what @liurenjie1024 means is the delete files 🤔: https://iceberg.apache.org/spec/#delete-formats
Author (gruuya):
Ah yes, good point; I've omitted the equality/position delete files from the stats computation, so only pure data files are included now.

I assume that data files with associated row-level deletes don't get their lower/upper bounds updated at commit time. I think that's OK: it just means the real lower/upper range is strictly contained within the reported range (since the rows holding the actual min/max might be refuted by row-level deletes), so the latter is more conservative. This is conveyed by using the `Inexact` variant for the bounds. (Perhaps the row count could be made `Exact` by subtracting the rows in row-deletion files, but I think that could be done separately; see the sketch below.)

I've also moved the statistics computation into the `IcebergTableProvider` constructors.
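
(A hedged sketch of that separate refinement, adjusting the row count inside `compute_statistics` by the record counts of positional delete files; it assumes no position is deleted twice across files, which is one reason not to claim `Exact` yet:)

```rust
// Inside the manifest-entry loop: data files add rows, positional delete
// files subtract them; the row impact of equality deletes cannot be
// derived from record counts alone.
match data_file.content_type() {
    DataContentType::Data => num_rows += data_file.record_count(),
    DataContentType::PositionDeletes => {
        num_rows = num_rows.saturating_sub(data_file.record_count());
    }
    DataContentType::EqualityDeletes => {}
}
```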
Reviewer:
Sorry, I don't get why it's not updated. If a row-level deletion removes the row with the smallest value, shouldn't the bound be updated?

For point 1, I agree that adding an option to enable it or not would be useful.

For point 2, there are many problems, not only deletions. For example, for variable-length columns the stored values may be truncated, in which case it's not appropriate to use them as upper/lower bounds. Also, deletions change not only lower/upper bounds but also the row count, ndv, etc.
Author (gruuya):
Oh, what I was getting at is that I'm not sure whether an existing data file's stats (upper/lower bounds) are updated when a row-level deletion targets that file. Ideally yes, but I guess that is not a given. In other words, I'm wondering whether there is an Iceberg equivalent to Delta's tight vs. wide bounds.

Good point; manifests with trimmed upper/lower bounds for potentially large (e.g. string) columns would be misleading, so arguably we should just skip providing statistics for those columns for now (see the sketch below).

True, though you can still get good estimates for those, and it's more useful to provide them to DataFusion as hints than not.
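
(A minimal sketch of that conservative option, dropping min/max for string columns when building `ColumnStatistics` inside `compute_statistics`; whether bounds were actually truncated is writer-dependent, so this is deliberately pessimistic:)

```rust
use iceberg::spec::{PrimitiveType, Type};

// Manifest bounds for string columns may be truncated prefixes rather
// than true extrema, so skip them entirely for now.
let bounds_usable = !matches!(
    field.field_type.as_ref(),
    Type::Primitive(PrimitiveType::String)
);
let max_value = if bounds_usable {
    upper_bounds
        .get(&field.id)
        .and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact))
        .unwrap_or(Precision::Absent)
} else {
    Precision::Absent
};
// (min_value would be handled symmetrically via `lower_bounds`.)
```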