feat: arrow-backed file statistics
roeap committed Jan 13, 2024
1 parent 2836a1e commit 5415073
Showing 40 changed files with 1,126 additions and 534 deletions.
1 change: 0 additions & 1 deletion crates/deltalake-aws/tests/integration_s3_dynamodb.rs
@@ -272,7 +272,6 @@ fn add_action(name: &str) -> Action {
         path: format!("{}.parquet", name),
         size: 396,
         partition_values: HashMap::new(),
-        partition_values_parsed: None,
         modification_time: ts as i64,
         data_change: true,
         stats: None,
144 changes: 8 additions & 136 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -51,11 +51,10 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::limit::LocalLimitExec;
 use datafusion::physical_plan::{
-    ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
-    SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+    Statistics,
 };
 use datafusion_common::scalar::ScalarValue;
-use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
 use datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema};
 use datafusion_expr::expr::ScalarFunction;
@@ -72,13 +71,11 @@ use futures::TryStreamExt;
 use itertools::Itertools;
 use object_store::ObjectMeta;
 use serde::{Deserialize, Serialize};
-use tracing::error;
 use url::Url;

 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Add, DataCheck, Invariant};
 use crate::logstore::LogStoreRef;
-use crate::protocol::{ColumnCountStat, ColumnValueStat};
 use crate::table::builder::ensure_table_uri;
 use crate::table::state::DeltaTableState;
 use crate::table::Constraint;
@@ -114,23 +111,6 @@ impl From<DataFusionError> for DeltaTableError {
     }
 }

-fn get_scalar_value(value: Option<&ColumnValueStat>, field: &Arc<Field>) -> Precision<ScalarValue> {
-    match value {
-        Some(ColumnValueStat::Value(value)) => to_correct_scalar_value(value, field.data_type())
-            .map(|maybe_scalar| maybe_scalar.map(Precision::Exact).unwrap_or_default())
-            .unwrap_or_else(|_| {
-                error!(
-                    "Unable to parse scalar value of {:?} with type {} for column {}",
-                    value,
-                    field.data_type(),
-                    field.name()
-                );
-                Precision::Absent
-            }),
-        _ => Precision::Absent,
-    }
-}
-
 pub(crate) fn get_path_column<'a>(
     batch: &'a RecordBatch,
     path_column: &str,
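The deleted helper above folded a raw column statistic into DataFusion's `Precision<ScalarValue>`, preferring a downgrade to `Precision::Absent` over a hard error when a value could not be parsed. A minimal sketch of that conversion pattern, with `parse_stat` as a hypothetical stand-in for `to_correct_scalar_value` and only `Int64` handled for brevity (assumes the arrow-schema, datafusion-common, and serde_json crates):

```rust
use arrow_schema::DataType;
use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;
use serde_json::Value;

// Hypothetical stand-in for to_correct_scalar_value: parse a raw JSON stat
// into an exact scalar, falling back to Absent rather than failing the scan.
fn parse_stat(value: Option<&Value>, data_type: &DataType) -> Precision<ScalarValue> {
    match (value, data_type) {
        (Some(Value::Number(n)), DataType::Int64) => n
            .as_i64()
            .map(|v| Precision::Exact(ScalarValue::Int64(Some(v))))
            .unwrap_or(Precision::Absent),
        // Missing or unsupported values downgrade instead of erroring.
        _ => Precision::Absent,
    }
}

fn main() {
    let raw = serde_json::json!(42);
    assert_eq!(
        parse_stat(Some(&raw), &DataType::Int64),
        Precision::Exact(ScalarValue::Int64(Some(42)))
    );
    assert_eq!(parse_stat(None, &DataType::Int64), Precision::Absent);
}
```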
@@ -148,109 +128,8 @@ pub(crate) fn get_path_column<'a>(

 impl DeltaTableState {
     /// Provide table level statistics to Datafusion
-    pub fn datafusion_table_statistics(&self) -> DataFusionResult<Statistics> {
-        // Statistics only support primitive types. Any non-primitive column will not have its statistics captured.
-        // If column statistics are missing for any add action then we simply downgrade to Absent.
-
-        let schema = self.arrow_schema()?;
-        // Downgrade statistics to absent if file metadata is not present.
-        let mut downgrade = false;
-        let unknown_stats = Statistics::new_unknown(&schema);
-
-        let files = self.files()?;
-
-        // Initialize statistics
-        let mut table_stats = match files.first() {
-            Some(file) => match file.get_stats() {
-                Ok(Some(stats)) => {
-                    let mut column_statistics = Vec::with_capacity(schema.fields().size());
-                    let total_byte_size = Precision::Exact(file.size as usize);
-                    let num_rows = Precision::Exact(stats.num_records as usize);
-
-                    for field in schema.fields() {
-                        let null_count = match stats.null_count.get(field.name()) {
-                            Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize),
-                            _ => Precision::Absent,
-                        };
-
-                        let max_value = get_scalar_value(stats.max_values.get(field.name()), field);
-                        let min_value = get_scalar_value(stats.min_values.get(field.name()), field);
-
-                        column_statistics.push(ColumnStatistics {
-                            null_count,
-                            max_value,
-                            min_value,
-                            distinct_count: Precision::Absent,
-                        });
-                    }
-
-                    Statistics {
-                        total_byte_size,
-                        num_rows,
-                        column_statistics,
-                    }
-                }
-                Ok(None) => {
-                    downgrade = true;
-                    let mut stats = unknown_stats.clone();
-                    stats.total_byte_size = Precision::Exact(file.size as usize);
-                    stats
-                }
-                _ => return Ok(unknown_stats),
-            },
-            None => {
-                // The table is empty
-                let mut stats = unknown_stats;
-                stats.num_rows = Precision::Exact(0);
-                stats.total_byte_size = Precision::Exact(0);
-                return Ok(stats);
-            }
-        };
-
-        // Populate the remaining statistics. If file statistics are not present then the relevant statistics are downgraded to absent.
-        for file in &files.as_slice()[1..] {
-            let byte_size = Precision::Exact(file.size as usize);
-            table_stats.total_byte_size = table_stats.total_byte_size.add(&byte_size);
-
-            if !downgrade {
-                match file.get_stats() {
-                    Ok(Some(stats)) => {
-                        let num_records = Precision::Exact(stats.num_records as usize);
-
-                        table_stats.num_rows = table_stats.num_rows.add(&num_records);
-
-                        for (idx, field) in schema.fields().iter().enumerate() {
-                            let column_stats = table_stats.column_statistics.get_mut(idx).unwrap();
-
-                            let null_count = match stats.null_count.get(field.name()) {
-                                Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize),
-                                _ => Precision::Absent,
-                            };
-
-                            let max_value =
-                                get_scalar_value(stats.max_values.get(field.name()), field);
-                            let min_value =
-                                get_scalar_value(stats.min_values.get(field.name()), field);
-
-                            column_stats.null_count = column_stats.null_count.add(&null_count);
-                            column_stats.max_value = column_stats.max_value.max(&max_value);
-                            column_stats.min_value = column_stats.min_value.min(&min_value);
-                        }
-                    }
-                    Ok(None) => {
-                        downgrade = true;
-                    }
-                    Err(_) => return Ok(unknown_stats),
-                }
-            }
-        }
-
-        if downgrade {
-            table_stats.column_statistics = unknown_stats.column_statistics;
-            table_stats.num_rows = Precision::Absent;
-        }
-
-        Ok(table_stats)
+    pub fn datafusion_table_statistics(&self) -> Option<Statistics> {
+        self.snapshot.datafusion_table_statistics()
     }
 }

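The removed fold above combined per-file statistics through `Precision`'s conservative operators: exact values add/min/max into exact results, while any `Absent` operand collapses the result back to `Absent` (the `downgrade` path). A small illustration of those semantics, assuming only `datafusion-common` as a dependency:

```rust
use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;

fn main() {
    // Row counts from two files with statistics combine exactly...
    let rows = Precision::Exact(10usize).add(&Precision::Exact(32));
    assert_eq!(rows, Precision::Exact(42));

    // ...but a single file without statistics degrades the total to Absent.
    assert_eq!(rows.add(&Precision::Absent), Precision::<usize>::Absent);

    // Column-level min/max fold the same way over ScalarValue.
    let a = Precision::Exact(ScalarValue::Int64(Some(3)));
    let b = Precision::Exact(ScalarValue::Int64(Some(7)));
    assert_eq!(a.max(&b), Precision::Exact(ScalarValue::Int64(Some(7))));
    assert_eq!(a.min(&b), Precision::Exact(ScalarValue::Int64(Some(3))));
}
```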
@@ -522,13 +401,7 @@ impl<'a> DeltaScanBuilder<'a> {
         let stats = self
             .snapshot
             .datafusion_table_statistics()
-            .unwrap_or_else(|e| {
-                error!(
-                    "Error while computing table statistics. Using unknown statistics. {}",
-                    e
-                );
-                Statistics::new_unknown(&schema)
-            });
+            .unwrap_or(Statistics::new_unknown(&schema));

         let scan = ParquetFormat::new()
             .create_physical_plan(
@@ -608,7 +481,7 @@ impl TableProvider for DeltaTable {
     }

     fn statistics(&self) -> Option<Statistics> {
-        self.get_state()?.datafusion_table_statistics().ok()
+        self.get_state()?.datafusion_table_statistics()
     }
 }

@@ -687,7 +560,7 @@ impl TableProvider for DeltaTableProvider {
     }

     fn statistics(&self) -> Option<Statistics> {
-        self.snapshot.datafusion_table_statistics().ok()
+        self.snapshot.datafusion_table_statistics()
     }
 }

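With this change, both `statistics()` implementations forward the `Option` directly instead of discarding an error with `.ok()`. A hedged sketch of how a caller might consume the result (`row_count_hint` is illustrative, not part of the delta-rs API; assumes the datafusion and deltalake-core crates):

```rust
use datafusion::datasource::TableProvider;
use deltalake_core::DeltaTable;

// Illustrative helper: ask the provider for table-level statistics and
// surface the (possibly inexact) row count, if one is known at all.
fn row_count_hint(table: &DeltaTable) -> Option<usize> {
    let stats = table.statistics()?; // None when no snapshot or stats exist
    stats.num_rows.get_value().copied()
}
```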
@@ -1672,7 +1545,6 @@ mod tests {
             size: 10644,
             partition_values,
             modification_time: 1660497727833,
-            partition_values_parsed: None,
             data_change: true,
             stats: None,
             deletion_vector: None,
@@ -1807,7 +1679,7 @@ mod tests {
             .unwrap();
         let config = DeltaScanConfigBuilder::new()
             .with_file_column_name(&"file_source")
-            .build(&table.snapshot().unwrap())
+            .build(table.snapshot().unwrap())
             .unwrap();

         let log_store = table.log_store();
11 changes: 0 additions & 11 deletions crates/deltalake-core/src/kernel/actions/types.rs
@@ -584,17 +584,6 @@ pub struct Add {
     /// The name of the clustering implementation
     pub clustering_provider: Option<String>,

-    // TODO: remove migration fields added to avoid too many business-logic changes in one PR
-    /// Partition values stored in raw parquet struct format. In this struct, the column names
-    /// correspond to the partition columns and the values are stored in their corresponding data
-    /// type. This is a required field when the table is partitioned and the table property
-    /// delta.checkpoint.writeStatsAsStruct is set to true. If the table is not partitioned, this
-    /// column can be omitted.
-    ///
-    /// This field is only available in add action records read from checkpoints.
-    #[serde(skip_serializing, skip_deserializing)]
-    pub partition_values_parsed: Option<parquet::record::Row>,
-
     /// Contains statistics (e.g., count, min/max values for columns) about the data in this file in
     /// raw parquet format. This field needs to be written when statistics are available and the
     /// table property delta.checkpoint.writeStatsAsStruct is set to true.
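For context, the `Add` action carries per-file statistics as a JSON string in its `stats` field. A rough sketch of that JSON shape as described by the Delta protocol (`FileStats` is illustrative, not a delta-rs type; assumes serde with the derive feature and serde_json):

```rust
use std::collections::HashMap;

use serde::Deserialize;
use serde_json::Value;

// Illustrative only: the per-file statistics JSON carried in Add::stats,
// per the Delta protocol (numRecords, minValues, maxValues, nullCount).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct FileStats {
    num_records: i64,
    #[serde(default)]
    min_values: HashMap<String, Value>,
    #[serde(default)]
    max_values: HashMap<String, Value>,
    #[serde(default)]
    null_count: HashMap<String, Value>,
}

fn main() {
    let raw = r#"{"numRecords":3,"minValues":{"id":1},"maxValues":{"id":9},"nullCount":{"id":0}}"#;
    let stats: FileStats = serde_json::from_str(raw).unwrap();
    assert_eq!(stats.num_records, 3);
    assert_eq!(stats.null_count["id"], serde_json::json!(0));
}
```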