
Commit

Merge branch 'main' into alamb/df_46
rtyler authored Mar 3, 2025
2 parents 082f9a8 + a6c9837 commit 1ec6ed6
Showing 54 changed files with 1,706 additions and 2,580 deletions.
18 changes: 15 additions & 3 deletions crates/core/src/delta_datafusion/mod.rs
@@ -46,6 +46,7 @@ use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider,
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
+use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
@@ -55,6 +56,7 @@ use datafusion_common::{
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::logical_plan::CreateExternalTable;
+use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
@@ -544,9 +546,19 @@ impl<'a> DeltaScanBuilder<'a> {

let context = SessionContext::new();
let df_schema = logical_schema.clone().to_dfschema()?;
-        let logical_filter = self
-            .filter
-            .map(|expr| context.create_physical_expr(expr, &df_schema).unwrap());
+
+        let logical_filter = self.filter.map(|expr| {
+            // Simplify the expression first
+            let props = ExecutionProps::new();
+            let simplify_context =
+                SimplifyContext::new(&props).with_schema(df_schema.clone().into());
+            let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
+            let simplified = simplifier.simplify(expr).unwrap();
+
+            context
+                .create_physical_expr(simplified, &df_schema)
+                .unwrap()
+        });

// Perform Pruning of files to scan
let (files, files_scanned, files_pruned) = match self.files {
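The simplification pass above is why several merge-test predicates later in this commit change shape (e.g. `BETWEEN` becomes a pair of comparisons). A minimal, self-contained sketch of the same `ExprSimplifier` setup, assuming a DataFusion version matching the imports added in this file; the single-column schema and values are hypothetical:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{col, lit};

fn main() -> Result<()> {
    // Hypothetical stand-in for the table's logical schema.
    let schema = Schema::new(vec![Field::new("id", DataType::Utf8, true)]);
    let df_schema = Arc::new(DFSchema::try_from(schema)?);

    // Same construction DeltaScanBuilder now uses before building the
    // physical expression.
    let props = ExecutionProps::new();
    let context = SimplifyContext::new(&props).with_schema(df_schema);
    let simplifier = ExprSimplifier::new(context).with_max_cycles(10);

    // BETWEEN is rewritten into two comparisons:
    // id >= Utf8("B") AND id <= Utf8("C")
    let simplified = simplifier.simplify(col("id").between(lit("B"), lit("C")))?;
    println!("{simplified}");
    Ok(())
}
```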
40 changes: 38 additions & 2 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;

use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
use chrono::{DateTime, Utc};
-use delta_kernel::expressions::Scalar;
+use delta_kernel::expressions::{Scalar, StructData};
use indexmap::IndexMap;
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -276,12 +276,26 @@ impl LogicalFile<'_> {
.column_by_name(COL_MIN_VALUES)
.and_then(|c| Scalar::from_array(c.as_ref(), self.index))
}

/// Struct containing all available max values for the columns in this file.
pub fn max_values(&self) -> Option<Scalar> {
+        // With delta.checkpoint.writeStatsAsStruct, microsecond timestamps are
+        // truncated to milliseconds as defined by the protocol, which effectively
+        // floors them; stats parsed on the fly are not truncated. To keep a
+        // truncated max value a valid upper bound, always round it up by 1ms
+        // when it sits exactly on a millisecond boundary.
+        fn ceil_datetime(v: i64) -> i64 {
+            let remainder = v % 1000;
+            if remainder == 0 {
+                // A zero sub-millisecond remainder suggests the value was
+                // truncated, so bump it to the next millisecond
+                ((v as f64 / 1000.0).floor() as i64 + 1) * 1000
+            } else {
+                // sub-millisecond precision survived, so use the exact stats
+                v
+            }
+        }

self.stats
.column_by_name(COL_MAX_VALUES)
.and_then(|c| Scalar::from_array(c.as_ref(), self.index))
+            .map(|s| round_ms_datetimes(s, &ceil_datetime))
}

pub fn add_action(&self) -> Add {
@@ -349,6 +363,28 @@ impl LogicalFile<'_> {
}
}

+/// Recursively apply `func` to every timestamp scalar, descending into
+/// struct values so nested statistics are adjusted as well.
+fn round_ms_datetimes<F>(value: Scalar, func: &F) -> Scalar
+where
+    F: Fn(i64) -> i64,
+{
+    match value {
+        Scalar::Timestamp(v) => Scalar::Timestamp(func(v)),
+        Scalar::TimestampNtz(v) => Scalar::TimestampNtz(func(v)),
+        Scalar::Struct(struct_data) => {
+            let mut fields = Vec::new();
+            let mut scalars = Vec::new();
+
+            for (field, value) in struct_data.fields().iter().zip(struct_data.values().iter()) {
+                fields.push(field.clone());
+                scalars.push(round_ms_datetimes(value.clone(), func));
+            }
+            let data = StructData::try_new(fields, scalars).unwrap();
+            Scalar::Struct(data)
+        }
+        value => value,
+    }
+}

impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {
type Error = DeltaTableError;

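To make the rounding above concrete: the stats values are microseconds since the epoch, and when the remainder is zero the floor-then-add expression reduces to `v + 1000`. A small illustrative sketch (the standalone helper and sample values are hypothetical):

```rust
// Mirrors the ceil_datetime helper above; `v` is microseconds since epoch.
// When v % 1000 == 0, ((v / 1000).floor() + 1) * 1000 is simply v + 1000.
fn ceil_datetime(v: i64) -> i64 {
    if v % 1000 == 0 {
        // Exactly on a millisecond boundary: assume the checkpoint truncated
        // the value, and round up so it remains a valid upper bound.
        v + 1000
    } else {
        // Sub-millisecond precision survived, so the stat is exact.
        v
    }
}

fn main() {
    // 2021-02-02T00:00:00.123000Z is bumped to .124000
    assert_eq!(ceil_datetime(1_612_224_000_123_000), 1_612_224_000_124_000);
    // 2021-02-02T00:00:00.123456Z is kept as-is
    assert_eq!(ceil_datetime(1_612_224_000_123_456), 1_612_224_000_123_456);
}
```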
21 changes: 9 additions & 12 deletions crates/core/src/operations/convert_to_delta.rs
@@ -29,6 +29,7 @@ use crate::{
DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, NULL_PARTITION_VALUE_DATA_PATH,
};

+use super::transaction::CommitProperties;
use super::{CustomExecuteHandler, Operation};

/// Error converting a Parquet table to a Delta table
@@ -108,7 +109,8 @@ pub struct ConvertToDeltaBuilder {
name: Option<String>,
comment: Option<String>,
configuration: HashMap<String, Option<String>>,
-    metadata: Option<Map<String, Value>>,
+    /// Additional information to add to the commit
+    commit_properties: CommitProperties,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

@@ -142,7 +144,7 @@ impl ConvertToDeltaBuilder {
name: None,
comment: None,
configuration: Default::default(),
-            metadata: Default::default(),
+            commit_properties: CommitProperties::default(),
custom_execute_handler: None,
}
}
@@ -233,12 +235,9 @@ impl ConvertToDeltaBuilder {
self
}

-    /// Append custom (application-specific) metadata to the commit.
-    ///
-    /// This might include provenance information such as an id of the
-    /// user that made the commit or the program that created it.
-    pub fn with_metadata(mut self, metadata: Map<String, Value>) -> Self {
-        self.metadata = Some(metadata);
+    /// Additional metadata to be added to commit info
+    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
+        self.commit_properties = commit_properties;
self
}

@@ -425,16 +424,14 @@ impl ConvertToDeltaBuilder {
.with_partition_columns(partition_columns.into_iter())
.with_actions(actions)
.with_save_mode(self.mode)
-            .with_configuration(self.configuration);
+            .with_configuration(self.configuration)
+            .with_commit_properties(self.commit_properties);
if let Some(name) = self.name {
builder = builder.with_table_name(name);
}
if let Some(comment) = self.comment {
builder = builder.with_comment(comment);
}
-        if let Some(metadata) = self.metadata {
-            builder = builder.with_metadata(metadata);
-        }
Ok((builder, operation_id))
}
}
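For callers, custom commit metadata now travels through `CommitProperties` instead of a dedicated `with_metadata` method; `CreateBuilder` in the next file gets the same treatment. A hypothetical call site, assuming `CommitProperties::with_metadata` and the builder entry points shown here (`new`, `with_location`) behave as in this version of `deltalake_core`:

```rust
use deltalake_core::operations::convert_to_delta::ConvertToDeltaBuilder;
use deltalake_core::operations::transaction::CommitProperties;
use deltalake_core::{DeltaTable, DeltaTableError};
use serde_json::json;

async fn convert(path: &str) -> Result<DeltaTable, DeltaTableError> {
    // Provenance information is attached via CommitProperties and lands in
    // the commit info, replacing the removed with_metadata builder method.
    let commit_properties = CommitProperties::default()
        .with_metadata([("created_by".to_string(), json!("backfill-job"))]);

    ConvertToDeltaBuilder::new()
        .with_location(path)
        .with_commit_properties(commit_properties)
        .await
}
```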
27 changes: 10 additions & 17 deletions crates/core/src/operations/create.rs
@@ -10,7 +10,7 @@ use serde_json::Value;
use tracing::log::*;
use uuid::Uuid;

-use super::transaction::{CommitBuilder, TableReference, PROTOCOL};
+use super::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOCOL};
use super::{CustomExecuteHandler, Operation};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType};
@@ -59,7 +59,8 @@ pub struct CreateBuilder {
actions: Vec<Action>,
log_store: Option<LogStoreRef>,
configuration: HashMap<String, Option<String>>,
-    metadata: Option<HashMap<String, Value>>,
+    /// Additional information to add to the commit
+    commit_properties: CommitProperties,
raise_if_key_not_exists: bool,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
@@ -95,7 +96,7 @@ impl CreateBuilder {
actions: Default::default(),
log_store: None,
configuration: Default::default(),
-            metadata: Default::default(),
+            commit_properties: CommitProperties::default(),
raise_if_key_not_exists: true,
custom_execute_handler: None,
}
@@ -212,15 +213,9 @@ impl CreateBuilder {
self
}

-    /// Append custom (application-specific) metadata to the commit.
-    ///
-    /// This might include provenance information such as an id of the
-    /// user that made the commit or the program that created it.
-    pub fn with_metadata(
-        mut self,
-        metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
-    ) -> Self {
-        self.metadata = Some(HashMap::from_iter(metadata));
+    /// Additional metadata to be added to commit info
+    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
+        self.commit_properties = commit_properties;
self
}

@@ -358,10 +353,9 @@ impl std::future::IntoFuture for CreateBuilder {
let this = self;
Box::pin(async move {
let handler = this.custom_execute_handler.clone();
-            let mode = this.mode;
-            let app_metadata = this.metadata.clone().unwrap_or_default();
+            let mode = &this.mode;
            let (mut table, mut actions, operation, operation_id) =
-                this.into_table_and_actions().await?;
+                this.clone().into_table_and_actions().await?;

let table_state = if table.log_store.is_delta_table_location().await? {
match mode {
@@ -386,11 +380,10 @@
None
};

-            let version = CommitBuilder::default()
+            let version = CommitBuilder::from(this.commit_properties.clone())
                .with_actions(actions)
                .with_operation_id(operation_id)
                .with_post_commit_hook_handler(handler.clone())
-                .with_app_metadata(app_metadata)
.build(
table_state.map(|f| f as &dyn TableReference),
table.log_store.clone(),
47 changes: 30 additions & 17 deletions crates/core/src/operations/merge/mod.rs
@@ -40,6 +40,7 @@ use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::SessionConfig;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::logical_expr::build_join_schema;
+use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_plan::metrics::MetricBuilder;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion::{
@@ -49,6 +50,8 @@ use datafusion::{
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Column, DFSchema, ExprSchema, ScalarValue, TableReference};
+use datafusion_expr::execution_props::ExecutionProps;
+use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{col, conditional_expressions::CaseBuilder, lit, when, Expr, JoinType};
use datafusion_expr::{
Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode, UNNAMED_TABLE,
@@ -59,6 +62,7 @@ use filter::try_construct_early_filter;
use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
use serde::Serialize;
+use tracing::field::debug;
use tracing::log::*;
use uuid::Uuid;

@@ -847,11 +851,32 @@ async fn execute(
streaming,
)
.await?
}
+    .map(|e| {
+        // Simplify the expression so we have a canonical form to both prune
+        // files with and record as the commit predicate
+        let props = ExecutionProps::new();
+        let simplify_context = SimplifyContext::new(&props).with_schema(target.schema().clone());
+        let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
+        simplifier.simplify(e).unwrap()
+    });
+
+    // Predicate will be used for conflict detection
+    let commit_predicate = match target_subset_filter.clone() {
+        None => None, // No predicate means it's a full table merge
+        Some(some_filter) => {
+            let predict_expr = match &target_alias {
+                None => some_filter,
+                Some(alias) => remove_table_alias(some_filter, alias),
+            };
+            Some(fmt_expr_to_sql(&predict_expr)?)
+        }
+    };
+
+    debug!("Using target subset filter: {:?}", commit_predicate);

let file_column = Arc::new(scan_config.file_column_name.clone().unwrap());
// Need to manually push this filter into the scan... We want to PRUNE files not FILTER RECORDS
-    let target = match target_subset_filter.clone() {
+    let target = match target_subset_filter {
Some(filter) => {
let filter = match &target_alias {
Some(alias) => remove_table_alias(filter, alias),
@@ -1410,18 +1435,6 @@ async fn execute(
app_metadata.insert("operationMetrics".to_owned(), map);
}

-    // Predicate will be used for conflict detection
-    let commit_predicate = match target_subset_filter {
-        None => None, // No predicate means it's a full table merge
-        Some(some_filter) => {
-            let predict_expr = match &target_alias {
-                None => some_filter,
-                Some(alias) => remove_table_alias(some_filter, alias),
-            };
-            Some(fmt_expr_to_sql(&predict_expr)?)
-        }
-    };

// Do not make a commit when there are zero updates to the state
let operation = DeltaOperation::Merge {
predicate: commit_predicate,
@@ -2525,7 +2538,7 @@ mod tests {
let parameters = last_commit.operation_parameters.clone().unwrap();
assert_eq!(
parameters["predicate"],
"id BETWEEN 'B' AND 'C' AND modified = '2021-02-02'"
"id >= 'B' AND id <= 'C' AND modified = '2021-02-02'"
);
assert_eq!(
parameters["mergePredicate"],
@@ -2776,7 +2789,7 @@ mod tests {
extra_info["operationMetrics"],
serde_json::to_value(&metrics).unwrap()
);
-        assert_eq!(parameters["predicate"], "id BETWEEN 'B' AND 'X'");
+        assert_eq!(parameters["predicate"], "id >= 'B' AND id <= 'X'");
assert_eq!(parameters["mergePredicate"], json!("target.id = source.id"));
assert_eq!(
parameters["matchedPredicates"],
@@ -3195,7 +3208,7 @@ mod tests {

assert_eq!(
parameters["predicate"],
json!("id BETWEEN 'B' AND 'X' AND modified = '2021-02-02'")
json!("id >= 'B' AND id <= 'X' AND modified = '2021-02-02'")
);

let expected = vec![
@@ -3279,7 +3292,7 @@ mod tests {

assert_eq!(
parameters["predicate"],
json!("id BETWEEN 'B' AND 'X' AND modified = '2021-02-02'")
json!("id >= 'B' AND id <= 'X' AND modified = '2021-02-02'")
);

let expected = vec![
5 changes: 4 additions & 1 deletion crates/core/src/operations/mod.rs
@@ -21,6 +21,7 @@ pub use datafusion_physical_plan::common::collect as collect_sendable_stream;
use self::add_column::AddColumnBuilder;
use self::create::CreateBuilder;
use self::filesystem_check::FileSystemCheckBuilder;
+#[cfg(feature = "datafusion")]
use self::optimize::OptimizeBuilder;
use self::restore::RestoreBuilder;
use self::set_tbl_properties::SetTablePropertiesBuilder;
@@ -43,7 +44,6 @@ pub mod convert_to_delta;
pub mod create;
pub mod drop_constraints;
pub mod filesystem_check;
-pub mod optimize;
pub mod restore;
pub mod transaction;
pub mod update_field_metadata;
@@ -61,6 +61,8 @@ mod load;
pub mod load_cdf;
#[cfg(feature = "datafusion")]
pub mod merge;
+#[cfg(feature = "datafusion")]
+pub mod optimize;
pub mod set_tbl_properties;
#[cfg(feature = "datafusion")]
pub mod update;
@@ -227,6 +229,7 @@ impl DeltaOps {
}

    /// Optimize the physical layout of the table's data files
+    #[cfg(feature = "datafusion")]
#[must_use]
pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
OptimizeBuilder::new(self.0.log_store, self.0.state.unwrap())
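Since `optimize` is now compiled only with the `datafusion` feature, downstream code calling it must build `deltalake-core` with that feature enabled. A hypothetical helper illustrating the gated API (the metrics field names are assumptions):

```rust
use deltalake_core::{DeltaOps, DeltaTableError};

// Compiles only when deltalake-core is built with `--features datafusion`,
// since DeltaOps::optimize is now gated behind that feature.
async fn compact(ops: DeltaOps) -> Result<(), DeltaTableError> {
    let (_table, metrics) = ops.optimize().await?;
    println!(
        "optimize added {} files and removed {}",
        metrics.num_files_added, metrics.num_files_removed
    );
    Ok(())
}
```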
