
Prune scanned files on column stats (#724)
* feat: handle partition values in table scan

* test: add partitioned table query test

* feat: prune files to scan based on expressions

* cleanup file partitioning

* test: cleanup tests

* perf: avoid unnecessarily iterating files

* fix: allow for predicates not relevant for file pruning
roeap authored Aug 8, 2022
1 parent 3873b0b commit f9816b0
Showing 7 changed files with 386 additions and 67 deletions.
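
With this change, a filter in a DataFusion query over a Delta table is pushed down into DeltaTable::scan and used to skip files whose column statistics or partition values cannot satisfy the predicate. A minimal usage sketch follows; the table path and column names are hypothetical, and the crate's DataFusion integration feature is assumed to be enabled:

use std::sync::Arc;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open an existing Delta table; the path is illustrative only.
    let table = deltalake::open_table("./data/events").await?;
    let ctx = SessionContext::new();
    ctx.register_table("events", Arc::new(table))?;
    // The WHERE clause drives the new pruning: `date` is checked against
    // partition values, `value` against per-file min/max statistics.
    let batches = ctx
        .sql("SELECT id, value FROM events WHERE date = '2022-08-01' AND value > 10")
        .await?
        .collect()
        .await?;
    println!("read {} record batches", batches.len());
    Ok(())
}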
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion rust/Cargo.toml
@@ -20,7 +20,7 @@ futures = "0.3"
bytes = "1"
log = "0"
regex = "1"
chrono = "0"
chrono = "0.4.20"
uuid = { version = "1", features = ["serde", "v4"] }
lazy_static = "1"
percent-encoding = "2"
7 changes: 2 additions & 5 deletions rust/src/checkpoints.rs
@@ -3,10 +3,7 @@
use arrow::datatypes::Schema as ArrowSchema;
use arrow::error::ArrowError;
use arrow::json::reader::{Decoder, DecoderOptions};
use chrono::Datelike;
use chrono::Duration;
use chrono::Utc;
use chrono::MIN_DATETIME;
use chrono::{DateTime, Datelike, Duration, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
use log::*;
@@ -229,7 +226,7 @@ async fn cleanup_expired_logs_for(
0,
ObjectMeta {
location: Path::from(""),
last_modified: MIN_DATETIME,
last_modified: DateTime::<Utc>::MIN_UTC,
size: 0,
},
);
241 changes: 187 additions & 54 deletions rust/src/delta_datafusion.rs
@@ -21,27 +21,55 @@
//! ```
use std::any::Any;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
use async_trait::async_trait;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DataFusionResult;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::SessionState;
use datafusion::logical_plan::Expr;
use datafusion::logical_plan::{combine_filters, Column, Expr};
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{ColumnStatistics, Statistics};
use datafusion::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
use datafusion::scalar::ScalarValue;
use object_store::{path::Path, ObjectMeta};
use url::Url;

use crate::action;
use crate::delta;
use crate::schema;
use crate::DeltaTableError;

impl From<DeltaTableError> for DataFusionError {
fn from(err: DeltaTableError) -> Self {
match err {
DeltaTableError::ArrowError { source } => DataFusionError::ArrowError(source),
DeltaTableError::Io { source } => DataFusionError::IoError(source),
DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source),
DeltaTableError::ParquetError { source } => DataFusionError::ParquetError(source),
_ => DataFusionError::External(Box::new(err)),
}
}
}

impl From<DataFusionError> for crate::DeltaTableError {
fn from(err: DataFusionError) -> Self {
match err {
DataFusionError::ArrowError(source) => DeltaTableError::ArrowError { source },
DataFusionError::IoError(source) => DeltaTableError::Io { source },
DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source },
DataFusionError::ParquetError(source) => DeltaTableError::ParquetError { source },
_ => DeltaTableError::Generic(err.to_string()),
}
}
}

impl delta::DeltaTable {
/// Return statistics for Datafusion Table
@@ -193,39 +221,86 @@ impl delta::DeltaTable {
}
}

// TODO: uncomment this when datafusion supports per partitioned file stats
// fn add_action_df_stats(add: &action::Add, schema: &schema::Schema) -> Statistics {
// if let Ok(Some(statistics)) = add.get_stats() {
// Statistics {
// num_rows: Some(statistics.num_records as usize),
// total_byte_size: Some(add.size as usize),
// column_statistics: Some(
// schema
// .get_fields()
// .iter()
// .map(|field| ColumnStatistics {
// null_count: statistics
// .null_count
// .get(field.get_name())
// .and_then(|f| f.as_value().map(|v| v as usize)),
// max_value: statistics
// .max_values
// .get(field.get_name())
// .and_then(|f| to_scalar_value(f.as_value()?)),
// min_value: statistics
// .min_values
// .get(field.get_name())
// .and_then(|f| to_scalar_value(f.as_value()?)),
// distinct_count: None, // TODO: distinct
// })
// .collect(),
// ),
// is_exact: true,
// }
// } else {
// Statistics::default()
// }
// }
impl PruningStatistics for delta::DeltaTable {
/// return the minimum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let field = self
.get_schema()
.ok()
.map(|s| s.get_field_with_name(&column.name).ok())??;
let data_type = field.get_type().try_into().ok()?;
let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
statistics
.min_values
.get(&column.name)
.and_then(|f| {
correct_scalar_value_type(
to_scalar_value(f.as_value()?).unwrap_or(ScalarValue::Null),
&data_type,
)
})
.unwrap_or(ScalarValue::Null)
} else {
ScalarValue::Null
}
});
ScalarValue::iter_to_array(values).ok()
}

/// return the maximum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows.
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let field = self
.get_schema()
.ok()
.map(|s| s.get_field_with_name(&column.name).ok())??;
let data_type = field.get_type().try_into().ok()?;
let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
statistics
.max_values
.get(&column.name)
.and_then(|f| {
correct_scalar_value_type(
to_scalar_value(f.as_value()?).unwrap_or(ScalarValue::Null),
&data_type,
)
})
.unwrap_or(ScalarValue::Null)
} else {
ScalarValue::Null
}
});
ScalarValue::iter_to_array(values).ok()
}

/// return the number of containers (e.g. row groups) being
/// pruned with these statistics
fn num_containers(&self) -> usize {
self.get_state().files().len()
}

/// return the number of null values for the named column as an
/// `Option<UInt64Array>`.
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
statistics
.null_count
.get(&column.name)
.map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64)))
.unwrap_or(ScalarValue::UInt64(None))
} else {
ScalarValue::UInt64(None)
}
});
ScalarValue::iter_to_array(values).ok()
}
}

#[async_trait]
impl TableProvider for delta::DeltaTable {
@@ -263,29 +338,55 @@ impl TableProvider for delta::DeltaTable {
self.object_store(),
);

// TODO prune files based on file statistics and filter expressions
let partitions = self
.get_state()
.files()
.iter()
.map(|action| {
Ok(vec![PartitionedFile::new(
action.path.clone(),
action.size as u64,
)])
})
.collect::<DataFusionResult<_>>()?;
// TODO we group files together by their partition values. If the table is partitioned
// and partitions are somewhat evenly distributed, probably not the worst choice ...
// However we may want to do some additional balancing in case we are far off from the above.
let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
if let Some(Some(predicate)) = (!filters.is_empty()).then_some(combine_filters(filters)) {
let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
let files_to_prune = pruning_predicate.prune(self)?;
self.get_state()
.files()
.iter()
.zip(files_to_prune.into_iter())
.for_each(|(action, prune_file)| {
if !prune_file {
let part = partitioned_file_from_action(action, &schema);
file_groups
.entry(part.partition_values.clone())
.or_default()
.push(part);
};
});
} else {
self.get_state().files().iter().for_each(|action| {
let part = partitioned_file_from_action(action, &schema);
file_groups
.entry(part.partition_values.clone())
.or_default()
.push(part);
});
};

let table_partition_cols = self.get_metadata()?.partition_columns.clone();
let file_schema = Arc::new(ArrowSchema::new(
schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect(),
));
ParquetFormat::default()
.create_physical_plan(
FileScanConfig {
object_store_url,
file_schema: schema,
file_groups: partitions,
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: self.datafusion_table_statistics(),
projection: projection.clone(),
limit,
table_partition_cols: self.get_metadata().unwrap().partition_columns.clone(),
table_partition_cols,
},
filters,
)
@@ -297,6 +398,38 @@ impl TableProvider for delta::DeltaTable {
}
}

fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> PartitionedFile {
let partition_values = schema
.fields()
.iter()
.filter_map(|f| {
action.partition_values.get(f.name()).map(|val| match val {
Some(value) => {
match to_scalar_value(&serde_json::Value::String(value.to_string())) {
Some(parsed) => correct_scalar_value_type(parsed, f.data_type())
.unwrap_or(ScalarValue::Null),
None => ScalarValue::Null,
}
}
None => ScalarValue::Null,
})
})
.collect::<Vec<_>>();
let ts_secs = action.modification_time / 1000;
let ts_ns = (action.modification_time % 1000) * 1_000_000;
let last_modified =
DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(ts_secs, ts_ns as u32), Utc);
PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(action.path.clone()),
last_modified,
size: action.size as usize,
},
partition_values,
range: None,
}
}

fn to_scalar_value(stat_val: &serde_json::Value) -> Option<datafusion::scalar::ScalarValue> {
match stat_val {
serde_json::Value::Bool(val) => Some(ScalarValue::from(*val)),
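
The heart of this file's change is that DeltaTable now implements DataFusion's PruningStatistics, exposing per-file min/max/null-count arrays parsed from each Add action's stats, so a pushed-down filter can be compiled into a PruningPredicate and evaluated before any Parquet file is opened. Below is a standalone sketch of that flow outside of scan; the column name and threshold are hypothetical, and errors are folded into DataFusion's Result:

use datafusion::datasource::TableProvider;
use datafusion::error::Result as DataFusionResult;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::prelude::{col, lit};

fn candidate_files(table: &deltalake::DeltaTable) -> DataFusionResult<Vec<String>> {
    // Arrow schema of the table, as exposed through the TableProvider impl.
    let schema = TableProvider::schema(table);
    // A filter DataFusion would normally hand to `scan`, e.g. `value > 10`.
    let predicate = col("value").gt(lit(10i64));
    let pruning_predicate = PruningPredicate::try_new(predicate, schema)?;
    // One boolean per Add action, computed from the min/max/null-count arrays
    // returned by the PruningStatistics implementation above.
    let mask = pruning_predicate.prune(table)?;
    Ok(table
        .get_state()
        .files()
        .iter()
        .zip(mask)
        .filter_map(|(add, keep)| keep.then(|| add.path.clone()))
        .collect())
}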
12 changes: 12 additions & 0 deletions rust/src/operations/mod.rs
@@ -231,6 +231,18 @@ fn get_table_from_uri_without_update(table_uri: String) -> DeltaCommandResult<De
Ok(table)
}

impl From<DeltaTable> for DeltaCommands {
fn from(table: DeltaTable) -> Self {
Self { table }
}
}

impl From<DeltaCommands> for DeltaTable {
fn from(comm: DeltaCommands) -> Self {
comm.table
}
}

#[cfg(test)]
mod tests {
use super::*;
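
The two From conversions added above let callers hop between a loaded DeltaTable and the DeltaCommands handle without a dedicated constructor. A small sketch, assuming the operations module is exported at the crate root (it may sit behind a crate feature):

use deltalake::{operations::DeltaCommands, DeltaTable};

fn roundtrip(table: DeltaTable) -> DeltaTable {
    // DeltaTable -> DeltaCommands: take the command handle over the table ...
    let commands: DeltaCommands = table.into();
    // ... and DeltaCommands -> DeltaTable: hand the table back when done.
    DeltaTable::from(commands)
}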
19 changes: 19 additions & 0 deletions rust/src/schema.rs
@@ -43,6 +43,25 @@ impl SchemaTypeStruct {
pub fn get_fields(&self) -> &Vec<SchemaField> {
&self.fields
}

/// Returns an immutable reference of a specific `Field` instance selected by name.
pub fn get_field_with_name(&self, name: &str) -> Result<&SchemaField, crate::DeltaTableError> {
Ok(&self.fields[self.index_of(name)?])
}

/// Find the index of the column with the given name.
pub fn index_of(&self, name: &str) -> Result<usize, crate::DeltaTableError> {
for i in 0..self.fields.len() {
if self.fields[i].get_name() == name {
return Ok(i);
}
}
let valid_fields: Vec<String> = self.fields.iter().map(|f| f.name.clone()).collect();
Err(crate::DeltaTableError::Generic(format!(
"Unable to get field named \"{}\". Valid fields: {:?}",
name, valid_fields
)))
}
}

/// Describes a specific field of the Delta table schema.
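
A quick illustration of the two new schema lookups; the column name is hypothetical, and index_of returns a Generic error listing the valid fields when the name is unknown:

fn describe_column(schema: &deltalake::Schema) -> Result<(), deltalake::DeltaTableError> {
    // Position of the column within the struct schema.
    let idx = schema.index_of("value")?;
    // Immutable reference to the same field, looked up by name.
    let field = schema.get_field_with_name("value")?;
    println!("column {} is field #{}", field.get_name(), idx);
    Ok(())
}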
(diff for the seventh changed file not rendered)
