chore: Update to DataFusion 46.0.0, update for API changes
Signed-off-by: Andrew Lamb <[email protected]>
alamb committed Mar 9, 2025
1 parent 9d98ac2 commit e9a56aa
Showing 12 changed files with 128 additions and 126 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/build.yml
@@ -20,7 +20,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.81'
toolchain: '1.82'
override: true

- name: Format
@@ -42,7 +42,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.81'
toolchain: '1.82'
override: true

- name: build and lint with clippy
@@ -79,7 +79,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.81'
toolchain: '1.82'
override: true

- name: Run tests
@@ -114,7 +114,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.81'
toolchain: '1.82'
override: true

# Install Java and Hadoop for HDFS integration tests
@@ -160,7 +160,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.81'
toolchain: '1.82'
override: true

- name: Download Lakectl
2 changes: 1 addition & 1 deletion .github/workflows/codecov.yml
@@ -20,7 +20,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.81'
toolchain: '1.82'
override: true
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
23 changes: 11 additions & 12 deletions Cargo.toml
@@ -5,7 +5,7 @@ resolver = "2"

[workspace.package]
authors = ["Qingping Hou <[email protected]>"]
rust-version = "1.81"
rust-version = "1.82"
keywords = ["deltalake", "delta", "datalake"]
readme = "README.md"
edition = "2021"
@@ -45,16 +45,16 @@ object_store = { version = "0.11.2" , features = ["cloud"]}
parquet = { version = "54" }

# datafusion
datafusion = "45"
datafusion-expr = "45"
datafusion-common = "45"
datafusion-ffi = "45"
datafusion-functions = "45"
datafusion-functions-aggregate = "45"
datafusion-physical-expr = "45"
datafusion-physical-plan = "45"
datafusion-proto = "45"
datafusion-sql = "45"
datafusion = "46"
datafusion-expr = "46"
datafusion-common = "46"
datafusion-ffi = "46"
datafusion-functions = "46"
datafusion-functions-aggregate = "46"
datafusion-physical-expr = "46"
datafusion-physical-plan = "46"
datafusion-proto = "46"
datafusion-sql = "46"

# serde
serde = { version = "1.0.194", features = ["derive"] }
@@ -77,4 +77,3 @@ async-trait = { version = "0.1" }
futures = { version = "0.3" }
tokio = { version = "1" }
num_cpus = { version = "1" }

10 changes: 6 additions & 4 deletions crates/core/src/delta_datafusion/expr.rs
@@ -34,7 +34,7 @@ use datafusion_common::Result as DFResult;
use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
use datafusion_expr::expr::InList;
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource};
use datafusion_expr::{
AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, ScalarFunctionArgs, TableSource,
};
// Needed for MakeParquetArray
use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature};
use datafusion_functions::core::planner::CoreFunctionPlanner;
@@ -99,13 +101,13 @@ impl ScalarUDFImpl for MakeParquetArray {
r_type
}

fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let mut data_type = DataType::Null;
for arg in args {
for arg in &args.args {
data_type = arg.data_type();
}

match self.actual.invoke_batch(args, number_rows)? {
match self.actual.invoke_with_args(args)? {
ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
let field = Arc::new(Field::new("element", data_type, true));
let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new(
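For reference, DataFusion 46 folds the separate `args`/`number_rows` parameters of `invoke_batch` into a single `ScalarFunctionArgs` value passed to `invoke_with_args`, which is what the `MakeParquetArray` change above adapts to. A minimal sketch of the new trait shape, using a hypothetical `first_arg_or_null` UDF that is not part of this commit:

```rust
use std::any::Any;

use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};

/// Hypothetical UDF, used only to illustrate the DataFusion 46 trait shape;
/// it simply returns its first argument (or a null scalar if none was given).
#[derive(Debug)]
struct FirstArgOrNull {
    signature: Signature,
}

impl FirstArgOrNull {
    fn new() -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for FirstArgOrNull {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "first_arg_or_null"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        Ok(arg_types.first().cloned().unwrap_or(DataType::Null))
    }

    // DataFusion 46: argument values arrive bundled in `ScalarFunctionArgs`
    // (`args.args` holds the values, `args.number_rows` the batch row count),
    // replacing the old `invoke_batch(&[ColumnarValue], usize)` signature.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        match args.args.into_iter().next() {
            Some(first) => Ok(first),
            None => Ok(ColumnarValue::Scalar(ScalarValue::Null)),
        }
    }
}
```

As in the `MakeParquetArray` change, a wrapping UDF can forward the whole `ScalarFunctionArgs` value to an inner UDF via `invoke_with_args(args)`.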
100 changes: 53 additions & 47 deletions crates/core/src/delta_datafusion/mod.rs
@@ -39,9 +39,8 @@ use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetSource,
};
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
@@ -660,36 +659,39 @@ impl<'a> DeltaScanBuilder<'a> {
..Default::default()
};

let mut exec_plan_builder = ParquetExecBuilder::new(
FileScanConfig::new(self.log_store.object_store_url(), file_schema)
.with_file_groups(
// If all files were filtered out, we still need to emit at least one partition to
// pass datafusion sanity checks.
//
// See https://github.com/apache/datafusion/issues/11322
if file_groups.is_empty() {
vec![vec![]]
} else {
file_groups.into_values().collect()
},
)
.with_statistics(stats)
.with_projection(self.projection.cloned())
.with_limit(self.limit)
.with_table_partition_cols(table_partition_cols),
)
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
.with_table_parquet_options(parquet_options);
let mut file_source = ParquetSource::new(parquet_options)
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));

// Sometimes (i.e Merge) we want to prune files that don't make the
// filter and read the entire contents for files that do match the
// filter
if let Some(predicate) = logical_filter {
if config.enable_parquet_pushdown {
exec_plan_builder = exec_plan_builder.with_predicate(predicate);
file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
}
};

let file_scan_config = FileScanConfig::new(
self.log_store.object_store_url(),
file_schema,
Arc::new(file_source),
)
.with_file_groups(
// If all files were filtered out, we still need to emit at least one partition to
// pass datafusion sanity checks.
//
// See https://github.com/apache/datafusion/issues/11322
if file_groups.is_empty() {
vec![vec![]]
} else {
file_groups.into_values().collect()
},
)
.with_statistics(stats)
.with_projection(self.projection.cloned())
.with_limit(self.limit)
.with_table_partition_cols(table_partition_cols);

let metrics = ExecutionPlanMetricsSet::new();
MetricBuilder::new(&metrics)
.global_counter("files_scanned")
@@ -700,7 +702,7 @@ impl<'a> DeltaScanBuilder<'a> {

Ok(DeltaScan {
table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
parquet_scan: exec_plan_builder.build_arc(),
parquet_scan: file_scan_config.build(),
config,
logical_schema,
metrics,
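The hunk above replaces `ParquetExecBuilder`: in DataFusion 46 a Parquet scan is assembled from a `ParquetSource` (which now carries the Parquet options, schema adapter factory, and pushdown predicate) handed to a `FileScanConfig`, whose `build()` produces the `DataSourceExec` plan node. A self-contained sketch of that shape, assuming the same DataFusion 46 APIs used in the diff; the schema, file name, limit, and trivial `lit(true)` predicate are invented for illustration:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::config::TableParquetOptions;
use datafusion_physical_expr::expressions;

/// Builds a one-file Parquet scan the DataFusion 46 way. Everything concrete
/// here (schema, file name, limit, predicate) is made up for illustration.
fn build_parquet_scan() -> Arc<dyn ExecutionPlan> {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));

    // Format-specific options and the pushdown predicate now live on the
    // `ParquetSource` (a `FileSource`) rather than on `ParquetExecBuilder`.
    let source = ParquetSource::new(TableParquetOptions::new())
        .with_predicate(Arc::clone(&file_schema), expressions::lit(true));

    // `FileScanConfig` describes which files to read; `build()` wraps the
    // config in a `DataSourceExec`, the node that replaces `ParquetExec`.
    FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(source),
    )
    .with_file_groups(vec![vec![PartitionedFile::new("part-0.parquet", 1024)]])
    .with_limit(Some(10))
    .build()
}
```

Note that the predicate is now attached together with the file schema (`with_predicate(schema, expr)`), whereas `ParquetExecBuilder::with_predicate` previously took only the expression.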
@@ -1972,7 +1974,7 @@ mod tests {
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
use datafusion_expr::lit;
@@ -2725,7 +2727,7 @@
.await
.unwrap();

let mut visitor = ParquetPredicateVisitor::default();
let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
@@ -2760,7 +2762,7 @@
.await
.unwrap();

let mut visitor = ParquetPredicateVisitor::default();
let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert!(visitor.predicate.is_none());
@@ -2789,42 +2791,46 @@
.await
.unwrap();

let mut visitor = ParquetOptionsVisitor::default();
let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
}

/// Extracts fields from the parquet scan
#[derive(Default)]
struct ParquetPredicateVisitor {
struct ParquetVisitor {
predicate: Option<Arc<dyn PhysicalExpr>>,
pruning_predicate: Option<Arc<PruningPredicate>>,
options: Option<TableParquetOptions>,
}

impl ExecutionPlanVisitor for ParquetPredicateVisitor {
impl ExecutionPlanVisitor for ParquetVisitor {
type Error = DataFusionError;

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
self.predicate = parquet_exec.predicate().cloned();
self.pruning_predicate = parquet_exec.pruning_predicate().cloned();
}
Ok(true)
}
}

#[derive(Default)]
struct ParquetOptionsVisitor {
options: Option<TableParquetOptions>,
}
let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
return Ok(true);
};

impl ExecutionPlanVisitor for ParquetOptionsVisitor {
type Error = DataFusionError;
let Some(scan_config) = datasource_exec
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
else {
return Ok(true);
};

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
self.options = Some(parquet_exec.table_parquet_options().clone())
if let Some(parquet_source) = scan_config
.file_source
.as_any()
.downcast_ref::<ParquetSource>()
{
self.options = Some(parquet_source.table_parquet_options().clone());
self.predicate = parquet_source.predicate().cloned();
self.pruning_predicate = parquet_source.pruning_predicate().cloned();
}

Ok(true)
}
}
69 changes: 34 additions & 35 deletions crates/core/src/operations/load_cdf.rs
@@ -16,11 +16,10 @@ use std::time::SystemTime;
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, Field, Schema};
use chrono::{DateTime, Utc};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
use datafusion::execution::SessionState;
use datafusion::prelude::SessionContext;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::{expressions, PhysicalExpr};
use datafusion_physical_plan::projection::ProjectionExec;
@@ -369,38 +368,38 @@ impl CdfLoadBuilder {
)?;

// Create the parquet scans for each associated type of file.
let cdc_scan = ParquetFormat::new()
.create_physical_plan(
session_sate,
FileScanConfig::new(self.log_store.object_store_url(), cdc_file_schema)
.with_file_groups(cdc_file_groups.into_values().collect())
.with_table_partition_cols(cdc_partition_cols),
filters,
)
.await?;

let add_scan = ParquetFormat::new()
.create_physical_plan(
session_sate,
FileScanConfig::new(
self.log_store.object_store_url(),
add_remove_file_schema.clone(),
)
.with_file_groups(add_file_groups.into_values().collect())
.with_table_partition_cols(add_remove_partition_cols.clone()),
filters,
)
.await?;

let remove_scan = ParquetFormat::new()
.create_physical_plan(
session_sate,
FileScanConfig::new(self.log_store.object_store_url(), add_remove_file_schema)
.with_file_groups(remove_file_groups.into_values().collect())
.with_table_partition_cols(add_remove_partition_cols),
filters,
)
.await?;
let mut parquet_source = ParquetSource::new(TableParquetOptions::new());
if let Some(filters) = filters {
parquet_source =
parquet_source.with_predicate(Arc::clone(&cdc_file_schema), Arc::clone(filters));
}
let parquet_source: Arc<dyn FileSource> = Arc::new(parquet_source);
let cdc_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
self.log_store.object_store_url(),
Arc::clone(&cdc_file_schema),
Arc::clone(&parquet_source),
)
.with_file_groups(cdc_file_groups.into_values().collect())
.with_table_partition_cols(cdc_partition_cols)
.build();

let add_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
self.log_store.object_store_url(),
Arc::clone(&add_remove_file_schema),
Arc::clone(&parquet_source),
)
.with_file_groups(add_file_groups.into_values().collect())
.with_table_partition_cols(add_remove_partition_cols.clone())
.build();

let remove_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
self.log_store.object_store_url(),
Arc::clone(&add_remove_file_schema),
parquet_source,
)
.with_file_groups(remove_file_groups.into_values().collect())
.with_table_partition_cols(add_remove_partition_cols)
.build();

// The output batches are then unioned to create a single output. Coalesce partitions is only here for the time
// being for development. I plan to parallelize the reads once the base idea is correct.
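The same DataFusion 46 pattern replaces `ParquetFormat::create_physical_plan` here: one `Arc<dyn FileSource>` (the `ParquetSource`) is shared by the cdc/add/remove `FileScanConfig`s, and each `build()` yields an `ExecutionPlan` that the operation later unions. A hedged sketch of that sharing-and-union pattern; the helper name, file names, and the bare `UnionExec` are illustrative and not taken from this commit (the real code also wires in the CDF predicate, partition columns, and projections):

```rust
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion_common::config::TableParquetOptions;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::ExecutionPlan;

/// Builds three scans over one shared `Arc<dyn FileSource>` and unions them,
/// mirroring the cdc/add/remove structure above. File names are invented.
fn build_cdf_style_scan(file_schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
    let source: Arc<dyn FileSource> = Arc::new(ParquetSource::new(TableParquetOptions::new()));
    let url = ObjectStoreUrl::local_filesystem();

    // One `FileScanConfig` per kind of file, all pointing at the same source.
    let scan_for = |files: Vec<PartitionedFile>| -> Arc<dyn ExecutionPlan> {
        FileScanConfig::new(url.clone(), Arc::clone(&file_schema), Arc::clone(&source))
            .with_file_groups(vec![files])
            .build()
    };

    let scans = vec![
        scan_for(vec![PartitionedFile::new("cdc-0.parquet", 1)]),
        scan_for(vec![PartitionedFile::new("add-0.parquet", 1)]),
        scan_for(vec![PartitionedFile::new("remove-0.parquet", 1)]),
    ];

    // The real operation then unions the per-kind outputs into one stream.
    Arc::new(UnionExec::new(scans))
}
```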