diff --git a/Cargo.lock b/Cargo.lock
index cf89eb512..13e9e388a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3086,6 +3086,7 @@ dependencies = [
  "futures",
  "iceberg",
  "iceberg-catalog-memory",
+ "log",
  "tempfile",
  "tokio",
 ]
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 7e05da59a..c4fc772f0 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -188,27 +188,7 @@ impl<'a> TableScanBuilder<'a> {

     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
-        let snapshot = match self.snapshot_id {
-            Some(snapshot_id) => self
-                .table
-                .metadata()
-                .snapshot_by_id(snapshot_id)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Snapshot with id {} not found", snapshot_id),
-                    )
-                })?
-                .clone(),
-            None => self
-                .table
-                .metadata()
-                .current_snapshot()
-                .ok_or_else(|| {
-                    Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
-                })?
-                .clone(),
-        };
+        let snapshot = self.table.snapshot(self.snapshot_id)?;

         let schema = snapshot.schema(self.table.metadata())?;
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index ebee670f4..efb9a7db0 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -24,7 +24,7 @@ use crate::inspect::MetadataTable;
 use crate::io::object_cache::ObjectCache;
 use crate::io::FileIO;
 use crate::scan::TableScanBuilder;
-use crate::spec::{TableMetadata, TableMetadataRef};
+use crate::spec::{SnapshotRef, TableMetadata, TableMetadataRef};
 use crate::{Error, ErrorKind, Result, TableIdent};

 /// Builder to create table scan.
@@ -201,6 +201,29 @@ impl Table {
         TableScanBuilder::new(self)
     }

+    /// Get the specified or latest snapshot for this table
+    pub fn snapshot(&self, snapshot_id: Option<i64>) -> Result<SnapshotRef> {
+        Ok(match snapshot_id {
+            Some(snapshot_id) => self
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
+                })?
+                .clone(),
+        })
+    }
+
     /// Creates a metadata table which provides table-like APIs for inspecting metadata.
     /// See [`MetadataTable`] for more details.
     pub fn inspect(&self) -> MetadataTable<'_> {
diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs
index 1586298ff..93fb1f679 100644
--- a/crates/integration_tests/tests/datafusion.rs
+++ b/crates/integration_tests/tests/datafusion.rs
@@ -22,31 +22,28 @@ use arrow_schema::TimeUnit;
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::assert_batches_eq;
 use datafusion::catalog::TableProvider;
-use datafusion::error::DataFusionError;
+use datafusion::common::stats::Precision;
+use datafusion::common::{ColumnStatistics, ScalarValue, Statistics};
+use datafusion::logical_expr::{col, lit};
 use datafusion::prelude::SessionContext;
-use iceberg::{Catalog, TableIdent};
+use iceberg::{Catalog, Result, TableIdent};
 use iceberg_datafusion::IcebergTableProvider;
 use iceberg_integration_tests::set_test_fixture;
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

 #[tokio::test]
-async fn test_basic_queries() -> Result<(), DataFusionError> {
+async fn test_basic_queries() -> Result<()> {
     let fixture = set_test_fixture("datafusion_basic_read").await;

     let catalog = fixture.rest_catalog;

     let table = catalog
         .load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap())
-        .await
-        .unwrap();
+        .await?;

     let ctx = SessionContext::new();

-    let table_provider = Arc::new(
-        IcebergTableProvider::try_new_from_table(table)
-            .await
-            .unwrap(),
-    );
+    let table_provider = Arc::new(IcebergTableProvider::try_new_from_table(table).await?);

     let schema = table_provider.schema();

@@ -117,13 +114,15 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
         ])
     );

-    ctx.register_table("types_table", table_provider)?;
+    ctx.register_table("types_table", table_provider).unwrap();

     let batches = ctx
         .sql("SELECT * FROM types_table ORDER BY cbigint LIMIT 3")
-        .await?
+        .await
+        .unwrap()
         .collect()
-        .await?;
+        .await
+        .unwrap();
     let expected = [
         "+----------+----------+-----------+------+---------+--------+---------+----------+------------+---------------------+----------------------+---------+----------+",
         "| cboolean | ctinyint | csmallint | cint | cbigint | cfloat | cdouble | cdecimal | cdate | ctimestamp_ntz | ctimestamp | cstring | cbinary |",
@@ -136,3 +135,82 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
     assert_batches_eq!(expected, &batches);
     Ok(())
 }
+
+#[tokio::test]
+async fn test_statistics() -> Result<()> {
+    let fixture = set_test_fixture("datafusion_statistics").await;
+
+    let catalog = fixture.rest_catalog;
+
+    // Test table statistics
+    let table = catalog
+        .load_table(&TableIdent::from_strs([
+            "default",
+            "test_positional_merge_on_read_double_deletes",
+        ])?)
+        .await?;
+
+    let table_provider = IcebergTableProvider::try_new_from_table(table)
+        .await?
+        .with_computed_statistics()
+        .await;
+
+    let table_stats = table_provider.statistics();
+
+    assert_eq!(
+        table_stats,
+        Some(Statistics {
+            num_rows: Precision::Inexact(12),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Inexact(0),
+                    max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))),
+                    min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))),
+                    distinct_count: Precision::Absent,
+                },
+                ColumnStatistics {
+                    null_count: Precision::Inexact(0),
+                    max_value: Precision::Inexact(ScalarValue::Int32(Some(12))),
+                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+                    distinct_count: Precision::Absent,
+                },
+                ColumnStatistics {
+                    null_count: Precision::Inexact(0),
+                    max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))),
+                    min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))),
+                    distinct_count: Precision::Absent,
+                },
+            ],
+        })
+    );
+
+    // Test plan statistics with filtering
+    let ctx = SessionContext::new();
+    let scan = table_provider
+        .scan(
+            &ctx.state(),
+            Some(&vec![1]),
+            &[col("number").gt(lit(4))],
+            None,
+        )
+        .await
+        .unwrap();
+
+    let plan_stats = scan.statistics().unwrap();
+
+    // The row count estimate and the column's min value change in response
+    // to the filter
+    assert_eq!(plan_stats, Statistics {
+        num_rows: Precision::Inexact(8),
+        total_byte_size: Precision::Absent,
+        column_statistics: vec![ColumnStatistics {
+            null_count: Precision::Inexact(0),
+            max_value: Precision::Inexact(ScalarValue::Int32(Some(12))),
+            min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
+            distinct_count: Precision::Absent,
+        },],
+    });
+
+    Ok(())
+}
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index ccb9ca175..182fa2f7a 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -37,6 +37,7 @@ async-trait = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
 iceberg = { workspace = true }
+log = { workspace = true }
 tokio = { workspace = true }

 [dev-dependencies]
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index b7b927fdd..d8765de47 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -23,6 +23,9 @@ pub use error::*;

 mod physical_plan;
 mod schema;
+mod statistics;
 mod table;
+
+pub use statistics::*;
 pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
diff --git a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
index 03fb132f2..0c13c2562 100644
--- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
+++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
@@ -20,7 +20,7 @@ use std::vec;
 use datafusion::logical_expr::{Expr, Operator};
 use datafusion::scalar::ScalarValue;
 use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression};
-use iceberg::spec::Datum;
+use iceberg::spec::{Datum, PrimitiveLiteral, PrimitiveType};

 // A datafusion expression could be an Iceberg predicate, column, or literal.
 enum TransformedResult {
@@ -196,20 +196,44 @@ const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;

 /// Convert a scalar value to an iceberg datum.
 fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
     match value {
+        ScalarValue::Boolean(Some(v)) => Some(Datum::bool(*v)),
         ScalarValue::Int8(Some(v)) => Some(Datum::int(*v as i32)),
         ScalarValue::Int16(Some(v)) => Some(Datum::int(*v as i32)),
         ScalarValue::Int32(Some(v)) => Some(Datum::int(*v)),
         ScalarValue::Int64(Some(v)) => Some(Datum::long(*v)),
-        ScalarValue::Float32(Some(v)) => Some(Datum::double(*v as f64)),
+        ScalarValue::Float32(Some(v)) => Some(Datum::float(*v)),
         ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
-        ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
-        ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
+        ScalarValue::Utf8(Some(v))
+        | ScalarValue::Utf8View(Some(v))
+        | ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
         ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
         ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
         _ => None,
     }
 }

+/// Convert an iceberg datum to a datafusion scalar value.
+pub fn datum_to_scalar_value(datum: &Datum) -> Option<ScalarValue> {
+    match (datum.data_type(), datum.literal()) {
+        (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(v)) => {
+            Some(ScalarValue::Boolean(Some(*v)))
+        }
+        (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Int32(Some(*v))),
+        (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(ScalarValue::Int64(Some(*v))),
+        (PrimitiveType::Float, PrimitiveLiteral::Float(v)) => {
+            Some(ScalarValue::Float32(Some(v.into_inner())))
+        }
+        (PrimitiveType::Double, PrimitiveLiteral::Double(v)) => {
+            Some(ScalarValue::Float64(Some(v.into_inner())))
+        }
+        (PrimitiveType::String, PrimitiveLiteral::String(v)) => {
+            Some(ScalarValue::Utf8View(Some(v.clone())))
+        }
+        (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Date32(Some(*v))),
+        _ => None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index f33437eec..6a88c8b62 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -22,9 +22,11 @@ use std::vec;

 use datafusion::arrow::array::RecordBatch;
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::common::{Statistics, ToDFSchema};
 use datafusion::error::Result as DFResult;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
-use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties};
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
@@ -32,9 +34,10 @@ use datafusion::prelude::Expr;
 use futures::{Stream, TryStreamExt};
 use iceberg::expr::Predicate;
 use iceberg::table::Table;
+use log::warn;

 use super::expr_to_predicate::convert_filters_to_predicate;
-use crate::to_datafusion_error;
+use crate::{apply_bounds, to_datafusion_error};

 /// Manages the scanning process of an Iceberg [`Table`], encapsulating the
 /// necessary details and computed properties required for execution planning.
@@ -44,6 +47,9 @@ pub(crate) struct IcebergTableScan {
     table: Table,
     /// Snapshot of the table to scan.
     snapshot_id: Option<i64>,
+    /// Statistics for the scan; row count and null count/min-max values per column.
+    /// If not present, defaults to empty (absent) statistics.
+    statistics: Statistics,
     /// Stores certain, often expensive to compute,
     /// plan properties used in query optimization.
     plan_properties: PlanProperties,
@@ -59,6 +65,7 @@ impl IcebergTableScan {
         table: Table,
         snapshot_id: Option<i64>,
         schema: ArrowSchemaRef,
+        statistics: Option<Statistics>,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
     ) -> Self {
@@ -66,6 +73,26 @@ impl IcebergTableScan {
             None => schema.clone(),
             Some(projection) => Arc::new(schema.project(projection).unwrap()),
         };
+
+        let statistics = statistics
+            .map(|stats| {
+                let stats = match projection {
+                    None => stats,
+                    Some(projection) => stats.project(Some(projection)),
+                };
+                Self::bound_statistics(stats.clone(), filters, output_schema.clone())
+            })
+            .transpose()
+            .inspect_err(|err| {
+                warn!(
+                    "Failed to bound input statistics, defaulting to none: {:?}",
+                    err
+                )
+            })
+            .ok()
+            .flatten()
+            .unwrap_or(Statistics::new_unknown(output_schema.as_ref()));
+
         let plan_properties = Self::compute_properties(output_schema.clone());
         let projection = get_column_names(schema.clone(), projection);
         let predicates = convert_filters_to_predicate(filters);
@@ -73,6 +100,7 @@ impl IcebergTableScan {
         Self {
             table,
             snapshot_id,
+            statistics,
             plan_properties,
             projection,
             predicates,
@@ -91,6 +119,23 @@ impl IcebergTableScan {
             Boundedness::Bounded,
         )
     }
+
+    /// Estimate the effective bounded statistics corresponding to the provided filter expressions
+    fn bound_statistics(
+        input_stats: Statistics,
+        filters: &[Expr],
+        schema: ArrowSchemaRef,
+    ) -> DFResult<Statistics> {
+        Ok(if let Some(filters) = conjunction(filters.to_vec()) {
+            let schema = schema.clone();
+            let df_schema = schema.clone().to_dfschema()?;
+            let predicate = create_physical_expr(&filters, &df_schema, &Default::default())?;
+
+            apply_bounds(input_stats, &predicate, schema)?
+        } else {
+            input_stats
+        })
+    }
 }

 impl ExecutionPlan for IcebergTableScan {
@@ -135,6 +180,10 @@ impl ExecutionPlan for IcebergTableScan {
             stream,
         )))
     }
+
+    fn statistics(&self) -> DFResult<Statistics> {
+        Ok(self.statistics.clone())
+    }
 }

 impl DisplayAs for IcebergTableScan {
diff --git a/crates/integrations/datafusion/src/statistics.rs b/crates/integrations/datafusion/src/statistics.rs
new file mode 100644
index 000000000..0a7db6ff4
--- /dev/null
+++ b/crates/integrations/datafusion/src/statistics.rs
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::common::stats::Precision;
+use datafusion::common::{ColumnStatistics, Statistics};
+use datafusion::error::Result as DFResult;
+use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries, PhysicalExpr};
+use iceberg::spec::{DataContentType, ManifestStatus};
+use iceberg::table::Table;
+use iceberg::Result;
+
+use crate::physical_plan::expr_to_predicate::datum_to_scalar_value;
+
+/// Compute DataFusion table statistics for a given table/snapshot.
+pub async fn compute_statistics(table: &Table, snapshot_id: Option<i64>) -> Result<Statistics> {
+    let file_io = table.file_io();
+    let metadata = table.metadata();
+    let snapshot = table.snapshot(snapshot_id)?;
+
+    let mut num_rows = 0;
+    let mut lower_bounds = HashMap::new();
+    let mut upper_bounds = HashMap::new();
+    let mut null_counts = HashMap::new();
+
+    let manifest_list = snapshot.load_manifest_list(file_io, metadata).await?;
+
+    // For each existing/added manifest in the snapshot aggregate the row count, as well as null
+    // count and min/max values.
+    for manifest_file in manifest_list.entries() {
+        let manifest = manifest_file.load_manifest(file_io).await?;
+        manifest.entries().iter().for_each(|manifest_entry| {
+            // Gather stats only for non-deleted data files
+            if manifest_entry.status() != ManifestStatus::Deleted {
+                let data_file = manifest_entry.data_file();
+                if data_file.content_type() == DataContentType::Data {
+                    num_rows += data_file.record_count();
+                    data_file.lower_bounds().iter().for_each(|(col_id, min)| {
+                        lower_bounds
+                            .entry(*col_id)
+                            .and_modify(|col_min| {
+                                if min < col_min {
+                                    *col_min = min.clone()
+                                }
+                            })
+                            .or_insert(min.clone());
+                    });
+                    data_file.upper_bounds().iter().for_each(|(col_id, max)| {
+                        upper_bounds
+                            .entry(*col_id)
+                            .and_modify(|col_max| {
+                                if max > col_max {
+                                    *col_max = max.clone()
+                                }
+                            })
+                            .or_insert(max.clone());
+                    });
+                    data_file
+                        .null_value_counts()
+                        .iter()
+                        .for_each(|(col_id, null_count)| {
+                            null_counts
+                                .entry(*col_id)
+                                .and_modify(|col_null_count| *col_null_count += *null_count)
+                                .or_insert(*null_count);
+                        });
+                }
+            }
+        })
+    }
+
+    // Construct the DataFusion `Statistics` object, leaving any missing info as `Precision::Absent`
+    let schema = snapshot.schema(metadata)?;
+    let col_stats = schema
+        .as_struct()
+        .fields()
+        .iter()
+        .map(|field| {
+            ColumnStatistics {
+                null_count: null_counts
+                    .get(&field.id)
+                    .map(|nc| Precision::Inexact(*nc as usize))
+                    .unwrap_or(Precision::Absent),
+                max_value: upper_bounds
+                    .get(&field.id)
+                    .and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact))
+                    .unwrap_or(Precision::Absent),
+                min_value: lower_bounds
+                    .get(&field.id)
+                    .and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact))
+                    .unwrap_or(Precision::Absent),
+                distinct_count: Precision::Absent, // will be picked up after #417
+            }
+        })
+        .collect();
+
+    Ok(Statistics {
+        num_rows: Precision::Inexact(num_rows as usize),
+        total_byte_size: Precision::Absent,
+        column_statistics: col_stats,
+    })
+}
+
+/// Apply bounds to the provided input statistics.
+///
+/// Adapted from `FilterExec::statistics_helper` in DataFusion.
+pub fn apply_bounds(
+    input_stats: Statistics,
+    predicate: &Arc<dyn PhysicalExpr>,
+    schema: SchemaRef,
+) -> DFResult<Statistics> {
+    let num_rows = input_stats.num_rows;
+    let total_byte_size = input_stats.total_byte_size;
+    let input_analysis_ctx =
+        AnalysisContext::try_from_statistics(&schema, &input_stats.column_statistics)?;
+
+    let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
+
+    // Estimate (inexact) selectivity of predicate
+    let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
+    let num_rows = num_rows.with_estimated_selectivity(selectivity);
+    let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
+
+    let column_statistics = analysis_ctx
+        .boundaries
+        .into_iter()
+        .enumerate()
+        .map(
+            |(
+                idx,
+                ExprBoundaries {
+                    interval,
+                    distinct_count,
+                    ..
+                },
+            )| {
+                let (lower, upper) = interval.into_bounds();
+                let (min_value, max_value) = if lower.eq(&upper) {
+                    (Precision::Exact(lower), Precision::Exact(upper))
+                } else {
+                    (Precision::Inexact(lower), Precision::Inexact(upper))
+                };
+                ColumnStatistics {
+                    null_count: input_stats.column_statistics[idx].null_count.to_inexact(),
+                    max_value,
+                    min_value,
+                    distinct_count: distinct_count.to_inexact(),
+                }
+            },
+        )
+        .collect();
+    Ok(Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics,
+    })
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 00c9e1322..8d7336ccf 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -16,13 +16,13 @@
 // under the License.

 pub mod table_provider_factory;
-
 use std::any::Any;
 use std::sync::Arc;

 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use datafusion::catalog::Session;
+use datafusion::common::Statistics;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
@@ -31,6 +31,7 @@ use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::table::Table;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};

+use crate::compute_statistics;
 use crate::physical_plan::scan::IcebergTableScan;

 /// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
@@ -41,6 +42,9 @@ pub struct IcebergTableProvider {
     table: Table,
     /// Table snapshot id that will be queried via this provider.
     snapshot_id: Option<i64>,
+    /// Statistics for the table; row count and null count/min-max values per column.
+    /// If not present, defaults to `None`.
+    statistics: Option<Statistics>,
     /// A reference-counted arrow `Schema`.
     schema: ArrowSchemaRef,
 }

@@ -51,6 +55,7 @@ impl IcebergTableProvider {
             table,
             snapshot_id: None,
             schema,
+            statistics: None,
         }
     }
     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
@@ -70,6 +75,7 @@ impl IcebergTableProvider {
             table,
             snapshot_id: None,
             schema,
+            statistics: None,
         })
     }

@@ -81,6 +87,7 @@ impl IcebergTableProvider {
             table,
             snapshot_id: None,
             schema,
+            statistics: None,
         })
     }

@@ -105,8 +112,19 @@ impl IcebergTableProvider {
             table,
             snapshot_id: Some(snapshot_id),
             schema,
+            statistics: None,
         })
     }
+
+    /// Try to compute the underlying table statistics directly from the manifest/data files.
+    pub async fn with_computed_statistics(mut self) -> Self {
+        let statistics = compute_statistics(&self.table, self.snapshot_id)
+            .await
+            .inspect_err(|err| log::warn!("Failed computing table statistics: {err}"))
+            .ok();
+        self.statistics = statistics;
+        self
+    }
 }

 #[async_trait]
@@ -134,11 +152,16 @@ impl TableProvider for IcebergTableProvider {
             self.table.clone(),
             self.snapshot_id,
             self.schema.clone(),
+            self.statistics.clone(),
             projection,
             filters,
         )))
     }

+    fn statistics(&self) -> Option<Statistics> {
+        self.statistics.clone()
+    }
+
     fn supports_filters_pushdown(
         &self,
         filters: &[&Expr],
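
Taken together, the patch makes statistics an explicit opt-in: `with_computed_statistics` reads every manifest in the snapshot, so it is kept out of `try_new_from_table` and failures merely log a warning and leave `statistics` as `None`. A minimal usage sketch of the resulting flow, assuming an already-configured catalog; the catalog wiring and the table name `my_table` are illustrative, not part of this patch:

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg::{Catalog, TableIdent};
use iceberg_datafusion::IcebergTableProvider;

async fn query_with_stats(catalog: &dyn Catalog) -> Result<(), Box<dyn std::error::Error>> {
    // Load the table through whatever catalog is configured.
    let table = catalog
        .load_table(&TableIdent::from_strs(["default", "my_table"])?)
        .await?;

    // Opt in to statistics: row count and per-column min/max/null counts are
    // aggregated from the snapshot's manifests; on failure the provider simply
    // reports no statistics.
    let provider = IcebergTableProvider::try_new_from_table(table)
        .await?
        .with_computed_statistics()
        .await;

    let ctx = SessionContext::new();
    ctx.register_table("my_table", Arc::new(provider))?;

    // Scans planned from this query now expose (inexact) statistics to the
    // optimizer, bounded further by any pushed-down filters.
    let batches = ctx.sql("SELECT * FROM my_table").await?.collect().await?;
    println!("fetched {} batches", batches.len());
    Ok(())
}
```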