diff --git a/Cargo.toml b/Cargo.toml
index a48f8c7894..4c03c84f99 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ debug = true
 debug = "line-tables-only"
 
 [workspace.dependencies]
-delta_kernel = { version = "0.4.1", features = ["sync-engine"] }
+delta_kernel = { version = "0.5.0", features = ["sync-engine"] }
 #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }
 
 # arrow
diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs
index 69ac891838..9a868f5e82 100644
--- a/crates/aws/src/lib.rs
+++ b/crates/aws/src/lib.rs
@@ -154,6 +154,7 @@ impl std::fmt::Debug for DynamoDbLockClient {
 
 impl DynamoDbLockClient {
     /// Creates a new DynamoDbLockClient from the supplied storage options.
+    #[allow(clippy::too_many_arguments)]
     pub fn try_new(
         sdk_config: &SdkConfig,
         lock_table_name: Option<String>,
diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs
index 562ca3da90..b94b7048b1 100644
--- a/crates/core/src/kernel/snapshot/log_data.rs
+++ b/crates/core/src/kernel/snapshot/log_data.rs
@@ -486,6 +486,9 @@ mod datafusion {
     use std::collections::HashSet;
     use std::sync::Arc;
 
+    use super::*;
+    use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column};
+    use crate::kernel::ARROW_HANDLER;
     use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
     use ::datafusion::physical_optimizer::pruning::PruningStatistics;
     use ::datafusion::physical_plan::Accumulator;
@@ -499,12 +502,9 @@ mod datafusion {
     use delta_kernel::engine::arrow_data::ArrowEngineData;
     use delta_kernel::expressions::Expression;
     use delta_kernel::schema::{DataType, PrimitiveType};
+    use delta_kernel::AsAny;
     use delta_kernel::{ExpressionEvaluator, ExpressionHandler};
 
-    use super::*;
-    use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column};
-    use crate::kernel::ARROW_HANDLER;
-
     #[derive(Debug, Default, Clone)]
     enum AccumulatorType {
         Min,
@@ -725,9 +725,12 @@ mod datafusion {
             return None;
         }
         let expression = if self.metadata.partition_columns.contains(&column.name) {
-            Expression::Column(format!("add.partitionValues_parsed.{}", column.name))
+            Expression::column(vec![format!("add.partitionValues_parsed.{}", column.name)])
         } else {
-            Expression::Column(format!("add.stats_parsed.{}.{}", stats_field, column.name))
+            Expression::column(vec![format!(
+                "add.stats_parsed.{}.{}",
+                stats_field, column.name
+            )])
         };
         let evaluator = ARROW_HANDLER.get_evaluator(
             crate::kernel::models::fields::log_schema_ref().clone(),
@@ -737,9 +740,9 @@ mod datafusion {
         let mut results = Vec::with_capacity(self.data.len());
         for batch in self.data.iter() {
             let engine = ArrowEngineData::new(batch.clone());
-            let result = evaluator.evaluate(&engine).ok()?;
+            let result = Arc::new(evaluator.evaluate(&engine).ok()?);
             let result = result
-                .as_any()
+                .any_ref()
                 .downcast_ref::<ArrowEngineData>()
                 .ok_or(DeltaTableError::generic(
                     "failed to downcast evaluator result to ArrowEngineData.",
@@ -748,7 +751,7 @@ mod datafusion {
             results.push(result.record_batch().clone());
         }
         let batch = concat_batches(results[0].schema_ref(), &results).ok()?;
-        batch.column_by_name("output").map(|c| c.clone())
+        batch.column_by_name("output").cloned()
     }
 }
 
@@ -803,16 +806,16 @@ mod datafusion {
         lazy_static::lazy_static! {
            static ref ROW_COUNTS_EVAL: Arc<dyn ExpressionEvaluator> = ARROW_HANDLER.get_evaluator(
                crate::kernel::models::fields::log_schema_ref().clone(),
-               Expression::column("add.stats_parsed.numRecords"),
+               Expression::column(vec!["add.stats_parsed.numRecords"].into_iter()),
                DataType::Primitive(PrimitiveType::Long),
            );
         }
         let mut results = Vec::with_capacity(self.data.len());
         for batch in self.data.iter() {
             let engine = ArrowEngineData::new(batch.clone());
-            let result = ROW_COUNTS_EVAL.evaluate(&engine).ok()?;
+            let result = Arc::new(ROW_COUNTS_EVAL.evaluate(&engine).ok()?);
             let result = result
-                .as_any()
+                .any_ref()
                 .downcast_ref::<ArrowEngineData>()
                 .ok_or(DeltaTableError::generic(
                     "failed to downcast evaluator result to ArrowEngineData.",
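Note on the log_data.rs hunks: they track two delta_kernel 0.5 API changes. The `Expression::Column(String)` variant is replaced by an `Expression::column` constructor that takes an iterable of path segments, and evaluator output is now downcast through the `AsAny` trait's `any_ref()` rather than `as_any()`. The sketch below distills the new round-trip; it borrows `ARROW_HANDLER` and `log_schema_ref()` from the hunks above and assumes an Arrow `RecordBatch` named `batch` is in scope, so it is illustrative rather than compile-checked:

    // Sketch of the delta_kernel 0.5 evaluator pattern used above. Assumed
    // context: ARROW_HANDLER, log_schema_ref(), and a RecordBatch `batch`.
    use std::sync::Arc;

    use delta_kernel::engine::arrow_data::ArrowEngineData;
    use delta_kernel::expressions::Expression;
    use delta_kernel::schema::{DataType, PrimitiveType};
    use delta_kernel::AsAny;

    let evaluator = ARROW_HANDLER.get_evaluator(
        log_schema_ref().clone(),
        // Column paths are now passed as segments, not matched as a variant.
        Expression::column(vec!["add.stats_parsed.numRecords".to_string()]),
        DataType::Primitive(PrimitiveType::Long),
    );
    // Mirror the diff: wrap the evaluated EngineData in an Arc, downcast it
    // via any_ref(), and pull the single "output" column from the result.
    let result = Arc::new(evaluator.evaluate(&ArrowEngineData::new(batch.clone()))?);
    let data = result
        .any_ref()
        .downcast_ref::<ArrowEngineData>()
        .expect("evaluator result should be ArrowEngineData");
    let output = data.record_batch().column_by_name("output").cloned();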
diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index d5763b5006..25e11b88ca 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -523,7 +523,7 @@ impl EagerSnapshot {
 
     /// Get the table config which is loaded with of the snapshot
     pub fn load_config(&self) -> &DeltaTableConfig {
-        &self.snapshot.load_config()
+        self.snapshot.load_config()
     }
 
     /// Well known table configuration
@@ -696,7 +696,7 @@ fn stats_schema(schema: &StructType, config: TableConfig<'_>) -> DeltaResult<StructType> {
 
 pub(crate) fn partitions_schema(
     schema: &StructType,
-    partition_columns: &Vec<String>,
+    partition_columns: &[String],
 ) -> DeltaResult<Option<StructType>> {
     if partition_columns.is_empty() {
         return Ok(None);
@@ -705,7 +705,7 @@ pub(crate) fn partitions_schema(
     partition_columns
         .iter()
         .map(|col| {
-            schema.field(col).map(|field| field.clone()).ok_or_else(|| {
+            schema.field(col).cloned().ok_or_else(|| {
                 DeltaTableError::Generic(format!(
                     "Partition column {} not found in schema",
                     col
diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs
index 1b18b61bc7..5ddad7d1e0 100644
--- a/crates/core/src/kernel/snapshot/replay.rs
+++ b/crates/core/src/kernel/snapshot/replay.rs
@@ -54,7 +54,7 @@ impl<'a, S> ReplayStream<'a, S> {
         visitors: &'a mut Vec<Box<dyn ReplayVisitor>>,
     ) -> DeltaResult<Self> {
         let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?);
-        let partitions_schema = snapshot.partitions_schema(None)?.map(|s| Arc::new(s));
+        let partitions_schema = snapshot.partitions_schema(None)?.map(Arc::new);
         let mapper = Arc::new(LogMapper {
             stats_schema,
             partitions_schema,
@@ -83,9 +83,7 @@ impl LogMapper {
     ) -> DeltaResult<Self> {
         Ok(Self {
             stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?),
-            partitions_schema: snapshot
-                .partitions_schema(table_schema)?
-                .map(|s| Arc::new(s)),
+            partitions_schema: snapshot.partitions_schema(table_schema)?.map(Arc::new),
             config: snapshot.config.clone(),
         })
     }
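Note on the snapshot changes: these are clippy cleanups rather than kernel-driven ones. `&Vec<String>` parameters loosen to `&[String]` (existing callers still work via deref coercion), and closures such as `|s| Arc::new(s)` and `|field| field.clone()` collapse to the function paths `Arc::new` and `Option::cloned`. A small self-contained illustration of the same pattern, using a hypothetical `first_partition` helper:

    use std::sync::Arc;

    // Accepting &[String] instead of &Vec<String> takes any borrowed slice.
    fn first_partition(partition_columns: &[String]) -> Option<Arc<String>> {
        // .cloned() replaces .map(|s| s.clone()), and .map(Arc::new) replaces
        // .map(|s| Arc::new(s)): same behavior, less closure noise.
        partition_columns.first().cloned().map(Arc::new)
    }

    fn main() {
        let cols = vec!["year".to_string(), "month".to_string()];
        assert_eq!(first_partition(&cols).as_deref(), Some(&"year".to_string()));
    }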
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 01dcb962b6..89a6cf1473 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -247,7 +247,7 @@ async fn execute(
     // [here](https://github.com/delta-io/delta-rs/pull/2886#issuecomment-2481550560>
     let rules: Vec<Arc<dyn datafusion::optimizer::OptimizerRule + Send + Sync>> = state
         .optimizers()
-        .into_iter()
+        .iter()
         .filter(|rule| {
             rule.name() != "optimize_projections" && rule.name() != "simplify_expressions"
         })
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
index f8a223560a..89ca45aedd 100644
--- a/crates/core/src/table/config.rs
+++ b/crates/core/src/table/config.rs
@@ -2,7 +2,7 @@
 use std::time::Duration;
 use std::{collections::HashMap, str::FromStr};
 
-use delta_kernel::features::ColumnMappingMode;
+use delta_kernel::table_features::ColumnMappingMode;
 use lazy_static::lazy_static;
 use serde::{Deserialize, Serialize};
 
diff --git a/crates/core/src/table/state_arrow.rs b/crates/core/src/table/state_arrow.rs
index e4a374b763..5225b1db4c 100644
--- a/crates/core/src/table/state_arrow.rs
+++ b/crates/core/src/table/state_arrow.rs
@@ -14,7 +14,7 @@ use arrow_array::{
 use arrow_cast::cast;
 use arrow_cast::parse::Parser;
 use arrow_schema::{DataType, Field, Fields, TimeUnit};
-use delta_kernel::features::ColumnMappingMode;
+use delta_kernel::table_features::ColumnMappingMode;
 use itertools::Itertools;
 
 use super::state::DeltaTableState;
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index c09efbf651..08cb16259c 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -135,7 +135,7 @@ fn stats_from_metadata(
 
     let idx_to_iterate = if let Some(stats_cols) = stats_columns {
         let stats_cols = stats_cols
-            .into_iter()
+            .iter()
             .map(|v| {
                 match sqlparser::parser::Parser::new(&dialect)
                     .try_with_sql(v.as_ref())
@@ -143,13 +143,11 @@ fn stats_from_metadata(
                     .parse_multipart_identifier()
                 {
                     Ok(parts) => Ok(parts.into_iter().map(|v| v.value).join(".")),
-                    Err(e) => {
-                        return Err(DeltaWriterError::DeltaTable(
-                            DeltaTableError::GenericError {
-                                source: Box::new(e),
-                            },
-                        ))
-                    }
+                    Err(e) => Err(DeltaWriterError::DeltaTable(
+                        DeltaTableError::GenericError {
+                            source: Box::new(e),
+                        },
+                    )),
                 }
             })
             .collect::<Result<Vec<String>, DeltaWriterError>>()?;
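Note on update.rs and stats.rs: both `.into_iter()` to `.iter()` swaps operate on borrowed collections, where `.into_iter()` iterates by reference anyway; `.iter()` just states that intent. In stats.rs the old `return Err(...)` inside the `.map` closure only returned from the closure, so plain `Ok`/`Err` arms plus the `collect::<Result<_, _>>` short-circuit express the same control flow directly. A self-contained sketch of that idiom, with a hypothetical `parse_ident` standing in for sqlparser's `parse_multipart_identifier`:

    // Stand-in for the sqlparser call: splits a dotted identifier,
    // errors on empty input.
    fn parse_ident(raw: &str) -> Result<Vec<String>, String> {
        if raw.is_empty() {
            return Err("empty identifier".to_string());
        }
        Ok(raw.split('.').map(str::to_string).collect())
    }

    fn normalize(cols: &[String]) -> Result<Vec<String>, String> {
        cols.iter()
            .map(|v| match parse_ident(v) {
                // Both arms are plain values; collect() stops at the first
                // Err, so no `return` is needed inside the closure.
                Ok(parts) => Ok(parts.join(".")),
                Err(e) => Err(e),
            })
            .collect::<Result<Vec<String>, String>>()
    }

    fn main() {
        assert_eq!(normalize(&["a.b".to_string()]).unwrap(), vec!["a.b"]);
        assert!(normalize(&["".to_string()]).is_err());
    }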
diff --git a/python/src/lib.rs b/python/src/lib.rs
index c4a4d80b78..c2b6d10622 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -21,18 +21,12 @@ use delta_kernel::expressions::Scalar;
 use delta_kernel::schema::StructField;
 use deltalake::arrow::compute::concat_batches;
 use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
-use deltalake::arrow::pyarrow::ToPyArrow;
 use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator};
 use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
 use deltalake::checkpoints::{cleanup_metadata, create_checkpoint};
-use deltalake::datafusion::datasource::provider_as_source;
-use deltalake::datafusion::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use deltalake::datafusion::physical_plan::ExecutionPlan;
-use deltalake::datafusion::prelude::{DataFrame, SessionContext};
-use deltalake::delta_datafusion::{
-    DataFusionMixins, DeltaDataChecker, DeltaScanConfigBuilder, DeltaSessionConfig,
-    DeltaTableProvider,
-};
+use deltalake::datafusion::prelude::SessionContext;
+use deltalake::delta_datafusion::DeltaDataChecker;
 use deltalake::errors::DeltaTableError;
 use deltalake::kernel::{
     scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction,
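Note on the Python bindings: the import prune suggests the DataFusion scan and query helpers (`DeltaTableProvider`, `DeltaScanConfigBuilder`, `DataFusionMixins`, etc.) are no longer used from lib.rs, leaving only `SessionContext` and `DeltaDataChecker` in play. For orientation, a hedged sketch of the kind of call the surviving `DeltaDataChecker` import supports, assuming the delta-rs API of this era where `new` takes a table snapshot and `check_batch` is async; this snippet is not part of the diff:

    use deltalake::arrow::record_batch::RecordBatch;
    use deltalake::delta_datafusion::DeltaDataChecker;
    use deltalake::errors::DeltaTableError;
    use deltalake::table::state::DeltaTableState;

    // Validate a batch against the table's invariants and CHECK constraints
    // before writing; both arguments come from the caller.
    async fn validate(
        snapshot: &DeltaTableState,
        batch: &RecordBatch,
    ) -> Result<(), DeltaTableError> {
        DeltaDataChecker::new(snapshot).check_batch(batch).await
    }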