diff --git a/Cargo.toml b/Cargo.toml
index a1ed02d779..474ae5189c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,34 +26,34 @@ debug = true
 debug = "line-tables-only"
 
 [workspace.dependencies]
-delta_kernel = { version = "=0.3.0" }
+delta_kernel = { version = "0.3.1" }
 # delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" }
 
 # arrow
-arrow = { version = "52" }
-arrow-arith = { version = "52" }
-arrow-array = { version = "52", features = ["chrono-tz"] }
-arrow-buffer = { version = "52" }
-arrow-cast = { version = "52" }
-arrow-ipc = { version = "52" }
-arrow-json = { version = "52" }
-arrow-ord = { version = "52" }
-arrow-row = { version = "52" }
-arrow-schema = { version = "52" }
-arrow-select = { version = "52" }
-object_store = { version = "0.10.1" }
-parquet = { version = "52" }
+arrow = { version = "53" }
+arrow-arith = { version = "53" }
+arrow-array = { version = "53", features = ["chrono-tz"] }
+arrow-buffer = { version = "53" }
+arrow-cast = { version = "53" }
+arrow-ipc = { version = "53" }
+arrow-json = { version = "53" }
+arrow-ord = { version = "53" }
+arrow-row = { version = "53" }
+arrow-schema = { version = "53" }
+arrow-select = { version = "53" }
+object_store = { version = "0.11.0" }
+parquet = { version = "53" }
 
 # datafusion
-datafusion = { version = "41" }
-datafusion-expr = { version = "41" }
-datafusion-common = { version = "41" }
-datafusion-proto = { version = "41" }
-datafusion-sql = { version = "41" }
-datafusion-physical-expr = { version = "41" }
-datafusion-physical-plan = { version = "41" }
-datafusion-functions = { version = "41" }
-datafusion-functions-aggregate = { version = "41" }
+datafusion = { version = "42" }
+datafusion-expr = { version = "42" }
+datafusion-common = { version = "42" }
+datafusion-proto = { version = "42" }
+datafusion-sql = { version = "42" }
+datafusion-physical-expr = { version = "42" }
+datafusion-physical-plan = { version = "42" }
+datafusion-functions = { version = "42" }
+datafusion-functions-aggregate = { version = "42" }
 
 # serde
 serde = { version = "1.0.194", features = ["derive"] }
@@ -91,10 +91,30 @@ datafusion-functions-window = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-physical-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-physical-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
-datafusion-physical-expr-functions-aggregate = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-physical-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-physical-plan = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-proto = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-proto-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-sql = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-substrait = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
+# datafusion = { path = "../arrow-datafusion-42/datafusion/core" }
+# datafusion-catalog = { path = "../arrow-datafusion-42/datafusion/catalog" }
+# datafusion-common = { path = "../arrow-datafusion-42/datafusion/common" }
+# datafusion-common-runtime = { path = "../arrow-datafusion-42/datafusion/common-runtime" }
+# datafusion-execution = { path = "../arrow-datafusion-42/datafusion/execution" }
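The version bumps above have to land together: arrow `RecordBatch`es and schemas flow directly into DataFusion and parquet APIs, so a tree that mixes arrow 52 and 53 fails to compile with two incompatible `RecordBatch` types. A minimal sketch of that coupling (not part of the patch, assuming the arrow 53 / datafusion 42 pins above):

```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::error::Result;

// Builds an arrow RecordBatch and hands it to DataFusion's MemTable; this
// only compiles when both crates are pinned to the same arrow major version.
fn batch_as_table() -> Result<MemTable> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;
    MemTable::try_new(schema, vec![vec![batch]])
}
```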
+# datafusion-expr = { path = "../arrow-datafusion-42/datafusion/expr" }
+# datafusion-expr-common = { path = "../arrow-datafusion-42/datafusion/expr-common" }
+# datafusion-functions = { path = "../arrow-datafusion-42/datafusion/functions" }
+# datafusion-functions-aggregate = { path = "../arrow-datafusion-42/datafusion/functions-aggregate" }
+# datafusion-functions-aggregate-common = { path = "../arrow-datafusion-42/datafusion/functions-aggregate-common" }
+# datafusion-functions-nested = { path = "../arrow-datafusion-42/datafusion/functions-nested" }
+# datafusion-functions-window = { path = "../arrow-datafusion-42/datafusion/functions-window" }
+# datafusion-optimizer = { path = "../arrow-datafusion-42/datafusion/optimizer" }
+# datafusion-physical-expr = { path = "../arrow-datafusion-42/datafusion/physical-expr" }
+# datafusion-physical-expr-common = { path = "../arrow-datafusion-42/datafusion/physical-expr-common" }
+# datafusion-physical-optimizer = { path = "../arrow-datafusion-42/datafusion/physical-optimizer" }
+# datafusion-physical-plan = { path = "../arrow-datafusion-42/datafusion/physical-plan" }
+# datafusion-proto = { path = "../arrow-datafusion-42/datafusion/proto" }
+# datafusion-proto-common = { path = "../arrow-datafusion-42/datafusion/proto-common" }
+# datafusion-sql = { path = "../arrow-datafusion-42/datafusion/sql" }
+# datafusion-substrait = { path = "../arrow-datafusion-42/datafusion/substrait" }
diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs
index 7b0b779069..f7aa5d8d45 100644
--- a/crates/core/src/data_catalog/storage/mod.rs
+++ b/crates/core/src/data_catalog/storage/mod.rs
@@ -30,6 +30,7 @@ const DELTA_LOG_FOLDER: &str = "_delta_log";
 ///
 /// assuming it contains valid deltalake data, i.e a `_delta_log` folder:
 /// s3://host.example.com:3000/data/tpch/customer/_delta_log/
+#[derive(Debug)]
 pub struct ListingSchemaProvider {
     authority: String,
     /// Underlying object store
@@ -60,7 +61,12 @@ impl ListingSchemaProvider {
 
     /// Reload table information from ObjectStore
    pub async fn refresh(&self) -> datafusion_common::Result<()> {
-        let entries: Vec<_> = self.store.list(None).try_collect().await?;
+        let entries: Vec<_> = self
+            .store
+            .list(None)
+            .try_collect()
+            .await
+            .map_err(DataFusionError::ObjectStore)?;
         let mut tables = HashSet::new();
         for file in entries.iter() {
             let mut parent = Path::new(file.location.as_ref());
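For context on the `refresh` change: in the object_store release used here, `list` returns a stream of `Result<ObjectMeta, object_store::Error>`, and the store error no longer converts implicitly in this call chain, hence the explicit mapping into `DataFusionError::ObjectStore`. A minimal sketch of the same pattern (assuming datafusion-common is built with its object_store integration enabled):

```rust
use datafusion_common::{DataFusionError, Result};
use futures::TryStreamExt;
use object_store::{ObjectMeta, ObjectStore};

// Collects every object in the store, mapping the object_store error into
// DataFusion's error type so the caller can stay on datafusion_common::Result.
async fn list_all(store: &dyn ObjectStore) -> Result<Vec<ObjectMeta>> {
    store
        .list(None)
        .try_collect()
        .await
        .map_err(DataFusionError::ObjectStore)
}
```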
diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs
index 44e7c9ca33..3e32a3ad68 100644
--- a/crates/core/src/data_catalog/unity/datafusion.rs
+++ b/crates/core/src/data_catalog/unity/datafusion.rs
@@ -17,6 +17,7 @@ use crate::data_catalog::models::ListSchemasResponse;
 use crate::DeltaTableBuilder;
 
 /// In-memory list of catalogs populated by unity catalog
+#[derive(Debug)]
 pub struct UnityCatalogList {
     /// Collection of catalogs containing schemas and ultimately TableProviders
     pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
@@ -73,6 +74,7 @@ impl CatalogProviderList for UnityCatalogList {
 }
 
 /// A datafusion [`CatalogProvider`] backed by Databricks UnityCatalog
+#[derive(Debug)]
 pub struct UnityCatalogProvider {
     /// Parent catalog for schemas of interest.
     pub schemas: DashMap<String, Arc<dyn SchemaProvider>>,
@@ -124,6 +126,7 @@ impl CatalogProvider for UnityCatalogProvider {
 }
 
 /// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog
+#[derive(Debug)]
 pub struct UnitySchemaProvider {
     /// UnityCatalog Api client
     client: Arc<UnityCatalog>,
diff --git a/crates/core/src/delta_datafusion/find_files/logical.rs b/crates/core/src/delta_datafusion/find_files/logical.rs
index 4dd4a3b5da..9dc55b1674 100644
--- a/crates/core/src/delta_datafusion/find_files/logical.rs
+++ b/crates/core/src/delta_datafusion/find_files/logical.rs
@@ -1,3 +1,4 @@
+use std::cmp::Ordering;
 use std::collections::HashSet;
 use std::hash::{Hash, Hasher};
 
@@ -63,6 +64,12 @@ impl Hash for FindFilesNode {
     }
 }
 
+impl PartialOrd for FindFilesNode {
+    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
+        None
+    }
+}
+
 impl UserDefinedLogicalNodeCore for FindFilesNode {
     fn name(&self) -> &str {
         "FindFiles"
diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 0c235242c2..632c5dc066 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -44,7 +44,7 @@ lazy_static! {
 #[derive(Default)]
 struct FindFilesPlannerExtension {}
 
-#[derive(Default)]
+#[derive(Default, Debug)]
 struct FindFilesPlanner {}
 
 #[async_trait]
diff --git a/crates/core/src/delta_datafusion/logical.rs b/crates/core/src/delta_datafusion/logical.rs
index 2ce435b5b6..4aaf30242f 100644
--- a/crates/core/src/delta_datafusion/logical.rs
+++ b/crates/core/src/delta_datafusion/logical.rs
@@ -7,7 +7,7 @@ use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
 
 // Metric Observer is used to update DataFusion metrics from a record batch.
 // See MetricObserverExec for the physical implementation
-#[derive(Debug, Hash, Eq, PartialEq)]
+#[derive(Debug, Hash, Eq, PartialEq, PartialOrd)]
 pub(crate) struct MetricObserver {
     // id is preserved during conversion to physical node
     pub id: String,
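These nodes gain an ordering because the DataFusion 42 upgrade expects user-defined logical nodes to be comparable (`PartialOrd`), as the impls added in this patch indicate. Nodes whose fields order naturally can derive it, as `MetricObserver` does above; nodes holding non-orderable scan state opt out by reporting no ordering. A standalone sketch of the opt-out pattern — the struct and field here are illustrative, not the delta-rs definitions:

```rust
use std::cmp::Ordering;

#[derive(Debug, Hash, Eq, PartialEq)]
struct ExampleNode {
    // Stand-in for scan state (file lists, predicates) with no natural order.
    files: Vec<String>,
}

impl PartialOrd for ExampleNode {
    // Two scans are never "less than" one another; report no ordering.
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None
    }
}
```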
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 7053e83649..29f7245de9 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -21,6 +21,7 @@
 //! ```
 
 use std::any::Any;
+use std::borrow::Cow;
 use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Debug};
 use std::sync::Arc;
@@ -42,6 +43,8 @@ use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetExec,
 };
+
+use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
 use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -69,7 +72,7 @@ use datafusion_sql::planner::ParserOptions;
 use either::Either;
 use futures::TryStreamExt;
 use itertools::Itertools;
-use object_store::ObjectMeta;
+use object_store::{Error, ObjectMeta};
 use serde::{Deserialize, Serialize};
 use url::Url;
 
@@ -727,7 +730,7 @@ impl TableProvider for DeltaTable {
         None
     }
 
-    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+    fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
         None
     }
 
@@ -791,6 +794,7 @@ impl TableProvider for DeltaTable {
 }
 
 /// A Delta table provider that enables additional metadata columns to be included during the scan
+#[derive(Debug)]
 pub struct DeltaTableProvider {
     snapshot: DeltaTableState,
     log_store: LogStoreRef,
@@ -840,7 +844,7 @@ impl TableProvider for DeltaTableProvider {
         None
     }
 
-    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+    fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
        None
    }
 
@@ -1442,6 +1446,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec {
 }
 
 /// Responsible for creating deltatables
+#[derive(Debug)]
 pub struct DeltaTableFactory {}
 
 #[async_trait]
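`get_logical_plan` changes shape with DataFusion 42: it now returns `Option<Cow<'_, LogicalPlan>>` instead of `Option<&LogicalPlan>`, letting a provider hand back either a borrow of a cached plan or an owned plan built on demand. Plain tables like the two providers here still return `None`. A hedged sketch of the borrowed case — `ViewLike` is a hypothetical provider, not delta-rs code:

```rust
use std::borrow::Cow;

use datafusion_expr::LogicalPlan;

// A view-style provider that caches the plan it was defined with.
struct ViewLike {
    plan: LogicalPlan,
}

impl ViewLike {
    // Same shape as TableProvider::get_logical_plan in DataFusion 42.
    fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
        // Borrow the cached plan; Cow::Owned would fit a computed plan.
        Some(Cow::Borrowed(&self.plan))
    }
}
```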
diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs
index 6119b78ce6..a2e46970b8 100644
--- a/crates/core/src/delta_datafusion/planner.rs
+++ b/crates/core/src/delta_datafusion/planner.rs
@@ -22,6 +22,8 @@
 //! };
 //!
 //! let state = state.with_query_planner(Arc::new(merge_planner));
+
+use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -41,6 +43,12 @@ pub struct DeltaPlanner<T: ExtensionPlanner> {
     pub extension_planner: T,
 }
 
+impl<T: ExtensionPlanner> Debug for DeltaPlanner<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DeltaPlanner")
+    }
+}
+
 #[async_trait]
 impl<T: ExtensionPlanner + Send + Sync + 'static> QueryPlanner for DeltaPlanner<T> {
     async fn create_physical_plan(
diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs
index 99a97e2130..562684c1cb 100644
--- a/crates/core/src/delta_datafusion/schema_adapter.rs
+++ b/crates/core/src/delta_datafusion/schema_adapter.rs
@@ -1,10 +1,11 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use arrow_array::RecordBatch;
-use arrow_schema::{Schema, SchemaRef};
+use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions};
+use arrow_cast::{can_cast_types, cast};
+use arrow_schema::{Field, Schema, SchemaRef};
 use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
-
+use datafusion_common::plan_err;
 use crate::operations::cast::cast_record_batch;
 
 /// A Schema Adapter Factory which provides casting record batches from parquet to meet
@@ -13,21 +14,23 @@
 pub(crate) struct DeltaSchemaAdapterFactory {}
 
 impl SchemaAdapterFactory for DeltaSchemaAdapterFactory {
-    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+    fn create(&self, projected_table_schema: SchemaRef, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
         Box::new(DeltaSchemaAdapter {
-            table_schema: schema,
+            projected_table_schema,
+            table_schema,
         })
     }
 }
 
 pub(crate) struct DeltaSchemaAdapter {
+    projected_table_schema: SchemaRef,
     /// Schema for the table
     table_schema: SchemaRef,
 }
 
 impl SchemaAdapter for DeltaSchemaAdapter {
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-        let field = self.table_schema.field(index);
+        let field = self.projected_table_schema.field(index);
         Some(file_schema.fields.find(field.name())?.0)
     }
 
@@ -45,6 +48,7 @@ impl SchemaAdapter for DeltaSchemaAdapter {
 
         Ok((
             Arc::new(SchemaMapping {
+                projected_table_schema: self.projected_table_schema.clone(),
                 table_schema: self.table_schema.clone(),
             }),
             projection,
@@ -54,12 +58,13 @@ impl SchemaAdapter for DeltaSchemaAdapter {
 
 #[derive(Debug)]
 pub(crate) struct SchemaMapping {
+    projected_table_schema: SchemaRef,
     table_schema: SchemaRef,
 }
 
 impl SchemaMapper for SchemaMapping {
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
-        let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?;
+        let record_batch = cast_record_batch(&batch, self.projected_table_schema.clone(), false, true)?;
         Ok(record_batch)
     }
 
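The adapter change tracks DataFusion 42's two-schema `SchemaAdapterFactory::create(projected_table_schema, table_schema)`: column lookups and the output batch follow the projected schema, while the full table schema is kept for context. Conceptually, `map_batch` casts each file column to the projected field's type. A simplified sketch using plain arrow casting — delta-rs routes this through its own `cast_record_batch`, and this assumes the batch's columns already line up positionally with the projection:

```rust
use arrow_array::RecordBatch;
use arrow_cast::cast;
use arrow_schema::{ArrowError, SchemaRef};

// Cast every column of `batch` to the corresponding projected field's type
// and rebuild the batch under the projected schema.
fn project_batch(batch: &RecordBatch, projected: SchemaRef) -> Result<RecordBatch, ArrowError> {
    let columns = projected
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| cast(batch.column(i), field.data_type()))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(projected, columns)
}
```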
diff --git a/crates/core/src/kernel/scalars.rs b/crates/core/src/kernel/scalars.rs
index bc1bd6eed9..35d1b705c9 100644
--- a/crates/core/src/kernel/scalars.rs
+++ b/crates/core/src/kernel/scalars.rs
@@ -73,6 +73,7 @@ impl ScalarExt for Scalar {
             Self::Binary(val) => create_escaped_binary_string(val.as_slice()),
             Self::Null(_) => "null".to_string(),
             Self::Struct(_) => unimplemented!(),
+            Self::Array(_) => unimplemented!(),
         }
     }
 
@@ -269,6 +270,7 @@ impl ScalarExt for Scalar {
             Self::Binary(val) => Value::String(create_escaped_binary_string(val.as_slice())),
             Self::Null(_) => Value::Null,
             Self::Struct(_) => unimplemented!(),
+            Self::Array(_) => unimplemented!(),
         }
     }
 }
diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs
index 9084d721b7..09f58a6979 100644
--- a/crates/core/src/operations/merge/barrier.rs
+++ b/crates/core/src/operations/merge/barrier.rs
@@ -393,7 +393,7 @@ impl RecordBatchStream for MergeBarrierStream {
     }
 }
 
-#[derive(Debug, Hash, Eq, PartialEq)]
+#[derive(Debug, Hash, Eq, PartialEq, PartialOrd)]
 pub(crate) struct MergeBarrier {
     pub input: LogicalPlan,
     pub expr: Expr,
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index e4b93a54f5..eee28b8be8 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -613,7 +613,7 @@ mod tests {
                 Some($value),
                 Some($value),
                 None,
-                0,
+                Some(0),
                 false,
             ))
         };
diff --git a/crates/hdfs/Cargo.toml b/crates/hdfs/Cargo.toml
index 2d654d0bc1..e7d8040404 100644
--- a/crates/hdfs/Cargo.toml
+++ b/crates/hdfs/Cargo.toml
@@ -13,7 +13,7 @@ rust-version.workspace = true
 
 [dependencies]
 deltalake-core = { version = "0.21.0", path = "../core" }
-hdfs-native-object-store = "0.11"
+hdfs-native-object-store = "0.12.1"
 
 # workspace dependecies
 object_store = { workspace = true }
diff --git a/crates/hdfs/src/lib.rs b/crates/hdfs/src/lib.rs
index 45b14740b7..52ba850312 100644
--- a/crates/hdfs/src/lib.rs
+++ b/crates/hdfs/src/lib.rs
@@ -4,7 +4,7 @@ use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
 use deltalake_core::storage::{
     factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
 };
-use deltalake_core::{DeltaResult, Path};
+use deltalake_core::{DeltaResult, DeltaTableError, Path};
 use hdfs_native_object_store::HdfsObjectStore;
 use url::Url;
 
@@ -20,7 +20,7 @@ impl ObjectStoreFactory for HdfsFactory {
         let store: ObjectStoreRef = Arc::new(HdfsObjectStore::with_config(
             url.as_str(),
             options.0.clone(),
-        )?);
+        ).map_err(|e| DeltaTableError::ObjectStore { source: e })?);
         let prefix = Path::parse(url.path())?;
         Ok((url_prefix_handler(store, prefix.clone()), prefix))
     }
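The HDFS change shows the pattern for backends whose constructors return a raw `object_store::Error`: fold it into `DeltaTableError::ObjectStore` so factory callers deal with a single error type. A contrived but self-contained sketch — `InMemory` stands in for `HdfsObjectStore::with_config`, whose error type is assumed here to be `object_store::Error`:

```rust
use std::sync::Arc;

use deltalake_core::{DeltaResult, DeltaTableError};
use object_store::memory::InMemory;
use object_store::ObjectStore;

fn open_store() -> DeltaResult<Arc<dyn ObjectStore>> {
    // A real factory would call e.g. HdfsObjectStore::with_config(..) here.
    let store: Result<InMemory, object_store::Error> = Ok(InMemory::new());
    // Wrap the backend error in delta-rs's error enum before bubbling it up.
    Ok(Arc::new(
        store.map_err(|e| DeltaTableError::ObjectStore { source: e })?,
    ))
}
```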
@@ -145,6 +146,12 @@ impl Vacuum {
     }
 }
 
+impl PartialOrd for Vacuum {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        self.table.partial_cmp(&other.table)
+    }
+}
+
 /// Logical Plan for [DescribeHistory] operation.
 ///
 /// [DescribeHistory]: https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-describe-history
@@ -156,6 +163,12 @@ pub struct DescribeHistory {
     pub schema: DFSchemaRef,
 }
 
+impl PartialOrd for DescribeHistory {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        self.table.partial_cmp(&other.table)
+    }
+}
+
 impl DescribeHistory {
     pub fn new(table: TableReference) -> Self {
         Self {
@@ -186,6 +199,12 @@ impl DescribeDetails {
     }
 }
 
+impl PartialOrd for DescribeDetails {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        self.table.partial_cmp(&other.table)
+    }
+}
+
 /// Logical Plan for DescribeFiles operation.
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DescribeFiles {
@@ -205,6 +224,12 @@ impl DescribeFiles {
     }
 }
 
+impl PartialOrd for DescribeFiles {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        self.table.partial_cmp(&other.table)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/python/Cargo.toml b/python/Cargo.toml
index fadc260d0d..5ff2baaee1 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -43,8 +43,8 @@ reqwest = { version = "*", features = ["native-tls-vendored"] }
 deltalake-mount = { path = "../crates/mount" }
 
 [dependencies.pyo3]
-version = "0.21.1"
-features = ["extension-module", "abi3", "abi3-py38"]
+version = "0.22.2"
+features = ["extension-module", "abi3", "abi3-py312"]
 
 [dependencies.deltalake]
 path = "../crates/deltalake"
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 473f5ceea9..bf040bd00f 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1426,6 +1426,9 @@ fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyAny>> {
+        Scalar::Array(_) => {
+            todo!()
+        },
     };
     Ok(val.into_bound(py))
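Two notes on the Python crate: switching the abi3 feature from `abi3-py38` to `abi3-py312` raises the minimum supported CPython for the built wheels to 3.12, and the new `Scalar::Array` arm is a `todo!()`, which panics at runtime if a user ever hits it. A hedged sketch of a friendlier placeholder under pyo3 0.22, raising a catchable Python exception instead — `array_scalar_to_py` is illustrative, not the patch's code:

```rust
use pyo3::exceptions::PyNotImplementedError;
use pyo3::prelude::*;

// Placeholder conversion for kernel Array scalars: surface a Python-level
// NotImplementedError rather than panicking the interpreter via todo!().
fn array_scalar_to_py(_py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
    Err(PyNotImplementedError::new_err(
        "converting kernel Array scalars to Python is not yet implemented",
    ))
}
```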