From f9816b08064a1adf297f86d1c694f6daefb20968 Mon Sep 17 00:00:00 2001
From: Robert Pack <42610831+roeap@users.noreply.github.com>
Date: Mon, 8 Aug 2022 18:25:56 +0200
Subject: [PATCH] Prune scanned files on column stats (#724)

* feat: handle partition values in table scan
* test: add partitioned table query test
* feat: prune files to scan based on expressions
* cleanup file partitioning
* test: cleanup tests
* perf: avoid unnecessarily iterating files
* fix: allow for predicates not relevant for file pruning
---
 Cargo.lock                    |   7 +-
 rust/Cargo.toml               |   2 +-
 rust/src/checkpoints.rs       |   7 +-
 rust/src/delta_datafusion.rs  | 241 ++++++++++++++++++++++++++--------
 rust/src/operations/mod.rs    |  12 ++
 rust/src/schema.rs            |  19 +++
 rust/tests/datafusion_test.rs | 165 ++++++++++++++++++++++-
 7 files changed, 386 insertions(+), 67 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 294747ad96..d378e36b1e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -394,15 +394,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"

 [[package]]
 name = "chrono"
-version = "0.4.19"
+version = "0.4.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
+checksum = "6127248204b9aba09a362f6c930ef6a78f2c1b2215f8a7b398c06e1083f17af0"
 dependencies = [
- "libc",
+ "js-sys",
  "num-integer",
  "num-traits",
  "serde",
  "time",
+ "wasm-bindgen",
  "winapi",
 ]

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index ac4aef3a23..e7f905fe83 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -20,7 +20,7 @@ futures = "0.3"
 bytes = "1"
 log = "0"
 regex = "1"
-chrono = "0"
+chrono = "0.4.20"
 uuid = { version = "1", features = ["serde", "v4"] }
 lazy_static = "1"
 percent-encoding = "2"
diff --git a/rust/src/checkpoints.rs b/rust/src/checkpoints.rs
index 9d293b40a9..67f3dfb827 100644
--- a/rust/src/checkpoints.rs
+++ b/rust/src/checkpoints.rs
@@ -3,10 +3,7 @@
 use arrow::datatypes::Schema as ArrowSchema;
 use arrow::error::ArrowError;
 use arrow::json::reader::{Decoder, DecoderOptions};
-use chrono::Datelike;
-use chrono::Duration;
-use chrono::Utc;
-use chrono::MIN_DATETIME;
+use chrono::{DateTime, Datelike, Duration, Utc};
 use futures::StreamExt;
 use lazy_static::lazy_static;
 use log::*;
@@ -229,7 +226,7 @@ async fn cleanup_expired_logs_for(
         0,
         ObjectMeta {
             location: Path::from(""),
-            last_modified: MIN_DATETIME,
+            last_modified: DateTime::<Utc>::MIN_UTC,
             size: 0,
         },
     );
diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs
index 9aa91a82f7..2d5d58a31f 100644
--- a/rust/src/delta_datafusion.rs
+++ b/rust/src/delta_datafusion.rs
@@ -21,27 +21,55 @@
 //! ```

 use std::any::Any;
+use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::sync::Arc;

+use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
 use async_trait::async_trait;
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
+use chrono::{DateTime, NaiveDateTime, Utc};
+use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::{TableProvider, TableType};
-use datafusion::error::Result as DataFusionResult;
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::context::SessionState;
-use datafusion::logical_plan::Expr;
+use datafusion::logical_plan::{combine_filters, Column, Expr};
+use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion::physical_plan::file_format::FileScanConfig;
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::physical_plan::{ColumnStatistics, Statistics};
+use datafusion::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
 use datafusion::scalar::ScalarValue;
+use object_store::{path::Path, ObjectMeta};
 use url::Url;

 use crate::action;
 use crate::delta;
 use crate::schema;
+use crate::DeltaTableError;
+
+impl From<DeltaTableError> for DataFusionError {
+    fn from(err: DeltaTableError) -> Self {
+        match err {
+            DeltaTableError::ArrowError { source } => DataFusionError::ArrowError(source),
+            DeltaTableError::Io { source } => DataFusionError::IoError(source),
+            DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source),
+            DeltaTableError::ParquetError { source } => DataFusionError::ParquetError(source),
+            _ => DataFusionError::External(Box::new(err)),
+        }
+    }
+}
+
+impl From<DataFusionError> for crate::DeltaTableError {
+    fn from(err: DataFusionError) -> Self {
+        match err {
+            DataFusionError::ArrowError(source) => DeltaTableError::ArrowError { source },
+            DataFusionError::IoError(source) => DeltaTableError::Io { source },
+            DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source },
+            DataFusionError::ParquetError(source) => DeltaTableError::ParquetError { source },
+            _ => DeltaTableError::Generic(err.to_string()),
+        }
+    }
+}

 impl delta::DeltaTable {
     /// Return statistics for Datafusion Table
@@ -193,39 +221,86 @@ impl delta::DeltaTable {
     }
 }

-// TODO: uncomment this when datafusion supports per partitioned file stats
-// fn add_action_df_stats(add: &action::Add, schema: &schema::Schema) -> Statistics {
-//     if let Ok(Some(statistics)) = add.get_stats() {
-//         Statistics {
-//             num_rows: Some(statistics.num_records as usize),
-//             total_byte_size: Some(add.size as usize),
-//             column_statistics: Some(
-//                 schema
-//                     .get_fields()
-//                     .iter()
-//                     .map(|field| ColumnStatistics {
-//                         null_count: statistics
-//                             .null_count
-//                             .get(field.get_name())
-//                             .and_then(|f| f.as_value().map(|v| v as usize)),
-//                         max_value: statistics
-//                             .max_values
-//                             .get(field.get_name())
-//                             .and_then(|f| to_scalar_value(f.as_value()?)),
-//                         min_value: statistics
-//                             .min_values
-//                             .get(field.get_name())
-//                             .and_then(|f| to_scalar_value(f.as_value()?)),
-//                         distinct_count: None, // TODO: distinct
-//                     })
-//                     .collect(),
-//             ),
-//             is_exact: true,
-//         }
-//     } else {
-//         Statistics::default()
-//     }
-// }
+impl PruningStatistics for delta::DeltaTable {
+    /// return the minimum values for the named column, if known.
+    /// Note: the returned array must contain `num_containers()` rows.
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let field = self
+            .get_schema()
+            .ok()
+            .map(|s| s.get_field_with_name(&column.name).ok())??;
+        let data_type = field.get_type().try_into().ok()?;
+        let values = self.get_state().files().iter().map(|add| {
+            if let Ok(Some(statistics)) = add.get_stats() {
+                statistics
+                    .min_values
+                    .get(&column.name)
+                    .and_then(|f| {
+                        correct_scalar_value_type(
+                            to_scalar_value(f.as_value()?).unwrap_or(ScalarValue::Null),
+                            &data_type,
+                        )
+                    })
+                    .unwrap_or(ScalarValue::Null)
+            } else {
+                ScalarValue::Null
+            }
+        });
+        ScalarValue::iter_to_array(values).ok()
+    }
+
+    /// return the maximum values for the named column, if known.
+    /// Note: the returned array must contain `num_containers()` rows.
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        let field = self
+            .get_schema()
+            .ok()
+            .map(|s| s.get_field_with_name(&column.name).ok())??;
+        let data_type = field.get_type().try_into().ok()?;
+        let values = self.get_state().files().iter().map(|add| {
+            if let Ok(Some(statistics)) = add.get_stats() {
+                statistics
+                    .max_values
+                    .get(&column.name)
+                    .and_then(|f| {
+                        correct_scalar_value_type(
+                            to_scalar_value(f.as_value()?).unwrap_or(ScalarValue::Null),
+                            &data_type,
+                        )
+                    })
+                    .unwrap_or(ScalarValue::Null)
+            } else {
+                ScalarValue::Null
+            }
+        });
+        ScalarValue::iter_to_array(values).ok()
+    }
+
+    /// return the number of containers (e.g. row groups) being
+    /// pruned with these statistics
+    fn num_containers(&self) -> usize {
+        self.get_state().files().len()
+    }
+
+    /// return the number of null values for the named column as an
+    /// `Option<UInt64Array>`.
+    ///
+    /// Note: the returned array must contain `num_containers()` rows.
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let values = self.get_state().files().iter().map(|add| {
+            if let Ok(Some(statistics)) = add.get_stats() {
+                statistics
+                    .null_count
+                    .get(&column.name)
+                    .map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64)))
+                    .unwrap_or(ScalarValue::UInt64(None))
+            } else {
+                ScalarValue::UInt64(None)
+            }
+        });
+        ScalarValue::iter_to_array(values).ok()
+    }
+}

 #[async_trait]
 impl TableProvider for delta::DeltaTable {
@@ -263,29 +338,55 @@ impl TableProvider for delta::DeltaTable {
             self.object_store(),
         );

-        // TODO prune files based on file statistics and filter expressions
-        let partitions = self
-            .get_state()
-            .files()
-            .iter()
-            .map(|action| {
-                Ok(vec![PartitionedFile::new(
-                    action.path.clone(),
-                    action.size as u64,
-                )])
-            })
-            .collect::<DataFusionResult<_>>()?;
+        // TODO we group files together by their partition values. If the table is partitioned
+        // and partitions are somewhat evenly distributed, probably not the worst choice ...
+        // However we may want to do some additional balancing in case we are far off from the above.
+        let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
+        if let Some(Some(predicate)) = (!filters.is_empty()).then_some(combine_filters(filters)) {
+            let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
+            let files_to_prune = pruning_predicate.prune(self)?;
+            self.get_state()
+                .files()
+                .iter()
+                .zip(files_to_prune.into_iter())
+                .for_each(|(action, prune_file)| {
+                    if !prune_file {
+                        let part = partitioned_file_from_action(action, &schema);
+                        file_groups
+                            .entry(part.partition_values.clone())
+                            .or_default()
+                            .push(part);
+                    };
+                });
+        } else {
+            self.get_state().files().iter().for_each(|action| {
+                let part = partitioned_file_from_action(action, &schema);
+                file_groups
+                    .entry(part.partition_values.clone())
+                    .or_default()
+                    .push(part);
+            });
+        };
+
+        let table_partition_cols = self.get_metadata()?.partition_columns.clone();
+        let file_schema = Arc::new(ArrowSchema::new(
+            schema
+                .fields()
+                .iter()
+                .filter(|f| !table_partition_cols.contains(f.name()))
+                .cloned()
+                .collect(),
+        ));
         ParquetFormat::default()
             .create_physical_plan(
                 FileScanConfig {
                     object_store_url,
-                    file_schema: schema,
-                    file_groups: partitions,
+                    file_schema,
+                    file_groups: file_groups.into_values().collect(),
                     statistics: self.datafusion_table_statistics(),
                     projection: projection.clone(),
                     limit,
-                    table_partition_cols: self.get_metadata().unwrap().partition_columns.clone(),
+                    table_partition_cols,
                 },
                 filters,
             )
@@ -297,6 +398,38 @@ impl TableProvider for delta::DeltaTable {
     }
 }

+fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> PartitionedFile {
+    let partition_values = schema
+        .fields()
+        .iter()
+        .filter_map(|f| {
+            action.partition_values.get(f.name()).map(|val| match val {
+                Some(value) => {
+                    match to_scalar_value(&serde_json::Value::String(value.to_string())) {
+                        Some(parsed) => correct_scalar_value_type(parsed, f.data_type())
+                            .unwrap_or(ScalarValue::Null),
+                        None => ScalarValue::Null,
+                    }
+                }
+                None => ScalarValue::Null,
+            })
+        })
+        .collect::<Vec<ScalarValue>>();
+    let ts_secs = action.modification_time / 1000;
+    let ts_ns = (action.modification_time % 1000) * 1_000_000;
+    let last_modified =
+        DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(ts_secs, ts_ns as u32), Utc);
+    PartitionedFile {
+        object_meta: ObjectMeta {
+            location: Path::from(action.path.clone()),
+            last_modified,
+            size: action.size as usize,
+        },
+        partition_values,
+        range: None,
+    }
+}
+
 fn to_scalar_value(stat_val: &serde_json::Value) -> Option<ScalarValue> {
     match stat_val {
         serde_json::Value::Bool(val) => Some(ScalarValue::from(*val)),
diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index c6e9aa979e..8aaafba710 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -231,6 +231,18 @@ fn get_table_from_uri_without_update(table_uri: String) -> DeltaCommandResult<DeltaTable> {
 }

+impl From<DeltaTable> for DeltaCommands {
+    fn from(table: DeltaTable) -> Self {
+        Self { table }
+    }
+}
+
+impl From<DeltaCommands> for DeltaTable {
+    fn from(comm: DeltaCommands) -> Self {
+        comm.table
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/rust/src/schema.rs b/rust/src/schema.rs
index 8903be40ea..9bbb9b0807 100644
--- a/rust/src/schema.rs
+++ b/rust/src/schema.rs
@@ -43,6 +43,25 @@ impl SchemaTypeStruct {
     pub fn get_fields(&self) -> &Vec<SchemaField> {
         &self.fields
     }
+
+    /// Returns an immutable reference of a specific `Field` instance selected by name.
+    pub fn get_field_with_name(&self, name: &str) -> Result<&SchemaField, crate::DeltaTableError> {
+        Ok(&self.fields[self.index_of(name)?])
+    }
+
+    /// Find the index of the column with the given name.
+    pub fn index_of(&self, name: &str) -> Result<usize, crate::DeltaTableError> {
+        for i in 0..self.fields.len() {
+            if self.fields[i].get_name() == name {
+                return Ok(i);
+            }
+        }
+        let valid_fields: Vec<String> = self.fields.iter().map(|f| f.name.clone()).collect();
+        Err(crate::DeltaTableError::Generic(format!(
+            "Unable to get field named \"{}\". Valid fields: {:?}",
+            name, valid_fields
+        )))
+    }
 }

 /// Describes a specific field of the Delta table schema.
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index db3ca92e51..7f99f3fea1 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -4,12 +4,95 @@ mod s3_common;

 #[cfg(feature = "datafusion-ext")]
 mod datafusion {
-    use std::sync::Arc;
+    use std::{collections::HashSet, sync::Arc};

-    use arrow::array::*;
-    use datafusion::error::Result;
-    use datafusion::execution::context::SessionContext;
+    use arrow::{
+        array::*,
+        datatypes::{
+            DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
+            SchemaRef as ArrowSchemaRef,
+        },
+        record_batch::RecordBatch,
+    };
+    use datafusion::datasource::TableProvider;
+    use datafusion::error::{DataFusionError, Result};
+    use datafusion::execution::context::{SessionContext, TaskContext};
+    use datafusion::logical_expr::Expr;
+    use datafusion::logical_plan::Column;
+    use datafusion::physical_plan::{
+        coalesce_partitions::CoalescePartitionsExec, common, file_format::ParquetExec,
+        metrics::Label, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
+    };
     use datafusion::scalar::ScalarValue;
+    use deltalake::{action::SaveMode, operations::DeltaCommands, DeltaTable, DeltaTableMetaData};
+    use std::collections::HashMap;
+
+    fn get_scanned_files(node: &dyn ExecutionPlan) -> HashSet<Label>