feat(datafusion): Expose DataFusion statistics on an IcebergTableScan #880

Open

gruuya wants to merge 5 commits into main from datafusion-statistics
Conversation

@gruuya (Contributor) commented on Jan 6, 2025:

Closes #869.

Provide detailed statistics via DataFusion's ExecutionPlan::statistics for more efficient join planning.

The statistics are accumulated from the snapshot's manifests and converted to the corresponding DataFusion struct.
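
As a rough, self-contained sketch of the kind of conversion involved (illustrative only, not the PR's actual code; the function name and its inputs are assumptions), aggregated per-column bounds and counts can be mapped onto DataFusion's Statistics/ColumnStatistics types, with manifest-derived values reported as Inexact:

use datafusion::common::stats::Precision;
use datafusion::common::{ColumnStatistics, ScalarValue, Statistics};

// Illustrative sketch only (not the PR's code): map per-column values aggregated
// from Iceberg manifests onto DataFusion's statistics types.
fn to_datafusion_statistics(
    row_count: usize,
    // One entry per table column: (min, max, null_count), if known.
    columns: Vec<(Option<ScalarValue>, Option<ScalarValue>, Option<usize>)>,
) -> Statistics {
    let column_statistics = columns
        .into_iter()
        .map(|(min, max, nulls)| {
            let mut col = ColumnStatistics::new_unknown();
            // Manifest-level values are conservative, hence reported as Inexact.
            if let Some(v) = min {
                col.min_value = Precision::Inexact(v);
            }
            if let Some(v) = max {
                col.max_value = Precision::Inexact(v);
            }
            if let Some(n) = nulls {
                col.null_count = Precision::Inexact(n);
            }
            col
        })
        .collect();

    Statistics {
        num_rows: Precision::Inexact(row_count),
        total_byte_size: Precision::Absent,
        column_statistics,
    }
}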

@gruuya force-pushed the datafusion-statistics branch from 5a30c42 to 80b8d8c on Jan 6, 2025, 13:41
// For each existing/added manifest in the snapshot aggregate the row count, as well as null
// count and min/max values.
for manifest_file in manifest_list.entries() {
let manifest = manifest_file.load_manifest(file_io).await?;
Contributor:
There are two problems with this approach:

  1. It may be quite slow for large tables.
  2. The value is incorrect for tables with deletions, and may be quite different.

Also, Iceberg has table-level statistics: https://iceberg.apache.org/spec/#table-statistics. Currently these only contain the NDV (number of distinct values) for each column. Should we consider reading these table statistics?

cc @Fokko @Xuanwo @sdd, what do you think?

@gruuya (author):

Thanks for the feedback, I greatly appreciate it.

Regarding 1, I agree completely (and think it should be done during table instantiation); I mainly wanted to get some validation on the general approach first. (Perhaps it might also be an optional call via something like IcebergTableProvider::with_statistics, chained after one of the existing construction methods.)

As for point 2, is it not sufficient that I aggregate stats only for manifest_entry.status() != ManifestStatus::Deleted below? Put another way, is it possible for ManifestStatus::Existing | ManifestStatus::Added entries to contain some misleading stats?

Finally, while I think exposing the spec (Puffin) statistics should definitely be implemented, it seems this is not always available (it may be opt-in for some external writers such as pyiceberg/spark?), so the best course of action for starters is to gather the stats from the manifest entries by default.

@ZENOTME (Contributor), Jan 8, 2025:

> As for point 2, is it not sufficient that I aggregate stats only for manifest_entry.status() != ManifestStatus::Deleted below? Put another way, is it possible for ManifestStatus::Existing | ManifestStatus::Added entries to contain some misleading stats?

I think what @liurenjie1024 means is delete files 🤔 (https://iceberg.apache.org/spec/#delete-formats).

@gruuya (author):

Ah yes, good point; I've omitted the equality/position delete files from the stats computation, so only pure data files are included now.

I assume that data files with associated row-level deletes don't get their lower/upper bounds updated at commit time. I think that's OK: it just means the real lower/upper range is strictly contained within the reported one (because the rows holding the actual min/max might be refuted by row-level deletes), so the latter is more conservative. This information is conveyed by using the Inexact variant for the bounds. (Perhaps the row count could be made Exact by subtracting the rows in row-deletion files, but that could be done separately.)

I've also moved the statistics computation into the IcebergTableProvider constructors.
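
To make the filtering described above concrete, here is a hedged sketch (identifiers approximated from the iceberg-rust manifest API, so treat the exact names as assumptions): only entries that are not deleted and that describe data files, as opposed to position/equality delete files, contribute to the aggregates.

use iceberg::spec::{DataContentType, Manifest, ManifestStatus};

// Sketch: sum record counts over entries that actually contribute data rows,
// skipping deleted entries and position/equality delete files.
fn live_data_row_count(manifest: &Manifest) -> u64 {
    manifest
        .entries()
        .iter()
        .filter(|entry| entry.status() != ManifestStatus::Deleted)
        .map(|entry| entry.data_file())
        .filter(|data_file| data_file.content_type() == DataContentType::Data)
        .map(|data_file| data_file.record_count())
        .sum()
}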

Contributor:

> I assume that data files with associated row-level deletes don't get their lower/upper bounds updated at commit time.

Sorry, I don't get why it's not updated. If a row-level deletion deletes the row with the smallest value, shouldn't the bound be updated?

For point 1, I agree that adding an option to enable this or not would be useful.

For point 2, there are many problems, not only deletions.

For example, for variable-length columns the stored value may be truncated, so it's not appropriate to use it to compute upper/lower bounds.

Also, deletions change not only the lower/upper bounds, but also the row count, NDV, etc.

@gruuya (author):

> Sorry, I don't get why it's not updated. If a row-level deletion deletes the row with the smallest value, shouldn't the bound be updated?

Oh, what I was getting at is that I'm not sure whether an existing data file's stats (upper/lower bounds) are updated when a row-level deletion targets that file. Ideally yes, but I guess that's not a given. In other words, I'm wondering whether there is an equivalent to Delta's tight/wide bounds in Iceberg:

> In the presence of Deletion Vectors the statistics may be somewhat outdated, i.e. not reflecting deleted rows yet. The flag stats.tightBounds indicates whether we have tight bounds (i.e. the min/maxValue exists in the valid state of the file) or wide bounds (i.e. the minValue is <= all valid values in the file, and the maxValue >= all valid values in the file). These upper/lower bounds are sufficient information for data skipping. Note, stats.tightBounds should be treated as true when it is not explicitly present in the statistics.

> For example, for variable-length columns the stored value may be truncated

Good point; manifests with truncated upper/lower bounds for potentially large (e.g. string) columns would be misleading, so arguably we should just skip providing statistics for those columns for now.

> Also, deletions change not only the lower/upper bounds, but also the row count, NDV, etc.

True, though you can still get good estimates for those, and again it's more useful to provide them to DataFusion as hints than not.

     /// A reference-counted arrow `Schema`.
     schema: ArrowSchemaRef,
 }

 impl IcebergTableProvider {
-    pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+    pub(crate) async fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+        let statistics = compute_statistics(&table, None).await.ok();
@gruuya (author):

Maybe some people don't care about stats (e.g. they don't really do a lot of joins). I assume a friendlier API in that case would make the stats computation opt-in via something like:

--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -119,6 +119,12 @@ impl IcebergTableProvider {
             statistics,
         })
     }
+
+    pub async fn with_computed_statistics(mut self) -> Self {
+        let statistics = compute_statistics(&self.table, self.snapshot_id).await.ok();
+        self.statistics = statistics;
+        self
+    }
 }
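
If we go that route, usage could look roughly like this (the constructor name and the error type here are assumptions on my part; the opt-in method is the one sketched in the diff above):

use iceberg::table::Table;
use iceberg_datafusion::IcebergTableProvider;

// Hypothetical usage of the proposed opt-in API; names may differ in the final version.
async fn provider_with_stats(table: Table) -> anyhow::Result<IcebergTableProvider> {
    let provider = IcebergTableProvider::try_new_from_table(table)
        .await?
        .with_computed_statistics()
        .await;
    Ok(provider)
}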

Contributor:
I don't see why statistics matter for joins; I think you are referring to the join-reordering algorithm in the query optimizer? In my experience, complex table statistics don't help much with join reordering. For example, if the joined table has many filters, how would you estimate correct statistics after filtering? Histograms may help for single-column filters, but not for complex ones. Cardinality estimation in joins also doesn't work well.

@gruuya (author):

I've made the statistics computation opt-in along the above lines now.

> I think you are referring to the join-reordering algorithm in the query optimizer?

Yes, that is correct.

> In my experience, complex table statistics don't help much with join reordering.

I think there are cases where they can help significantly; see apache/datafusion#7949 and apache/datafusion#7950 for instance.

> For example, if the joined table has many filters, how would you estimate correct statistics after filtering? Histograms may help for single-column filters, but not for complex ones. Cardinality estimation in joins also doesn't work well.

Yeah, admittedly the entire procedure is based on a number of heuristics and can be quite approximate in nature. Still, I think there's considerable value to be extracted even if only hints are provided; cc @alamb, who knows a lot more about the potential pitfalls and upsides here than I do.

@gruuya force-pushed the datafusion-statistics branch from 4d73dab to 979d07a on Jan 9, 2025, 08:02
Comment on lines +155 to +157
self.statistics
    .clone()
    .unwrap_or(Statistics::new_unknown(self.schema.as_ref())),
@gruuya (author):

One major missing aspect is applying the filters here (and perhaps the projection) to the base statistics, to further tighten the reported upper/lower bounds.
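
As a rough illustration of the projection part (a hypothetical helper, not something that exists in this PR or in DataFusion): the precomputed column statistics could be narrowed to the scan's projection before being handed back.

use datafusion::common::stats::Precision;
use datafusion::common::Statistics;

// Hypothetical helper: restrict precomputed table statistics to a projection.
fn project_statistics(base: &Statistics, projection: Option<&[usize]>) -> Statistics {
    match projection {
        Some(indices) => Statistics {
            num_rows: base.num_rows.clone(),
            // A table-wide byte size no longer applies to a subset of columns.
            total_byte_size: Precision::Absent,
            column_statistics: indices
                .iter()
                .map(|i| base.column_statistics[*i].clone())
                .collect(),
        },
        None => base.clone(),
    }
}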

Successfully merging this pull request may close these issues.

feat: Expose Iceberg table statistics in DataFusion interface(s)