From 1083c8c1b9b97bf3276b59329ebf4255b81b5968 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 28 Nov 2024 15:53:42 +0000 Subject: [PATCH 01/17] chore: bump key versions for the upcoming 0.22.1 release Signed-off-by: R. Tyler Croy --- crates/core/Cargo.toml | 2 +- crates/deltalake/Cargo.toml | 2 +- python/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 6b31bed778..948139dcc1 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.22.0" +version = "0.22.1" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 9647de92bb..3c5a13172e 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.22.0" +version = "0.22.1" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/python/Cargo.toml b/python/Cargo.toml index 53b5476cf5..c89f68b8c0 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.22.0" +version = "0.22.1" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" From a1c37b72c74dfc4f5cb00dd82648ccfd544ac58e Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 24 Nov 2024 13:46:21 +0100 Subject: [PATCH 02/17] fix: add nullability check in deltachecker --- crates/core/src/delta_datafusion/mod.rs | 79 ++++++++++++++++++++++++- python/tests/test_merge.py | 40 +++++++++++++ 2 files changed, 117 insertions(+), 2 deletions(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 4425b0ff6f..d260afcda7 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -1150,11 +1150,12 @@ pub(crate) async fn execute_plan_to_batch( Ok(concat_batches(&plan.schema(), data.iter())?) } -/// Responsible for checking batches of data conform to table's invariants. -#[derive(Clone)] +/// Responsible for checking batches of data conform to table's invariants, constraints and nullability. +#[derive(Clone, Default)] pub struct DeltaDataChecker { constraints: Vec, invariants: Vec, + non_nullable_columns: Vec, ctx: SessionContext, } @@ -1164,6 +1165,7 @@ impl DeltaDataChecker { Self { invariants: vec![], constraints: vec![], + non_nullable_columns: vec![], ctx: DeltaSessionContext::default().into(), } } @@ -1173,6 +1175,7 @@ impl DeltaDataChecker { Self { invariants, constraints: vec![], + non_nullable_columns: vec![], ctx: DeltaSessionContext::default().into(), } } @@ -1182,6 +1185,7 @@ impl DeltaDataChecker { Self { constraints, invariants: vec![], + non_nullable_columns: vec![], ctx: DeltaSessionContext::default().into(), } } @@ -1202,9 +1206,21 @@ impl DeltaDataChecker { pub fn new(snapshot: &DeltaTableState) -> Self { let invariants = snapshot.schema().get_invariants().unwrap_or_default(); let constraints = snapshot.table_config().get_constraints(); + let non_nullable_columns = snapshot + .schema() + .fields() + .filter_map(|f| { + if !f.is_nullable() { + Some(f.name().clone()) + } else { + None + } + }) + .collect_vec(); Self { invariants, constraints, + non_nullable_columns, ctx: DeltaSessionContext::default().into(), } } @@ -1214,10 +1230,35 @@ impl DeltaDataChecker { /// If it does not, it will return [DeltaTableError::InvalidData] with a list /// of values that violated each invariant. pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> { + self.check_nullability(record_batch)?; self.enforce_checks(record_batch, &self.invariants).await?; self.enforce_checks(record_batch, &self.constraints).await } + /// Return true if all the nullability checks are valid + fn check_nullability(&self, record_batch: &RecordBatch) -> Result { + let mut violations = Vec::new(); + for col in self.non_nullable_columns.iter() { + if let Some(arr) = record_batch.column_by_name(col) { + if arr.null_count() > 0 { + violations.push(format!( + "Non-nullable column violation for {col}, found {} null values", + arr.null_count() + )); + } + } else { + violations.push(format!( + "Non-nullable column violation for {col}, not found in batch!" + )); + } + } + if !violations.is_empty() { + Err(DeltaTableError::InvalidData { violations }) + } else { + Ok(true) + } + } + async fn enforce_checks( &self, record_batch: &RecordBatch, @@ -2598,4 +2639,38 @@ mod tests { assert_eq!(actual.len(), 0); } + + #[tokio::test] + async fn test_check_nullability() -> DeltaResult<()> { + use arrow::array::StringArray; + + let data_checker = DeltaDataChecker { + non_nullable_columns: vec!["zed".to_string(), "yap".to_string()], + ..Default::default() + }; + + let arr: Arc = Arc::new(StringArray::from(vec!["s"])); + let nulls: Arc = Arc::new(StringArray::new_null(1)); + let batch = RecordBatch::try_from_iter(vec![("a", arr), ("zed", nulls)]).unwrap(); + + let result = data_checker.check_nullability(&batch); + assert!( + result.is_err(), + "The result should have errored! {result:?}" + ); + + let arr: Arc = Arc::new(StringArray::from(vec!["s"])); + let batch = RecordBatch::try_from_iter(vec![("zed", arr)]).unwrap(); + let result = data_checker.check_nullability(&batch); + assert!( + result.is_err(), + "The result should have errored! {result:?}" + ); + + let arr: Arc = Arc::new(StringArray::from(vec!["s"])); + let batch = RecordBatch::try_from_iter(vec![("zed", arr.clone()), ("yap", arr)]).unwrap(); + let _ = data_checker.check_nullability(&batch)?; + + Ok(()) + } } diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index 8645be538b..e8416f6e5f 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -6,6 +6,7 @@ import pytest from deltalake import DeltaTable, write_deltalake +from deltalake.exceptions import DeltaProtocolError from deltalake.table import CommitProperties @@ -1080,3 +1081,42 @@ def test_cdc_merge_planning_union_2908(tmp_path): assert last_action["operation"] == "MERGE" assert dt.version() == 1 assert os.path.exists(cdc_path), "_change_data doesn't exist" + + +@pytest.mark.pandas +def test_merge_non_nullable(tmp_path): + import re + + import pandas as pd + + from deltalake.schema import Field, PrimitiveType, Schema + + schema = Schema( + [ + Field("id", PrimitiveType("integer"), nullable=False), + Field("bool", PrimitiveType("boolean"), nullable=False), + ] + ) + + dt = DeltaTable.create(tmp_path, schema=schema) + df = pd.DataFrame( + columns=["id", "bool"], + data=[ + [1, True], + [2, None], + [3, False], + ], + ) + + with pytest.raises( + DeltaProtocolError, + match=re.escape( + 'Invariant violations: ["Non-nullable column violation for bool, found 1 null values"]' + ), + ): + dt.merge( + source=df, + source_alias="s", + target_alias="t", + predicate="s.id = t.id", + ).when_matched_update_all().when_not_matched_insert_all().execute() From fdc1a0acabc087242ec68dc4ee3f7bb6f6283f9f Mon Sep 17 00:00:00 2001 From: Peter Ke Date: Sat, 2 Nov 2024 10:42:38 -0700 Subject: [PATCH 03/17] Implement query builder Signed-off-by: Peter Ke --- crates/core/src/delta_datafusion/mod.rs | 7 ++- python/deltalake/__init__.py | 1 + python/deltalake/_internal.pyi | 5 +++ python/deltalake/query.py | 31 +++++++++++++ python/deltalake/warnings.py | 2 + python/src/error.rs | 8 ++++ python/src/lib.rs | 13 +++++- python/src/query.rs | 58 +++++++++++++++++++++++++ python/tests/test_table_read.py | 54 +++++++++++++++++++++++ 9 files changed, 175 insertions(+), 4 deletions(-) create mode 100644 python/deltalake/query.py create mode 100644 python/deltalake/warnings.py create mode 100644 python/src/query.rs diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index d260afcda7..034781b85c 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -826,9 +826,12 @@ impl TableProvider for DeltaTableProvider { fn supports_filters_pushdown( &self, - _filter: &[&Expr], + filter: &[&Expr], ) -> DataFusionResult> { - Ok(vec![TableProviderFilterPushDown::Inexact]) + Ok(filter + .iter() + .map(|_| TableProviderFilterPushDown::Inexact) + .collect()) } fn statistics(&self) -> Option { diff --git a/python/deltalake/__init__.py b/python/deltalake/__init__.py index 43997076b2..2e82d20a40 100644 --- a/python/deltalake/__init__.py +++ b/python/deltalake/__init__.py @@ -2,6 +2,7 @@ from ._internal import __version__ as __version__ from ._internal import rust_core_version as rust_core_version from .data_catalog import DataCatalog as DataCatalog +from .query import QueryBuilder as QueryBuilder from .schema import DataType as DataType from .schema import Field as Field from .schema import Schema as Schema diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 026d84d08d..052cf1ebb6 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -874,6 +874,11 @@ class DeltaFileSystemHandler: ) -> ObjectOutputStream: """Open an output stream for sequential writing.""" +class PyQueryBuilder: + def __init__(self) -> None: ... + def register(self, table_name: str, delta_table: RawDeltaTable) -> None: ... + def execute(self, sql: str) -> List[pyarrow.RecordBatch]: ... + class DeltaDataChecker: def __init__(self, invariants: List[Tuple[str, str]]) -> None: ... def check_batch(self, batch: pyarrow.RecordBatch) -> None: ... diff --git a/python/deltalake/query.py b/python/deltalake/query.py new file mode 100644 index 0000000000..06e5144d24 --- /dev/null +++ b/python/deltalake/query.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import warnings +from typing import List + +import pyarrow + +from deltalake._internal import PyQueryBuilder +from deltalake.table import DeltaTable +from deltalake.warnings import ExperimentalWarning + + +class QueryBuilder: + def __init__(self) -> None: + warnings.warn( + "QueryBuilder is experimental and subject to change", + category=ExperimentalWarning, + ) + self._query_builder = PyQueryBuilder() + + def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder: + """Add a table to the query builder.""" + self._query_builder.register( + table_name=table_name, + delta_table=delta_table._table, + ) + return self + + def execute(self, sql: str) -> List[pyarrow.RecordBatch]: + """Execute the query and return a list of record batches.""" + return self._query_builder.execute(sql) diff --git a/python/deltalake/warnings.py b/python/deltalake/warnings.py new file mode 100644 index 0000000000..83c5d34bcd --- /dev/null +++ b/python/deltalake/warnings.py @@ -0,0 +1,2 @@ +class ExperimentalWarning(Warning): + pass diff --git a/python/src/error.rs b/python/src/error.rs index a54b1e60b4..b1d22fc7ca 100644 --- a/python/src/error.rs +++ b/python/src/error.rs @@ -1,4 +1,5 @@ use arrow_schema::ArrowError; +use deltalake::datafusion::error::DataFusionError; use deltalake::protocol::ProtocolError; use deltalake::{errors::DeltaTableError, ObjectStoreError}; use pyo3::exceptions::{ @@ -79,6 +80,10 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr { } } +fn datafusion_to_py(err: DataFusionError) -> PyErr { + DeltaError::new_err(err.to_string()) +} + #[derive(thiserror::Error, Debug)] pub enum PythonError { #[error("Error in delta table")] @@ -89,6 +94,8 @@ pub enum PythonError { Arrow(#[from] ArrowError), #[error("Error in checkpoint")] Protocol(#[from] ProtocolError), + #[error("Error in data fusion")] + DataFusion(#[from] DataFusionError), } impl From for pyo3::PyErr { @@ -98,6 +105,7 @@ impl From for pyo3::PyErr { PythonError::ObjectStore(err) => object_store_to_py(err), PythonError::Arrow(err) => arrow_to_py(err), PythonError::Protocol(err) => checkpoint_to_py(err), + PythonError::DataFusion(err) => datafusion_to_py(err), } } } diff --git a/python/src/lib.rs b/python/src/lib.rs index 8da2544b3f..c4a4d80b78 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -2,6 +2,7 @@ mod error; mod features; mod filesystem; mod merge; +mod query; mod schema; mod utils; @@ -20,12 +21,18 @@ use delta_kernel::expressions::Scalar; use delta_kernel::schema::StructField; use deltalake::arrow::compute::concat_batches; use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; +use deltalake::arrow::pyarrow::ToPyArrow; use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator}; use deltalake::arrow::{self, datatypes::Schema as ArrowSchema}; use deltalake::checkpoints::{cleanup_metadata, create_checkpoint}; +use deltalake::datafusion::datasource::provider_as_source; +use deltalake::datafusion::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; use deltalake::datafusion::physical_plan::ExecutionPlan; -use deltalake::datafusion::prelude::SessionContext; -use deltalake::delta_datafusion::DeltaDataChecker; +use deltalake::datafusion::prelude::{DataFrame, SessionContext}; +use deltalake::delta_datafusion::{ + DataFusionMixins, DeltaDataChecker, DeltaScanConfigBuilder, DeltaSessionConfig, + DeltaTableProvider, +}; use deltalake::errors::DeltaTableError; use deltalake::kernel::{ scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction, @@ -69,6 +76,7 @@ use crate::error::PythonError; use crate::features::TableFeatures; use crate::filesystem::FsConfig; use crate::merge::PyMergeBuilder; +use crate::query::PyQueryBuilder; use crate::schema::{schema_to_pyobject, Field}; use crate::utils::rt; @@ -2095,6 +2103,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { )?)?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/python/src/query.rs b/python/src/query.rs new file mode 100644 index 0000000000..af3da38eee --- /dev/null +++ b/python/src/query.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; + +use deltalake::{ + arrow::pyarrow::ToPyArrow, + datafusion::prelude::SessionContext, + delta_datafusion::{DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider}, +}; +use pyo3::prelude::*; + +use crate::{error::PythonError, utils::rt, RawDeltaTable}; + +#[pyclass(module = "deltalake._internal")] +pub(crate) struct PyQueryBuilder { + _ctx: SessionContext, +} + +#[pymethods] +impl PyQueryBuilder { + #[new] + pub fn new() -> Self { + let config = DeltaSessionConfig::default().into(); + let _ctx = SessionContext::new_with_config(config); + + PyQueryBuilder { _ctx } + } + + pub fn register(&self, table_name: &str, delta_table: &RawDeltaTable) -> PyResult<()> { + let snapshot = delta_table._table.snapshot().map_err(PythonError::from)?; + let log_store = delta_table._table.log_store(); + + let scan_config = DeltaScanConfigBuilder::default() + .build(snapshot) + .map_err(PythonError::from)?; + + let provider = Arc::new( + DeltaTableProvider::try_new(snapshot.clone(), log_store, scan_config) + .map_err(PythonError::from)?, + ); + + self._ctx + .register_table(table_name, provider) + .map_err(PythonError::from)?; + + Ok(()) + } + + pub fn execute(&self, py: Python, sql: &str) -> PyResult { + let batches = py.allow_threads(|| { + rt().block_on(async { + let df = self._ctx.sql(sql).await?; + df.collect().await + }) + .map_err(PythonError::from) + })?; + + batches.to_pyarrow(py) + } +} diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 5ff07ed9e8..30d7f21d7f 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -9,6 +9,7 @@ from deltalake._util import encode_partition_value from deltalake.exceptions import DeltaProtocolError +from deltalake.query import QueryBuilder from deltalake.table import ProtocolVersions from deltalake.writer import write_deltalake @@ -946,3 +947,56 @@ def test_is_deltatable_with_storage_opts(): "DELTA_DYNAMO_TABLE_NAME": "custom_table_name", } assert DeltaTable.is_deltatable(table_path, storage_options=storage_options) + + +def test_read_query_builder(): + table_path = "../crates/test/tests/data/delta-0.8.0-partitioned" + dt = DeltaTable(table_path) + expected = { + "value": ["4", "5", "6", "7"], + "year": ["2021", "2021", "2021", "2021"], + "month": ["4", "12", "12", "12"], + "day": ["5", "4", "20", "20"], + } + actual = pa.Table.from_batches( + QueryBuilder() + .register("tbl", dt) + .execute("SELECT * FROM tbl WHERE year >= 2021 ORDER BY value") + ).to_pydict() + assert expected == actual + + +def test_read_query_builder_join_multiple_tables(tmp_path): + table_path = "../crates/test/tests/data/delta-0.8.0-date" + dt1 = DeltaTable(table_path) + + write_deltalake( + tmp_path, + pa.table( + { + "date": ["2021-01-01", "2021-01-02", "2021-01-03", "2021-12-31"], + "value": ["a", "b", "c", "d"], + } + ), + ) + dt2 = DeltaTable(tmp_path) + + expected = { + "date": ["2021-01-01", "2021-01-02", "2021-01-03"], + "dayOfYear": [1, 2, 3], + "value": ["a", "b", "c"], + } + actual = pa.Table.from_batches( + QueryBuilder() + .register("tbl1", dt1) + .register("tbl2", dt2) + .execute( + """ + SELECT tbl2.date, tbl1.dayOfYear, tbl2.value + FROM tbl1 + INNER JOIN tbl2 ON tbl1.date = tbl2.date + ORDER BY tbl1.date + """ + ) + ).to_pydict() + assert expected == actual From ef0252dcea7dbbb8ee131160be778e8008f9196e Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 29 Nov 2024 17:49:25 +0000 Subject: [PATCH 04/17] chore: add some more documentation to the new QueryBuilder interface Signed-off-by: R. Tyler Croy --- python/deltalake/query.py | 37 +++++++++++++++++++++++++++++++++++-- python/src/filesystem.rs | 2 +- python/src/query.rs | 25 ++++++++++++++++++++----- 3 files changed, 56 insertions(+), 8 deletions(-) diff --git a/python/deltalake/query.py b/python/deltalake/query.py index 06e5144d24..b3e4100d8c 100644 --- a/python/deltalake/query.py +++ b/python/deltalake/query.py @@ -11,6 +11,14 @@ class QueryBuilder: + """ + QueryBuilder is an experimental API which exposes Apache DataFusion SQL to Python users of the deltalake library. + + This API is subject to change. + + >>> qb = QueryBuilder() + """ + def __init__(self) -> None: warnings.warn( "QueryBuilder is experimental and subject to change", @@ -19,7 +27,20 @@ def __init__(self) -> None: self._query_builder = PyQueryBuilder() def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder: - """Add a table to the query builder.""" + """ + Add a table to the query builder instance by name. The `table_name` + will be how the referenced `DeltaTable` can be referenced in SQL + queries. + + For example: + + >>> tmp = getfixture('tmp_path') + >>> import pyarrow as pa + >>> from deltalake import DeltaTable, QueryBuilder + >>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())])) + >>> qb = QueryBuilder().register('test', dt) + >>> assert qb is not None + """ self._query_builder.register( table_name=table_name, delta_table=delta_table._table, @@ -27,5 +48,17 @@ def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder: return self def execute(self, sql: str) -> List[pyarrow.RecordBatch]: - """Execute the query and return a list of record batches.""" + """ + Execute the query and return a list of record batches + + For example: + + >>> tmp = getfixture('tmp_path') + >>> import pyarrow as pa + >>> from deltalake import DeltaTable, QueryBuilder + >>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())])) + >>> qb = QueryBuilder().register('test', dt) + >>> results = qb.execute('SELECT * FROM test') + >>> assert results is not None + """ return self._query_builder.execute(sql) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 116b1b0cf1..ee5261ab09 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -13,7 +13,7 @@ use std::sync::Arc; const DEFAULT_MAX_BUFFER_SIZE: usize = 5 * 1024 * 1024; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub(crate) struct FsConfig { pub(crate) root_url: String, pub(crate) options: HashMap, diff --git a/python/src/query.rs b/python/src/query.rs index af3da38eee..55889c567f 100644 --- a/python/src/query.rs +++ b/python/src/query.rs @@ -9,9 +9,14 @@ use pyo3::prelude::*; use crate::{error::PythonError, utils::rt, RawDeltaTable}; +/// PyQueryBuilder supports the _experimental_ `QueryBuilder` Pythoh interface which allows users +/// to take advantage of the [Apache DataFusion](https://datafusion.apache.org) engine already +/// present in the Python package. #[pyclass(module = "deltalake._internal")] +#[derive(Default)] pub(crate) struct PyQueryBuilder { - _ctx: SessionContext, + /// DataFusion [SessionContext] to hold mappings of registered tables + ctx: SessionContext, } #[pymethods] @@ -19,11 +24,16 @@ impl PyQueryBuilder { #[new] pub fn new() -> Self { let config = DeltaSessionConfig::default().into(); - let _ctx = SessionContext::new_with_config(config); + let ctx = SessionContext::new_with_config(config); - PyQueryBuilder { _ctx } + PyQueryBuilder { ctx } } + /// Register the given [RawDeltaTable] into the [SessionContext] using the provided + /// `table_name` + /// + /// Once called, the provided `delta_table` will be referencable in SQL queries so long as + /// another table of the same name is not registered over it. pub fn register(&self, table_name: &str, delta_table: &RawDeltaTable) -> PyResult<()> { let snapshot = delta_table._table.snapshot().map_err(PythonError::from)?; let log_store = delta_table._table.log_store(); @@ -37,17 +47,22 @@ impl PyQueryBuilder { .map_err(PythonError::from)?, ); - self._ctx + self.ctx .register_table(table_name, provider) .map_err(PythonError::from)?; Ok(()) } + /// Execute the given SQL command within the [SessionContext] of this instance + /// + /// **NOTE:** Since this function returns a materialized Python list of `RecordBatch` + /// instances, it may result unexpected memory consumption for queries which return large data + /// sets. pub fn execute(&self, py: Python, sql: &str) -> PyResult { let batches = py.allow_threads(|| { rt().block_on(async { - let df = self._ctx.sql(sql).await?; + let df = self.ctx.sql(sql).await?; df.collect().await }) .map_err(PythonError::from) From e83edcf500db54bacf613731bb42179ef5cafaf2 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 30 Nov 2024 16:58:25 +0000 Subject: [PATCH 05/17] chore: bump for another python and core release Signed-off-by: R. Tyler Croy --- crates/core/Cargo.toml | 2 +- python/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 948139dcc1..24f4ac8777 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.22.1" +version = "0.22.2" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/python/Cargo.toml b/python/Cargo.toml index c89f68b8c0..fba55dcd31 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.22.1" +version = "0.22.2" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" From ba671c991c6c82b3bc066d8cb6aac9d653b4f728 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 30 Nov 2024 18:05:16 +0000 Subject: [PATCH 06/17] chore: missed a version update! Signed-off-by: R. Tyler Croy --- crates/deltalake/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 3c5a13172e..d7fdb50184 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.22.1" +version = "0.22.2" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -16,7 +16,7 @@ rust-version.workspace = true features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"] [dependencies] -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.22.2", path = "../core" } deltalake-aws = { version = "0.5.0", path = "../aws", default-features = false, optional = true } deltalake-azure = { version = "0.5.0", path = "../azure", optional = true } deltalake-gcp = { version = "0.6.0", path = "../gcp", optional = true } From 9a9cfca4f290955ca96d87cf2090d173bc258141 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 3 Dec 2024 04:46:42 +0000 Subject: [PATCH 07/17] chore: add more robust regression tests for the checkpoint related failure(s) See #3030 Signed-off-by: R. Tyler Croy --- crates/core/src/protocol/checkpoints.rs | 26 ++++++++++++++++++++++--- python/tests/test_checkpoint.py | 25 ++++++++++++------------ 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 42ab5355b7..bf9cdf1fea 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -1163,10 +1163,16 @@ mod tests { } /// + #[cfg(feature = "datafusion")] #[tokio::test] async fn test_create_checkpoint_overwrite() -> DeltaResult<()> { use crate::protocol::SaveMode; + use crate::writer::test_utils::datafusion::get_data_sorted; use crate::writer::test_utils::get_arrow_schema; + use datafusion::assert_batches_sorted_eq; + + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); let batch = RecordBatch::try_new( Arc::clone(&get_arrow_schema(&None)), @@ -1177,13 +1183,15 @@ mod tests { ], ) .unwrap(); - let table = DeltaOps::try_from_uri_with_storage_options("memory://", HashMap::default()) + + let mut table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap()) .await? .write(vec![batch]) .await?; + table.load().await?; assert_eq!(table.version(), 0); - create_checkpoint_for(0, table.snapshot().unwrap(), table.log_store.as_ref()).await?; + create_checkpoint(&table).await?; let batch = RecordBatch::try_new( Arc::clone(&get_arrow_schema(&None)), @@ -1194,11 +1202,23 @@ mod tests { ], ) .unwrap(); - let table = DeltaOps(table) + + let table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap()) + .await? .write(vec![batch]) .with_save_mode(SaveMode::Overwrite) .await?; assert_eq!(table.version(), 1); + + let expected = [ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 0 | 2021-02-02 |", + "+----+-------+------------+", + ]; + let actual = get_data_sorted(&table, "id,value,modified").await; + assert_batches_sorted_eq!(&expected, &actual); Ok(()) } } diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index 5961a57b09..309a1f3663 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -483,18 +483,19 @@ def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path): } ), ) - DeltaTable(tmp_path).create_checkpoint() + dt = DeltaTable(tmp_path) + dt.create_checkpoint() + assert dt.version() == 0 + df = pd.DataFrame( + { + "a": ["a"], + "b": [100], + } + ) + write_deltalake(tmp_path, df, mode="overwrite") dt = DeltaTable(tmp_path) + assert dt.version() == 1 + new_df = dt.to_pandas() print(dt.to_pandas()) - - write_deltalake( - tmp_path, - pd.DataFrame( - { - "a": ["a"], - "b": [100], - } - ), - mode="overwrite", - ) + assert len(new_df) == 1, "We overwrote! there should only be one row" From 0d6a8a95e28862112b8e66c6a5824ad67382d1cc Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 4 Dec 2024 15:25:42 +0000 Subject: [PATCH 08/17] fix: prevent attempting to read empty DVInfo The root cause remains elusive but with the recent arrow upgrade the nullable deletionVector struct, which contains non-nullable fields, is being read out as "empty" in the default case rather than as null. This causes the "seen" code in log replay to fail to correctly identify files which should be marked as removed when computing the state of the snapshot. This change introduces another workaround that I am still not thrilled about :unamused: Fixes #3030 Signed-off-by: R. Tyler Croy --- Cargo.toml | 2 +- crates/core/src/kernel/snapshot/replay.rs | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a48f8c7894..9fe8c44bb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.4.1", features = ["sync-engine"] } +delta_kernel = { version = "0.4.1", features = ["default-engine"] } #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 1b18b61bc7..316ac6dbba 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -20,7 +20,7 @@ use hashbrown::HashSet; use itertools::Itertools; use percent_encoding::percent_decode_str; use pin_project_lite::pin_project; -use tracing::debug; +use tracing::log::*; use super::parse::collect_map; use super::ReplayVisitor; @@ -440,6 +440,14 @@ pub(super) struct DVInfo<'a> { fn seen_key(info: &FileInfo<'_>) -> String { let path = percent_decode_str(info.path).decode_utf8_lossy(); if let Some(dv) = &info.dv { + // If storage_type is empty then delta-rs has somehow gotten an empty rather than a null + // deletion vector, oooof + // + // See #3030 + if dv.storage_type.is_empty() { + warn!("An empty but not nullable deletionVector was seen for {info:?}"); + return path.to_string(); + } if let Some(offset) = &dv.offset { format!( "{}::{}{}@{offset}", From 0eca716f56509673ce35f251fccdcca213ced9cc Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Wed, 4 Dec 2024 20:51:10 +0100 Subject: [PATCH 09/17] fix: add null checks when working with structs Signed-off-by: R. Tyler Croy --- crates/core/src/kernel/snapshot/log_data.rs | 50 ++++++------ crates/core/src/kernel/snapshot/parse.rs | 90 +++++++++++++-------- crates/core/src/kernel/snapshot/replay.rs | 33 +++++--- 3 files changed, 103 insertions(+), 70 deletions(-) diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 562ca3da90..f22f88ad23 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -245,23 +245,14 @@ impl LogicalFile<'_> { /// Defines a deletion vector pub fn deletion_vector(&self) -> Option> { - if let Some(arr) = self.deletion_vector.as_ref() { - // With v0.22 and the upgrade to a more recent arrow. Reading nullable structs with - // non-nullable entries back out of parquet is resulting in the DeletionVector having - // an empty string rather than a null. The addition check on the value ensures that a - // [DeletionVectorView] is not created in this scenario - // - // - if arr.storage_type.is_valid(self.index) - && !arr.storage_type.value(self.index).is_empty() - { - return Some(DeletionVectorView { + self.deletion_vector.as_ref().and_then(|arr| { + arr.storage_type + .is_valid(self.index) + .then_some(DeletionVectorView { data: arr, index: self.index, - }); - } - } - None + }) + }) } /// The number of records stored in the data file. @@ -380,18 +371,23 @@ impl<'a> FileStatsAccessor<'a> { ); let deletion_vector = extract_and_cast_opt::(data, "add.deletionVector"); let deletion_vector = deletion_vector.and_then(|dv| { - let storage_type = extract_and_cast::(dv, "storageType").ok()?; - let path_or_inline_dv = extract_and_cast::(dv, "pathOrInlineDv").ok()?; - let size_in_bytes = extract_and_cast::(dv, "sizeInBytes").ok()?; - let cardinality = extract_and_cast::(dv, "cardinality").ok()?; - let offset = extract_and_cast_opt::(dv, "offset"); - Some(DeletionVector { - storage_type, - path_or_inline_dv, - size_in_bytes, - cardinality, - offset, - }) + if dv.null_count() == dv.len() { + None + } else { + let storage_type = extract_and_cast::(dv, "storageType").ok()?; + let path_or_inline_dv = + extract_and_cast::(dv, "pathOrInlineDv").ok()?; + let size_in_bytes = extract_and_cast::(dv, "sizeInBytes").ok()?; + let cardinality = extract_and_cast::(dv, "cardinality").ok()?; + let offset = extract_and_cast_opt::(dv, "offset"); + Some(DeletionVector { + storage_type, + path_or_inline_dv, + size_in_bytes, + cardinality, + offset, + }) + } }); Ok(Self { diff --git a/crates/core/src/kernel/snapshot/parse.rs b/crates/core/src/kernel/snapshot/parse.rs index f75744691e..e8630cbe0c 100644 --- a/crates/core/src/kernel/snapshot/parse.rs +++ b/crates/core/src/kernel/snapshot/parse.rs @@ -78,6 +78,10 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult(array, "add") { + // Stop early if all values are null + if arr.null_count() == arr.len() { + return Ok(vec![]); + } let path = ex::extract_and_cast::(arr, "path")?; let pvs = ex::extract_and_cast_opt::(arr, "partitionValues"); let size = ex::extract_and_cast::(arr, "size")?; @@ -94,22 +98,33 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult(d, "sizeInBytes")?; let cardinality = ex::extract_and_cast::(d, "cardinality")?; - Box::new(|idx: usize| { - if ex::read_str(storage_type, idx).is_ok() { - Some(DeletionVectorDescriptor { - storage_type: std::str::FromStr::from_str( - ex::read_str(storage_type, idx).ok()?, - ) - .ok()?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(), - offset: ex::read_primitive_opt(offset, idx), - size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, - cardinality: ex::read_primitive(cardinality, idx).ok()?, - }) - } else { - None - } - }) + // Column might exist but have nullability set for the whole array, so we just return Nones + if d.null_count() == d.len() { + Box::new(|_| None) + } else { + Box::new(|idx: usize| { + d.is_valid(idx) + .then(|| { + if ex::read_str(storage_type, idx).is_ok() { + Some(DeletionVectorDescriptor { + storage_type: std::str::FromStr::from_str( + ex::read_str(storage_type, idx).ok()?, + ) + .ok()?, + path_or_inline_dv: ex::read_str(path_or_inline_dv, idx) + .ok()? + .to_string(), + offset: ex::read_primitive_opt(offset, idx), + size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, + cardinality: ex::read_primitive(cardinality, idx).ok()?, + }) + } else { + None + } + }) + .flatten() + }) + } } else { Box::new(|_| None) }; @@ -210,22 +225,33 @@ pub(super) fn read_removes(array: &dyn ProvidesColumnByName) -> DeltaResult(d, "sizeInBytes")?; let cardinality = ex::extract_and_cast::(d, "cardinality")?; - Box::new(|idx: usize| { - if ex::read_str(storage_type, idx).is_ok() { - Some(DeletionVectorDescriptor { - storage_type: std::str::FromStr::from_str( - ex::read_str(storage_type, idx).ok()?, - ) - .ok()?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(), - offset: ex::read_primitive_opt(offset, idx), - size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, - cardinality: ex::read_primitive(cardinality, idx).ok()?, - }) - } else { - None - } - }) + // Column might exist but have nullability set for the whole array, so we just return Nones + if d.null_count() == d.len() { + Box::new(|_| None) + } else { + Box::new(|idx: usize| { + d.is_valid(idx) + .then(|| { + if ex::read_str(storage_type, idx).is_ok() { + Some(DeletionVectorDescriptor { + storage_type: std::str::FromStr::from_str( + ex::read_str(storage_type, idx).ok()?, + ) + .ok()?, + path_or_inline_dv: ex::read_str(path_or_inline_dv, idx) + .ok()? + .to_string(), + offset: ex::read_primitive_opt(offset, idx), + size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, + cardinality: ex::read_primitive(cardinality, idx).ok()?, + }) + } else { + None + } + }) + .flatten() + }) + } } else { Box::new(|_| None) }; diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 316ac6dbba..6267a7f3be 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -559,22 +559,32 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult(d, "pathOrInlineDv")?; let offset = ex::extract_and_cast::(d, "offset")?; - Box::new(|idx: usize| { - if ex::read_str(storage_type, idx).is_ok() { - Ok(Some(DVInfo { - storage_type: ex::read_str(storage_type, idx)?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?, - offset: ex::read_primitive_opt(offset, idx), - })) - } else { - Ok(None) - } - }) + // Column might exist but have nullability set for the whole array, so we just return Nones + if d.null_count() == d.len() { + Box::new(|_| Ok(None)) + } else { + Box::new(|idx: usize| { + if d.is_valid(idx) { + if ex::read_str(storage_type, idx).is_ok() { + Ok(Some(DVInfo { + storage_type: ex::read_str(storage_type, idx)?, + path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?, + offset: ex::read_primitive_opt(offset, idx), + })) + } else { + Ok(None) + } + } else { + Ok(None) + } + }) + } } else { Box::new(|_| Ok(None)) }; let mut adds = Vec::with_capacity(path.len()); + for idx in 0..path.len() { let value = path .is_valid(idx) @@ -587,6 +597,7 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult Date: Wed, 4 Dec 2024 19:56:22 +0000 Subject: [PATCH 10/17] chore: bump to 0.22.3 for another release Signed-off-by: R. Tyler Croy --- crates/core/Cargo.toml | 2 +- crates/deltalake/Cargo.toml | 2 +- python/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 24f4ac8777..57a9496070 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.22.2" +version = "0.22.3" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index d7fdb50184..476f0b5d60 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.22.2" +version = "0.22.3" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/python/Cargo.toml b/python/Cargo.toml index fba55dcd31..bb6fbba621 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.22.2" +version = "0.22.3" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" From 85344dc0acbbf202307025a6b479b26a7e12a804 Mon Sep 17 00:00:00 2001 From: Pablo Cabeza Date: Sun, 1 Dec 2024 02:56:53 +0000 Subject: [PATCH 11/17] Added enable out of range to load_cdf Signed-off-by: Pablo Cabeza --- crates/core/src/errors.rs | 4 + crates/core/src/operations/load_cdf.rs | 114 ++++++++++++++++++++++++- 2 files changed, 114 insertions(+), 4 deletions(-) diff --git a/crates/core/src/errors.rs b/crates/core/src/errors.rs index 609bc16656..e3447cad72 100644 --- a/crates/core/src/errors.rs +++ b/crates/core/src/errors.rs @@ -1,4 +1,5 @@ //! Exceptions for the deltalake crate +use chrono::{DateTime, Utc}; use object_store::Error as ObjectStoreError; use crate::operations::transaction::{CommitBuilderError, TransactionError}; @@ -232,6 +233,9 @@ pub enum DeltaTableError { #[error("Invalid version start version {start} is greater than version {end}")] ChangeDataInvalidVersionRange { start: i64, end: i64 }, + + #[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")] + ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime }, } impl From for DeltaTableError { diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index ad2986de80..e6cdb43bf0 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -43,6 +43,8 @@ pub struct CdfLoadBuilder { starting_timestamp: Option>, /// Ending timestamp of commits to accept ending_timestamp: Option>, + /// Enable ending version or timestamp exceeding the last commit + enable_out_of_range: bool, /// Provided Datafusion context ctx: SessionContext, } @@ -58,6 +60,7 @@ impl CdfLoadBuilder { ending_version: None, starting_timestamp: None, ending_timestamp: None, + enable_out_of_range: false, ctx: SessionContext::new(), } } @@ -92,6 +95,12 @@ impl CdfLoadBuilder { self } + /// Enable ending version or timestamp exceeding the last commit + pub fn with_out_of_range(mut self) -> Self { + self.enable_out_of_range = true; + self + } + /// Columns to select pub fn with_columns(mut self, columns: Vec) -> Self { self.columns = Some(columns); @@ -110,9 +119,12 @@ impl CdfLoadBuilder { Vec>, )> { let start = self.starting_version; - let end = self + let latest_version = self.log_store.get_latest_version(start).await?; + let mut end = self .ending_version - .unwrap_or(self.log_store.get_latest_version(start).await?); + .unwrap_or(latest_version); + + if end > latest_version { end = latest_version; } if end < start { return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end }); @@ -139,8 +151,13 @@ impl CdfLoadBuilder { .log_store .read_commit_entry(version) .await? - .ok_or(DeltaTableError::InvalidVersion(version))?; - let version_actions = get_actions(version, snapshot_bytes).await?; + .ok_or(DeltaTableError::InvalidVersion(version)); + + if snapshot_bytes.is_err() && version >= end && self.enable_out_of_range { + break; + } + + let version_actions: Vec = get_actions(version, snapshot_bytes?).await?; let mut ts = 0; let mut cdc_actions = vec![]; @@ -236,6 +253,11 @@ impl CdfLoadBuilder { } } + // All versions were skipped due to date our of range + if !self.enable_out_of_range && change_files.is_empty() && add_files.is_empty() && remove_files.is_empty() { + return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { ending_timestamp: ending_timestamp }); + } + Ok((change_files, add_files, remove_files)) } @@ -578,6 +600,90 @@ pub(crate) mod tests { Ok(()) } + #[tokio::test] + async fn test_load_version_out_of_range() -> TestResult { + let table = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned") + .await? + .load_cdf() + .with_starting_version(5) + .build() + .await; + + assert!(table.is_err()); + assert!(matches!( + table.unwrap_err(), + DeltaTableError::InvalidVersion { .. } + )); + + Ok(()) + } + + #[tokio::test] + async fn test_load_version_out_of_range_with_flag() -> TestResult { + let table = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned") + .await? + .load_cdf() + .with_starting_version(5) + .with_out_of_range() + .build() + .await?; + + let ctx = SessionContext::new(); + let batches = collect_batches( + table.properties().output_partitioning().partition_count(), + table.clone(), + ctx, + ) + .await?; + + assert!(batches.is_empty()); + + Ok(()) + } + + #[tokio::test] + async fn test_load_timestamp_out_of_range() -> TestResult { + let ending_timestamp = NaiveDateTime::from_str("2033-12-22T17:10:21.675").unwrap(); + let table = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned") + .await? + .load_cdf() + .with_starting_timestamp(ending_timestamp.and_utc()) + .build() + .await; + + assert!(table.is_err()); + assert!(matches!( + table.unwrap_err(), + DeltaTableError::ChangeDataTimestampGreaterThanCommit { .. } + )); + + Ok(()) + } + + #[tokio::test] + async fn test_load_timestamp_out_of_range_with_flag() -> TestResult { + let ending_timestamp = NaiveDateTime::from_str("2033-12-22T17:10:21.675").unwrap(); + let table = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned") + .await? + .load_cdf() + .with_starting_timestamp(ending_timestamp.and_utc()) + .with_out_of_range() + .build() + .await?; + + let ctx = SessionContext::new(); + let batches = collect_batches( + table.properties().output_partitioning().partition_count(), + table.clone(), + ctx, + ) + .await?; + + assert!(batches.is_empty()); + + Ok(()) + } + #[tokio::test] async fn test_load_non_cdf() -> TestResult { let table = DeltaOps::try_from_uri("../test/tests/data/simple_table") From d859ad3587d33ddbf233f476f90b52eb343d260c Mon Sep 17 00:00:00 2001 From: Pablo Cabeza Date: Sun, 1 Dec 2024 03:39:35 +0000 Subject: [PATCH 12/17] Added enable out of range to python Signed-off-by: Pablo Cabeza --- python/deltalake/_internal.pyi | 1 + python/deltalake/table.py | 2 ++ python/src/lib.rs | 7 +++++- python/tests/test_cdf.py | 39 +++++++++++++++++++++++++++++++++- 4 files changed, 47 insertions(+), 2 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 052cf1ebb6..ab2f04eb0d 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -220,6 +220,7 @@ class RawDeltaTable: ending_version: Optional[int] = None, starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, + enable_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: ... def transaction_versions(self) -> Dict[str, Transaction]: ... def __datafusion_table_provider__(self) -> Any: ... diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 247a2b9527..7f3f246780 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -689,6 +689,7 @@ def load_cdf( starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, columns: Optional[List[str]] = None, + enable_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: return self._table.load_cdf( columns=columns, @@ -696,6 +697,7 @@ def load_cdf( ending_version=ending_version, starting_timestamp=starting_timestamp, ending_timestamp=ending_timestamp, + enable_out_of_range=enable_out_of_range, ) @property diff --git a/python/src/lib.rs b/python/src/lib.rs index c4a4d80b78..97aeeec005 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -675,7 +675,7 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None))] + #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, enable_out_of_range = false))] pub fn load_cdf( &mut self, py: Python, @@ -684,6 +684,7 @@ impl RawDeltaTable { starting_timestamp: Option, ending_timestamp: Option, columns: Option>, + enable_out_of_range: bool, ) -> PyResult> { let ctx = SessionContext::new(); let mut cdf_read = CdfLoadBuilder::new( @@ -708,6 +709,10 @@ impl RawDeltaTable { cdf_read = cdf_read.with_ending_timestamp(ending_ts); } + if enable_out_of_range { + cdf_read = cdf_read.with_out_of_range(); + } + if let Some(columns) = columns { cdf_read = cdf_read.with_columns(columns); } diff --git a/python/tests/test_cdf.py b/python/tests/test_cdf.py index 36d94c9f99..de512cc99b 100644 --- a/python/tests/test_cdf.py +++ b/python/tests/test_cdf.py @@ -6,7 +6,7 @@ import pyarrow.dataset as ds import pyarrow.parquet as pq -from deltalake import DeltaTable, write_deltalake +from deltalake import DeltaTable, _internal, write_deltalake def test_read_cdf_partitioned(): @@ -677,3 +677,40 @@ def test_write_overwrite_partitioned_cdf(tmp_path, sample_data: pa.Table): ).sort_by(sort_values).select(expected_data.column_names) == pa.concat_tables( [first_batch, expected_data] ).sort_by(sort_values) + + +def test_read_cdf_version_out_of_range(): + dt = DeltaTable("../crates/test/tests/data/cdf-table/") + + try: + b = dt.load_cdf(4).read_all().to_pydict() + assert False, "Should not get here" + except _internal.DeltaError as e: + assert "invalid table version" in str(e).lower() + + +def test_read_cdf_version_out_of_range_with_flag(): + dt = DeltaTable("../crates/test/tests/data/cdf-table/") + b = dt.load_cdf(4, enable_out_of_range=True).read_all() + + assert len(b) == 0 + + +def test_read_timestamp_cdf_out_of_range(): + dt = DeltaTable("../crates/test/tests/data/cdf-table/") + start = "2033-12-22T17:10:21.675Z" + + try: + b = dt.load_cdf(starting_timestamp=start).read_all().to_pydict() + assert False, "Should not get here" + except _internal.DeltaError as e: + assert "is greater than latest commit timestamp" in str(e).lower() + + +def test_read_timestamp_cdf_out_of_range_with_flag(): + dt = DeltaTable("../crates/test/tests/data/cdf-table/") + + start = "2033-12-22T17:10:21.675Z" + b = dt.load_cdf(starting_timestamp=start, enable_out_of_range=True).read_all() + + assert len(b) == 0 \ No newline at end of file From a79c576139ed4b1812e3ba0405c22d5bcb792e04 Mon Sep 17 00:00:00 2001 From: Pablo Cabeza Date: Sun, 1 Dec 2024 16:26:47 +0000 Subject: [PATCH 13/17] Fixed fomatting and test in python plus renamed out of range flag for clarity Signed-off-by: Pablo Cabeza --- crates/core/src/operations/load_cdf.rs | 38 +++++++++++++++----------- python/deltalake/_internal.pyi | 2 +- python/deltalake/table.py | 4 +-- python/src/lib.rs | 8 +++--- python/tests/test_cdf.py | 28 +++++++++---------- 5 files changed, 43 insertions(+), 37 deletions(-) diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index e6cdb43bf0..f1c0418de4 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -44,7 +44,7 @@ pub struct CdfLoadBuilder { /// Ending timestamp of commits to accept ending_timestamp: Option>, /// Enable ending version or timestamp exceeding the last commit - enable_out_of_range: bool, + allow_out_of_range: bool, /// Provided Datafusion context ctx: SessionContext, } @@ -60,7 +60,7 @@ impl CdfLoadBuilder { ending_version: None, starting_timestamp: None, ending_timestamp: None, - enable_out_of_range: false, + allow_out_of_range: false, ctx: SessionContext::new(), } } @@ -96,8 +96,8 @@ impl CdfLoadBuilder { } /// Enable ending version or timestamp exceeding the last commit - pub fn with_out_of_range(mut self) -> Self { - self.enable_out_of_range = true; + pub fn with_allow_out_of_range(mut self) -> Self { + self.allow_out_of_range = true; self } @@ -120,11 +120,11 @@ impl CdfLoadBuilder { )> { let start = self.starting_version; let latest_version = self.log_store.get_latest_version(start).await?; - let mut end = self - .ending_version - .unwrap_or(latest_version); + let mut end = self.ending_version.unwrap_or(latest_version); - if end > latest_version { end = latest_version; } + if end > latest_version { + end = latest_version; + } if end < start { return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end }); @@ -153,7 +153,7 @@ impl CdfLoadBuilder { .await? .ok_or(DeltaTableError::InvalidVersion(version)); - if snapshot_bytes.is_err() && version >= end && self.enable_out_of_range { + if snapshot_bytes.is_err() && version >= end && self.allow_out_of_range { break; } @@ -254,8 +254,14 @@ impl CdfLoadBuilder { } // All versions were skipped due to date our of range - if !self.enable_out_of_range && change_files.is_empty() && add_files.is_empty() && remove_files.is_empty() { - return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { ending_timestamp: ending_timestamp }); + if !self.allow_out_of_range + && change_files.is_empty() + && add_files.is_empty() + && remove_files.is_empty() + { + return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { + ending_timestamp: ending_timestamp, + }); } Ok((change_files, add_files, remove_files)) @@ -608,7 +614,7 @@ pub(crate) mod tests { .with_starting_version(5) .build() .await; - + assert!(table.is_err()); assert!(matches!( table.unwrap_err(), @@ -624,7 +630,7 @@ pub(crate) mod tests { .await? .load_cdf() .with_starting_version(5) - .with_out_of_range() + .with_allow_out_of_range() .build() .await?; @@ -650,7 +656,7 @@ pub(crate) mod tests { .with_starting_timestamp(ending_timestamp.and_utc()) .build() .await; - + assert!(table.is_err()); assert!(matches!( table.unwrap_err(), @@ -667,10 +673,10 @@ pub(crate) mod tests { .await? .load_cdf() .with_starting_timestamp(ending_timestamp.and_utc()) - .with_out_of_range() + .with_allow_out_of_range() .build() .await?; - + let ctx = SessionContext::new(); let batches = collect_batches( table.properties().output_partitioning().partition_count(), diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index ab2f04eb0d..f19c685118 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -220,7 +220,7 @@ class RawDeltaTable: ending_version: Optional[int] = None, starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, - enable_out_of_range: bool = False, + allow_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: ... def transaction_versions(self) -> Dict[str, Transaction]: ... def __datafusion_table_provider__(self) -> Any: ... diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 7f3f246780..e8fc7d866b 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -689,7 +689,7 @@ def load_cdf( starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, columns: Optional[List[str]] = None, - enable_out_of_range: bool = False, + allow_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: return self._table.load_cdf( columns=columns, @@ -697,7 +697,7 @@ def load_cdf( ending_version=ending_version, starting_timestamp=starting_timestamp, ending_timestamp=ending_timestamp, - enable_out_of_range=enable_out_of_range, + allow_out_of_range=allow_out_of_range, ) @property diff --git a/python/src/lib.rs b/python/src/lib.rs index 97aeeec005..cfd27bbdfa 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -675,7 +675,7 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, enable_out_of_range = false))] + #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, allow_out_of_range = false))] pub fn load_cdf( &mut self, py: Python, @@ -684,7 +684,7 @@ impl RawDeltaTable { starting_timestamp: Option, ending_timestamp: Option, columns: Option>, - enable_out_of_range: bool, + allow_out_of_range: bool, ) -> PyResult> { let ctx = SessionContext::new(); let mut cdf_read = CdfLoadBuilder::new( @@ -709,8 +709,8 @@ impl RawDeltaTable { cdf_read = cdf_read.with_ending_timestamp(ending_ts); } - if enable_out_of_range { - cdf_read = cdf_read.with_out_of_range(); + if allow_out_of_range { + cdf_read = cdf_read.with_allow_out_of_range(); } if let Some(columns) = columns { diff --git a/python/tests/test_cdf.py b/python/tests/test_cdf.py index de512cc99b..3dcdd457fb 100644 --- a/python/tests/test_cdf.py +++ b/python/tests/test_cdf.py @@ -5,8 +5,10 @@ import pyarrow.compute as pc import pyarrow.dataset as ds import pyarrow.parquet as pq +import pytest -from deltalake import DeltaTable, _internal, write_deltalake +from deltalake import DeltaTable, write_deltalake +from deltalake.exceptions import DeltaError def test_read_cdf_partitioned(): @@ -682,16 +684,15 @@ def test_write_overwrite_partitioned_cdf(tmp_path, sample_data: pa.Table): def test_read_cdf_version_out_of_range(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") - try: - b = dt.load_cdf(4).read_all().to_pydict() - assert False, "Should not get here" - except _internal.DeltaError as e: - assert "invalid table version" in str(e).lower() + with pytest.raises(DeltaError) as e: + dt.load_cdf(4).read_all().to_pydict() + + assert "invalid table version" in str(e).lower() def test_read_cdf_version_out_of_range_with_flag(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") - b = dt.load_cdf(4, enable_out_of_range=True).read_all() + b = dt.load_cdf(4, allow_out_of_range=True).read_all() assert len(b) == 0 @@ -700,17 +701,16 @@ def test_read_timestamp_cdf_out_of_range(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") start = "2033-12-22T17:10:21.675Z" - try: - b = dt.load_cdf(starting_timestamp=start).read_all().to_pydict() - assert False, "Should not get here" - except _internal.DeltaError as e: - assert "is greater than latest commit timestamp" in str(e).lower() + with pytest.raises(DeltaError) as e: + dt.load_cdf(starting_timestamp=start).read_all().to_pydict() + + assert "is greater than latest commit timestamp" in str(e).lower() def test_read_timestamp_cdf_out_of_range_with_flag(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") start = "2033-12-22T17:10:21.675Z" - b = dt.load_cdf(starting_timestamp=start, enable_out_of_range=True).read_all() + b = dt.load_cdf(starting_timestamp=start, allow_out_of_range=True).read_all() - assert len(b) == 0 \ No newline at end of file + assert len(b) == 0 From 2501f2fd2c54a6d7d0d2d5793f82a769c737d6a7 Mon Sep 17 00:00:00 2001 From: Pablo Cabeza Date: Sun, 1 Dec 2024 19:04:38 +0000 Subject: [PATCH 14/17] Moved all boundary conditions before the main load_cdf loop Signed-off-by: Pablo Cabeza --- crates/core/src/operations/load_cdf.rs | 68 ++++++++++++++++++-------- 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index f1c0418de4..a63b5182b2 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -119,15 +119,31 @@ impl CdfLoadBuilder { Vec>, )> { let start = self.starting_version; - let latest_version = self.log_store.get_latest_version(start).await?; + let latest_version = self.log_store.get_latest_version(0).await?; // Start from 0 since if start > latest commit, the returned commit is not a valid commit let mut end = self.ending_version.unwrap_or(latest_version); + let mut change_files: Vec> = vec![]; + let mut add_files: Vec> = vec![]; + let mut remove_files: Vec> = vec![]; + if end > latest_version { end = latest_version; } + if start > latest_version { + return if self.allow_out_of_range { + Ok((change_files, add_files, remove_files)) + } else { + Err(DeltaTableError::InvalidVersion(start)) + }; + } + if end < start { - return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end }); + return if self.allow_out_of_range { + Ok((change_files, add_files, remove_files)) + } else { + Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end }) + }; } let starting_timestamp = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH); @@ -135,6 +151,35 @@ impl CdfLoadBuilder { .ending_timestamp .unwrap_or(DateTime::from(SystemTime::now())); + // Check that starting_timestmp is within boundaries of the latest version + let latest_snapshot_bytes = self + .log_store + .read_commit_entry(latest_version) + .await? + .ok_or(DeltaTableError::InvalidVersion(latest_version)); + + let latest_version_actions: Vec = + get_actions(latest_version, latest_snapshot_bytes?).await?; + let latest_version_commit = latest_version_actions + .iter() + .find(|a| matches!(a, Action::CommitInfo(_))); + + if let Some(Action::CommitInfo(CommitInfo { + timestamp: Some(latest_timestamp), + .. + })) = latest_version_commit + { + if starting_timestamp.timestamp_millis() > *latest_timestamp { + return if self.allow_out_of_range { + Ok((change_files, add_files, remove_files)) + } else { + Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { + ending_timestamp: ending_timestamp, + }) + }; + } + } + log::debug!( "starting timestamp = {:?}, ending timestamp = {:?}", &starting_timestamp, @@ -142,10 +187,6 @@ impl CdfLoadBuilder { ); log::debug!("starting version = {}, ending version = {:?}", start, end); - let mut change_files: Vec> = vec![]; - let mut add_files: Vec> = vec![]; - let mut remove_files: Vec> = vec![]; - for version in start..=end { let snapshot_bytes = self .log_store @@ -153,10 +194,6 @@ impl CdfLoadBuilder { .await? .ok_or(DeltaTableError::InvalidVersion(version)); - if snapshot_bytes.is_err() && version >= end && self.allow_out_of_range { - break; - } - let version_actions: Vec = get_actions(version, snapshot_bytes?).await?; let mut ts = 0; @@ -253,17 +290,6 @@ impl CdfLoadBuilder { } } - // All versions were skipped due to date our of range - if !self.allow_out_of_range - && change_files.is_empty() - && add_files.is_empty() - && remove_files.is_empty() - { - return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { - ending_timestamp: ending_timestamp, - }); - } - Ok((change_files, add_files, remove_files)) } From 2946e5b36d2965016be24ac52d8ed5d84c32229e Mon Sep 17 00:00:00 2001 From: Igor Vitenko Date: Mon, 9 Dec 2024 20:44:04 -0500 Subject: [PATCH 15/17] add explicit type casts while merge Signed-off-by: Igor Vitenko --- crates/core/src/operations/merge/mod.rs | 8 ++++--- python/tests/test_merge.py | 29 +++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 6be8c264ba..59bd28e400 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -44,7 +44,7 @@ use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; use datafusion::{ execution::context::SessionState, physical_plan::ExecutionPlan, - prelude::{DataFrame, SessionContext}, + prelude::{cast, DataFrame, SessionContext}, }; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{Column, DFSchema, ScalarValue, TableReference}; @@ -990,8 +990,10 @@ async fn execute( .end()?; let name = "__delta_rs_c_".to_owned() + delta_field.name(); - write_projection - .push(Expr::Column(Column::from_name(name.clone())).alias(delta_field.name())); + write_projection.push(cast( + Expr::Column(Column::from_name(name.clone())).alias(delta_field.name()), + delta_field.data_type().try_into()?, + )); new_columns.push((name, case)); } diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index e8416f6e5f..a5ecb0ecc7 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -3,6 +3,7 @@ import pathlib import pyarrow as pa +import pyarrow.parquet as pq import pytest from deltalake import DeltaTable, write_deltalake @@ -1120,3 +1121,31 @@ def test_merge_non_nullable(tmp_path): target_alias="t", predicate="s.id = t.id", ).when_matched_update_all().when_not_matched_insert_all().execute() + + +def test_merge_when_wrong_but_castable_type_passed_while_merge( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table, mode="append") + + dt = DeltaTable(tmp_path) + + source_table = pa.table( + { + "id": pa.array(["7", "8"]), + "price": pa.array(["1", "2"], pa.string()), + "sold": pa.array([1, 2], pa.int32()), + "deleted": pa.array([False, False]), + } + ) + dt.merge( + source=source_table, + predicate="t.id = s.id", + source_alias="s", + target_alias="t", + ).when_not_matched_insert_all().execute() + + table_schema = pq.read_table( + tmp_path / dt.get_add_actions().to_pandas()["path"].iloc[0] + ).schema + assert table_schema.field("price").type == sample_table["price"].type From e665ca8fae73bc00dfa661735b74cce6c0d3aaca Mon Sep 17 00:00:00 2001 From: Igor Vitenko Date: Mon, 9 Dec 2024 21:13:56 -0500 Subject: [PATCH 16/17] fix No module named pandas Signed-off-by: Igor Vitenko --- python/tests/test_merge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index a5ecb0ecc7..1f81e81142 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -1146,6 +1146,6 @@ def test_merge_when_wrong_but_castable_type_passed_while_merge( ).when_not_matched_insert_all().execute() table_schema = pq.read_table( - tmp_path / dt.get_add_actions().to_pandas()["path"].iloc[0] + tmp_path / dt.get_add_actions().column(0)[0].as_py() ).schema assert table_schema.field("price").type == sample_table["price"].type From 89f5f72b96abfb199bff4e09e20f8430d2ae998a Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Wed, 11 Dec 2024 09:07:42 +0100 Subject: [PATCH 17/17] chore: bump delta-kernel to 0.5.0 Signed-off-by: Robert Pack --- Cargo.toml | 2 +- crates/aws/src/lib.rs | 5 +- crates/core/src/delta_datafusion/expr.rs | 8 +-- crates/core/src/kernel/models/actions.rs | 8 +-- crates/core/src/kernel/snapshot/log_data.rs | 66 +++++++++---------- crates/core/src/kernel/snapshot/mod.rs | 6 +- crates/core/src/kernel/snapshot/replay.rs | 12 ++-- crates/core/src/operations/optimize.rs | 2 +- crates/core/src/operations/transaction/mod.rs | 2 +- .../core/src/operations/transaction/state.rs | 2 +- crates/core/src/operations/update.rs | 2 +- crates/core/src/operations/write.rs | 2 +- crates/core/src/protocol/checkpoints.rs | 4 +- crates/core/src/protocol/mod.rs | 26 ++++---- crates/core/src/schema/partitions.rs | 2 +- crates/core/src/table/config.rs | 4 +- crates/core/src/table/state_arrow.rs | 3 +- .../core/src/test_utils/factories/actions.rs | 2 +- crates/core/src/writer/json.rs | 4 +- crates/core/src/writer/record_batch.rs | 8 +-- crates/core/src/writer/stats.rs | 26 ++++---- python/src/lib.rs | 10 +-- 22 files changed, 99 insertions(+), 107 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9fe8c44bb3..e1832c2349 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.4.1", features = ["default-engine"] } +delta_kernel = { version = "0.5.0", features = ["default-engine"] } #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index 69ac891838..ee7f222701 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -727,11 +727,10 @@ fn extract_version_from_filename(name: &str) -> Option { #[cfg(test)] mod tests { use super::*; - use aws_sdk_sts::config::{ProvideCredentials, ResolveCachedIdentity}; - use futures::future::Shared; + use aws_sdk_sts::config::ProvideCredentials; + use object_store::memory::InMemory; use serial_test::serial; - use tracing::instrument::WithSubscriber; fn commit_entry_roundtrip(c: &CommitEntry) -> Result<(), LockClientError> { let item_data: HashMap = create_value_map(c, "some_table"); diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index a421400791..c0e79ba490 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -217,7 +217,7 @@ impl<'a> DeltaContextProvider<'a> { } } -impl<'a> ContextProvider for DeltaContextProvider<'a> { +impl ContextProvider for DeltaContextProvider<'_> { fn get_table_source(&self, _name: TableReference) -> DFResult> { unimplemented!() } @@ -304,7 +304,7 @@ struct BinaryExprFormat<'a> { expr: &'a BinaryExpr, } -impl<'a> Display for BinaryExprFormat<'a> { +impl Display for BinaryExprFormat<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // Put parentheses around child binary expressions so that we can see the difference // between `(a OR b) AND c` and `a OR (b AND c)`. We only insert parentheses when needed, @@ -333,7 +333,7 @@ impl<'a> Display for BinaryExprFormat<'a> { } } -impl<'a> Display for SqlFormat<'a> { +impl Display for SqlFormat<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.expr { Expr::Column(c) => write!(f, "{c}"), @@ -488,7 +488,7 @@ struct ScalarValueFormat<'a> { scalar: &'a ScalarValue, } -impl<'a> fmt::Display for ScalarValueFormat<'a> { +impl fmt::Display for ScalarValueFormat<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.scalar { ScalarValue::Boolean(e) => format_option!(f, e)?, diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index 4341ff5324..ef370b4956 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -1,5 +1,5 @@ use std::collections::{HashMap, HashSet}; -use std::fmt; +use std::fmt::{self, Display}; use std::str::FromStr; use maplit::hashset; @@ -726,9 +726,9 @@ impl AsRef for StorageType { } } -impl ToString for StorageType { - fn to_string(&self) -> String { - self.as_ref().into() +impl Display for StorageType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_ref()) } } diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index f22f88ad23..05d1790dc9 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -79,7 +79,7 @@ pub struct DeletionVectorView<'a> { index: usize, } -impl<'a> DeletionVectorView<'a> { +impl DeletionVectorView<'_> { /// get a unique idenitfier for the deletion vector pub fn unique_id(&self) -> String { if let Some(offset) = self.offset() { @@ -569,32 +569,30 @@ mod datafusion { } match array.data_type() { - ArrowDataType::Struct(fields) => { - return fields - .iter() - .map(|f| { - self.column_bounds( - path_step, - &format!("{name}.{}", f.name()), - fun_type.clone(), - ) - }) - .map(|s| match s { - Precision::Exact(s) => Some(s), - _ => None, - }) - .collect::>>() - .map(|o| { - let arrays = o - .into_iter() - .map(|sv| sv.to_array()) - .collect::, datafusion_common::DataFusionError>>() - .unwrap(); - let sa = StructArray::new(fields.clone(), arrays, None); - Precision::Exact(ScalarValue::Struct(Arc::new(sa))) - }) - .unwrap_or(Precision::Absent); - } + ArrowDataType::Struct(fields) => fields + .iter() + .map(|f| { + self.column_bounds( + path_step, + &format!("{name}.{}", f.name()), + fun_type.clone(), + ) + }) + .map(|s| match s { + Precision::Exact(s) => Some(s), + _ => None, + }) + .collect::>>() + .map(|o| { + let arrays = o + .into_iter() + .map(|sv| sv.to_array()) + .collect::, datafusion_common::DataFusionError>>() + .unwrap(); + let sa = StructArray::new(fields.clone(), arrays, None); + Precision::Exact(ScalarValue::Struct(Arc::new(sa))) + }) + .unwrap_or(Precision::Absent), _ => Precision::Absent, } } @@ -721,9 +719,9 @@ mod datafusion { return None; } let expression = if self.metadata.partition_columns.contains(&column.name) { - Expression::Column(format!("add.partitionValues_parsed.{}", column.name)) + Expression::column(["add", "partitionValues_parsed", &column.name]) } else { - Expression::Column(format!("add.stats_parsed.{}.{}", stats_field, column.name)) + Expression::column(["add", "stats_parsed", stats_field, &column.name]) }; let evaluator = ARROW_HANDLER.get_evaluator( crate::kernel::models::fields::log_schema_ref().clone(), @@ -735,7 +733,7 @@ mod datafusion { let engine = ArrowEngineData::new(batch.clone()); let result = evaluator.evaluate(&engine).ok()?; let result = result - .as_any() + .any_ref() .downcast_ref::() .ok_or(DeltaTableError::generic( "failed to downcast evaluator result to ArrowEngineData.", @@ -744,11 +742,11 @@ mod datafusion { results.push(result.record_batch().clone()); } let batch = concat_batches(results[0].schema_ref(), &results).ok()?; - batch.column_by_name("output").map(|c| c.clone()) + batch.column_by_name("output").cloned() } } - impl<'a> PruningStatistics for LogDataHandler<'a> { + impl PruningStatistics for LogDataHandler<'_> { /// return the minimum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows fn min_values(&self, column: &Column) -> Option { @@ -799,7 +797,7 @@ mod datafusion { lazy_static::lazy_static! { static ref ROW_COUNTS_EVAL: Arc = ARROW_HANDLER.get_evaluator( crate::kernel::models::fields::log_schema_ref().clone(), - Expression::column("add.stats_parsed.numRecords"), + Expression::column(["add", "stats_parsed","numRecords"]), DataType::Primitive(PrimitiveType::Long), ); } @@ -808,7 +806,7 @@ mod datafusion { let engine = ArrowEngineData::new(batch.clone()); let result = ROW_COUNTS_EVAL.evaluate(&engine).ok()?; let result = result - .as_any() + .any_ref() .downcast_ref::() .ok_or(DeltaTableError::generic( "failed to downcast evaluator result to ArrowEngineData.", diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index d5763b5006..25e11b88ca 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -523,7 +523,7 @@ impl EagerSnapshot { /// Get the table config which is loaded with of the snapshot pub fn load_config(&self) -> &DeltaTableConfig { - &self.snapshot.load_config() + self.snapshot.load_config() } /// Well known table configuration @@ -696,7 +696,7 @@ fn stats_schema(schema: &StructType, config: TableConfig<'_>) -> DeltaResult, + partition_columns: &[String], ) -> DeltaResult> { if partition_columns.is_empty() { return Ok(None); @@ -705,7 +705,7 @@ pub(crate) fn partitions_schema( partition_columns .iter() .map(|col| { - schema.field(col).map(|field| field.clone()).ok_or_else(|| { + schema.field(col).cloned().ok_or_else(|| { DeltaTableError::Generic(format!( "Partition column {} not found in schema", col diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 6267a7f3be..540ebdf808 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -54,7 +54,7 @@ impl<'a, S> ReplayStream<'a, S> { visitors: &'a mut Vec>, ) -> DeltaResult { let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?); - let partitions_schema = snapshot.partitions_schema(None)?.map(|s| Arc::new(s)); + let partitions_schema = snapshot.partitions_schema(None)?.map(Arc::new); let mapper = Arc::new(LogMapper { stats_schema, partitions_schema, @@ -83,9 +83,7 @@ impl LogMapper { ) -> DeltaResult { Ok(Self { stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?), - partitions_schema: snapshot - .partitions_schema(table_schema)? - .map(|s| Arc::new(s)), + partitions_schema: snapshot.partitions_schema(table_schema)?.map(Arc::new), config: snapshot.config.clone(), }) } @@ -368,7 +366,7 @@ fn insert_field(batch: RecordBatch, array: StructArray, name: &str) -> DeltaResu )?) } -impl<'a, S> Stream for ReplayStream<'a, S> +impl Stream for ReplayStream<'_, S> where S: Stream>, { @@ -699,7 +697,7 @@ pub(super) mod tests { assert!(ex::extract_and_cast_opt::(&batch, "add.stats").is_some()); assert!(ex::extract_and_cast_opt::(&batch, "add.stats_parsed").is_none()); - let stats_schema = stats_schema(&schema, table_config)?; + let stats_schema = stats_schema(schema, table_config)?; let new_batch = parse_stats(batch, Arc::new((&stats_schema).try_into()?), &config)?; assert!(ex::extract_and_cast_opt::(&new_batch, "add.stats_parsed").is_some()); @@ -764,7 +762,7 @@ pub(super) mod tests { ex::extract_and_cast_opt::(&batch, "add.partitionValues_parsed").is_none() ); - let partitions_schema = partitions_schema(&schema, &partition_columns)?.unwrap(); + let partitions_schema = partitions_schema(schema, &partition_columns)?.unwrap(); let new_batch = parse_partitions(batch, &partitions_schema)?; assert!( diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 758aaa47bf..fe76a3647d 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -1623,7 +1623,7 @@ pub(super) mod zorder { fn get_bit(&self, bit_i: usize) -> bool; } - impl<'a> RowBitUtil for Row<'a> { + impl RowBitUtil for Row<'_> { /// Get the bit at the given index, or just give false if the index is out of bounds fn get_bit(&self, bit_i: usize) -> bool { let byte_i = bit_i / 8; diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 69027cc4b7..85fba6cfd9 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -533,7 +533,7 @@ pub struct PreparedCommit<'a> { post_commit: Option, } -impl<'a> PreparedCommit<'a> { +impl PreparedCommit<'_> { /// The temporary commit file created pub fn commit_or_bytes(&self) -> &CommitOrBytes { &self.commit_or_bytes diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index 56769c8c62..71251ebd87 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -106,7 +106,7 @@ impl<'a> AddContainer<'a> { } } -impl<'a> PruningStatistics for AddContainer<'a> { +impl PruningStatistics for AddContainer<'_> { /// return the minimum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows fn min_values(&self, column: &Column) -> Option { diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 01dcb962b6..89a6cf1473 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -247,7 +247,7 @@ async fn execute( // [here](https://github.com/delta-io/delta-rs/pull/2886#issuecomment-2481550560> let rules: Vec> = state .optimizers() - .into_iter() + .iter() .filter(|rule| { rule.name() != "optimize_projections" && rule.name() != "simplify_expressions" }) diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 1801a36353..ac984ae96a 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -1253,7 +1253,7 @@ mod tests { } fn assert_common_write_metrics(write_metrics: WriteMetrics) { - assert!(write_metrics.execution_time_ms > 0); + // assert!(write_metrics.execution_time_ms > 0); assert!(write_metrics.num_added_files > 0); } diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index bf9cdf1fea..3419d80587 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -284,7 +284,9 @@ fn parquet_bytes_from_state( remove.extended_file_metadata = Some(false); } } - let files = state.file_actions_iter().unwrap(); + let files = state + .file_actions_iter() + .map_err(|e| ProtocolError::Generic(e.to_string()))?; // protocol let jsons = std::iter::once(Action::Protocol(Protocol { min_reader_version: state.protocol().min_reader_version, diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index f82f48411a..ebb9e034fe 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -864,6 +864,7 @@ mod tests { use arrow::datatypes::{DataType, Date32Type, Field, Fields, TimestampMicrosecondType}; use arrow::record_batch::RecordBatch; use std::sync::Arc; + fn sort_batch_by(batch: &RecordBatch, column: &str) -> arrow::error::Result { let sort_column = batch.column(batch.schema().column_with_name(column).unwrap().0); let sort_indices = sort_to_indices(sort_column, None, None)?; @@ -881,26 +882,26 @@ mod tests { .collect::>()?; RecordBatch::try_from_iter(sorted_columns) } + #[tokio::test] async fn test_with_partitions() { // test table with partitions let path = "../test/tests/data/delta-0.8.0-null-partition"; let table = crate::open_table(path).await.unwrap(); let actions = table.snapshot().unwrap().add_actions_table(true).unwrap(); - let actions = sort_batch_by(&actions, "path").unwrap(); let mut expected_columns: Vec<(&str, ArrayRef)> = vec![ - ("path", Arc::new(array::StringArray::from(vec![ - "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet", - "k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet" - ]))), - ("size_bytes", Arc::new(array::Int64Array::from(vec![460, 460]))), - ("modification_time", Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ - 1627990384000, 1627990384000 - ]))), - ("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))), - ("partition.k", Arc::new(array::StringArray::from(vec![Some("A"), None]))), - ]; + ("path", Arc::new(array::StringArray::from(vec![ + "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet", + "k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet" + ]))), + ("size_bytes", Arc::new(array::Int64Array::from(vec![460, 460]))), + ("modification_time", Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ + 1627990384000, 1627990384000 + ]))), + ("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))), + ("partition.k", Arc::new(array::StringArray::from(vec![Some("A"), None]))), + ]; let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); assert_eq!(expected, actions); @@ -920,6 +921,7 @@ mod tests { assert_eq!(expected, actions); } + #[tokio::test] async fn test_with_deletion_vector() { // test table with partitions diff --git a/crates/core/src/schema/partitions.rs b/crates/core/src/schema/partitions.rs index 23abb3896e..e8891bcee0 100644 --- a/crates/core/src/schema/partitions.rs +++ b/crates/core/src/schema/partitions.rs @@ -383,7 +383,7 @@ mod tests { DeltaTablePartition::try_from(path.as_ref()).unwrap(), DeltaTablePartition { key: "year".into(), - value: Scalar::String(year.into()), + value: Scalar::String(year), } ); diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs index f8a223560a..e5e76d0c62 100644 --- a/crates/core/src/table/config.rs +++ b/crates/core/src/table/config.rs @@ -2,7 +2,7 @@ use std::time::Duration; use std::{collections::HashMap, str::FromStr}; -use delta_kernel::features::ColumnMappingMode; +use delta_kernel::table_features::ColumnMappingMode; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; @@ -343,7 +343,7 @@ impl TableConfig<'_> { self.0 .get(TableProperty::ColumnMappingMode.as_ref()) .and_then(|o| o.as_ref().and_then(|v| v.parse().ok())) - .unwrap_or_default() + .unwrap_or(ColumnMappingMode::None) } /// Return the check constraints on the current table diff --git a/crates/core/src/table/state_arrow.rs b/crates/core/src/table/state_arrow.rs index e4a374b763..0258109859 100644 --- a/crates/core/src/table/state_arrow.rs +++ b/crates/core/src/table/state_arrow.rs @@ -14,7 +14,7 @@ use arrow_array::{ use arrow_cast::cast; use arrow_cast::parse::Parser; use arrow_schema::{DataType, Field, Fields, TimeUnit}; -use delta_kernel::features::ColumnMappingMode; +use delta_kernel::table_features::ColumnMappingMode; use itertools::Itertools; use super::state::DeltaTableState; @@ -190,6 +190,7 @@ impl DeltaTableState { }) .collect::, DeltaTableError>>()?, }; + // Append values for action in files { for (name, maybe_value) in action.partition_values.iter() { diff --git a/crates/core/src/test_utils/factories/actions.rs b/crates/core/src/test_utils/factories/actions.rs index 1ae264f624..92778f33bf 100644 --- a/crates/core/src/test_utils/factories/actions.rs +++ b/crates/core/src/test_utils/factories/actions.rs @@ -43,7 +43,7 @@ impl ActionFactory { partition_columns: Vec, data_change: bool, ) -> Add { - let partitions_schema = partitions_schema(&schema, &partition_columns).unwrap(); + let partitions_schema = partitions_schema(schema, &partition_columns).unwrap(); let partition_values = if let Some(p_schema) = partitions_schema { let batch = DataFactory::record_batch(&p_schema, 1, &bounds).unwrap(); p_schema diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index abb46ed91e..19b6c6d493 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -769,7 +769,7 @@ mod tests { expected_stats.parse::().unwrap(), add_actions .into_iter() - .nth(0) + .next() .unwrap() .stats .unwrap() @@ -817,7 +817,7 @@ mod tests { expected_stats.parse::().unwrap(), add_actions .into_iter() - .nth(0) + .next() .unwrap() .stats .unwrap() diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 2197d64f5f..a22d6f093a 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -1017,7 +1017,7 @@ mod tests { #[tokio::test] async fn test_write_data_skipping_stats_columns() { let batch = get_record_batch(None, false); - let partition_cols: &[String] = &vec![]; + let partition_cols: &[String] = &[]; let table_schema: StructType = get_delta_schema(); let table_dir = tempfile::tempdir().unwrap(); let table_path = table_dir.path(); @@ -1053,7 +1053,7 @@ mod tests { expected_stats.parse::().unwrap(), add_actions .into_iter() - .nth(0) + .next() .unwrap() .stats .unwrap() @@ -1065,7 +1065,7 @@ mod tests { #[tokio::test] async fn test_write_data_skipping_num_indexed_colsn() { let batch = get_record_batch(None, false); - let partition_cols: &[String] = &vec![]; + let partition_cols: &[String] = &[]; let table_schema: StructType = get_delta_schema(); let table_dir = tempfile::tempdir().unwrap(); let table_path = table_dir.path(); @@ -1101,7 +1101,7 @@ mod tests { expected_stats.parse::().unwrap(), add_actions .into_iter() - .nth(0) + .next() .unwrap() .stats .unwrap() diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index c09efbf651..4fe448ea76 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -135,7 +135,7 @@ fn stats_from_metadata( let idx_to_iterate = if let Some(stats_cols) = stats_columns { let stats_cols = stats_cols - .into_iter() + .iter() .map(|v| { match sqlparser::parser::Parser::new(&dialect) .try_with_sql(v.as_ref()) @@ -143,13 +143,11 @@ fn stats_from_metadata( .parse_multipart_identifier() { Ok(parts) => Ok(parts.into_iter().map(|v| v.value).join(".")), - Err(e) => { - return Err(DeltaWriterError::DeltaTable( - DeltaTableError::GenericError { - source: Box::new(e), - }, - )) - } + Err(e) => Err(DeltaWriterError::DeltaTable( + DeltaTableError::GenericError { + source: Box::new(e), + }, + )), } }) .collect::, DeltaWriterError>>()?; @@ -347,12 +345,12 @@ impl StatsScalar { let mut val = val / 10.0_f64.powi(*scale); - if val.is_normal() { - if (val.trunc() as i128).to_string().len() > (precision - scale) as usize { - // For normal values with integer parts that get rounded to a number beyond - // the precision - scale range take the next smaller (by magnitude) value - val = f64::from_bits(val.to_bits() - 1); - } + if val.is_normal() + && (val.trunc() as i128).to_string().len() > (precision - scale) as usize + { + // For normal values with integer parts that get rounded to a number beyond + // the precision - scale range take the next smaller (by magnitude) value + val = f64::from_bits(val.to_bits() - 1); } Ok(Self::Decimal(val)) } diff --git a/python/src/lib.rs b/python/src/lib.rs index cfd27bbdfa..0135864c7e 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -21,18 +21,12 @@ use delta_kernel::expressions::Scalar; use delta_kernel::schema::StructField; use deltalake::arrow::compute::concat_batches; use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; -use deltalake::arrow::pyarrow::ToPyArrow; use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator}; use deltalake::arrow::{self, datatypes::Schema as ArrowSchema}; use deltalake::checkpoints::{cleanup_metadata, create_checkpoint}; -use deltalake::datafusion::datasource::provider_as_source; -use deltalake::datafusion::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; use deltalake::datafusion::physical_plan::ExecutionPlan; -use deltalake::datafusion::prelude::{DataFrame, SessionContext}; -use deltalake::delta_datafusion::{ - DataFusionMixins, DeltaDataChecker, DeltaScanConfigBuilder, DeltaSessionConfig, - DeltaTableProvider, -}; +use deltalake::datafusion::prelude::SessionContext; +use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; use deltalake::kernel::{ scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction,