From 7ac917aa074a8c3b0daa6dd6c5228ac1d50cf86b Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Sun, 27 Aug 2023 16:05:58 -0700
Subject: [PATCH 01/10] some fixes for pyarrow 13

---
 python/deltalake/table.py       | 15 ++++++++++++++
 python/deltalake/writer.py      |  3 ++-
 python/src/lib.rs               | 36 ++++++++++++++++++++++++---------
 python/tests/test_writer.py     |  6 ++++--
 rust/src/operations/optimize.rs |  1 -
 5 files changed, 47 insertions(+), 14 deletions(-)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 0f7f6ab9d1..73c4a561cf 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -730,3 +730,18 @@ def z_order(
         )
         self.table.update_incremental()
         return json.loads(metrics)
+
+
+def _cast_to_equal_batch(
+    batch: pyarrow.RecordBatch, schema: pyarrow.Schema
+) -> pyarrow.RecordBatch:
+    """
+    Cast a batch to a schema, if it is already considered equal.
+
+    This is mostly for mapping things like list field names, which arrow-rs
+    checks when looking at schema equality, but pyarrow does not.
+    """
+    if batch.schema == schema:
+        return pyarrow.Table.from_batches([batch]).cast(schema).to_batches()[0]
+    else:
+        return batch

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 466010ed7f..43f46ce353 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -41,7 +41,7 @@
 from ._internal import batch_distinct
 from ._internal import write_new_deltalake as _write_new_deltalake
 from .exceptions import DeltaProtocolError, TableNotFoundError
-from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable
+from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable, _cast_to_equal_batch
 
 try:
     import pandas as pd  # noqa: F811
@@ -280,6 +280,7 @@ def check_data_is_aligned_with_partition_filtering(
         )
 
     def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
+        batch = _cast_to_equal_batch(batch, schema)
        checker.check_batch(batch)

        if mode == "overwrite" and partition_filters:
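The helper above leans on a schema-equality quirk. A minimal sketch of the mismatch it papers over, assuming pyarrow (as of 13.x) ignores list item field names when comparing schemas while arrow-rs checks them:

import pyarrow as pa

# Two schemas that differ only in the list item field name ("element" vs
# "item"); pyarrow's equality ignores the name, arrow-rs's does not.
source = pa.schema([("v", pa.list_(pa.field("element", pa.int64())))])
target = pa.schema([("v", pa.list_(pa.field("item", pa.int64())))])

batch = pa.record_batch(
    [pa.array([[1, 2], [3]], type=source.field("v").type)], schema=source
)
print(batch.schema == target)  # True in pyarrow despite the name difference

# _cast_to_equal_batch round-trips through a Table cast so the item name
# matches the target schema before the batch reaches arrow-rs.
fixed = pa.Table.from_batches([batch]).cast(target).to_batches()[0]
print(fixed.schema.field("v").type.value_field.name)  # "item"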
diff --git a/python/src/lib.rs b/python/src/lib.rs
index d8f2f92b83..b49fb5d303 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -12,6 +12,7 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use arrow::pyarrow::PyArrowType;
+use arrow_schema::DataType;
 use chrono::{DateTime, Duration, FixedOffset, Utc};
 use deltalake::action::{
     self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats,
@@ -632,16 +633,31 @@ fn filestats_to_expression<'py>(
     let mut expressions: Vec<PyResult<&PyAny>> = Vec::new();
 
     let cast_to_type = |column_name: &String, value: PyObject, schema: &ArrowSchema| {
-        let column_type = PyArrowType(
-            schema
-                .field_with_name(column_name)
-                .map_err(|_| {
-                    PyValueError::new_err(format!("Column not found in schema: {column_name}"))
-                })?
-                .data_type()
-                .clone(),
-        )
-        .into_py(py);
+        let column_type = schema
+            .field_with_name(column_name)
+            .map_err(|_| {
+                PyValueError::new_err(format!("Column not found in schema: {column_name}"))
+            })?
+            .data_type()
+            .clone();
+
+        let value = match column_type {
+            // Since PyArrow 13.0.0, casting string -> timestamp fails if it ends with "Z"
+            // and the target type is timezone naive.
+            DataType::Timestamp(_, _) if value.extract::<String>(py).is_ok() => {
+                value.call_method1(py, "rstrip", ("Z",))?
+            }
+            // PyArrow 13.0.0 lost the ability to cast from string to date32, so
+            // we have to implement that manually.
+            DataType::Date32 if value.extract::<String>(py).is_ok() => {
+                let date = Python::import(py, "datetime")?.getattr("date")?;
+                let date = date.call_method1("fromisoformat", (value,))?;
+                date.to_object(py)
+            }
+            _ => value,
+        };
+
+        let column_type = PyArrowType(column_type).into_py(py);
         pa.call_method1("scalar", (value,))?
             .call_method1("cast", (column_type,))
     };
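The two match arms adapt stats-based filter construction to pyarrow 13's stricter casting. A rough Python illustration of both behavior changes; treating pa.ArrowInvalid as the raised error type is an assumption of this sketch:

import pyarrow as pa
from datetime import date

# pyarrow >= 13 refuses to cast a "Z"-suffixed string to a timezone-naive
# timestamp, which is why the Rust code strips the suffix first.
try:
    pa.scalar("2021-01-01T00:00:00Z").cast(pa.timestamp("us"))
except pa.ArrowInvalid as err:
    print(err)
print(pa.scalar("2021-01-01T00:00:00").cast(pa.timestamp("us")))

# String -> date32 casts also stopped working, hence the detour through
# datetime.date.fromisoformat before the value is handed back to pyarrow.
print(pa.scalar(date.fromisoformat("2021-01-01"), type=pa.date32()))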
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index e45d56539c..68ea401249 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -306,7 +306,9 @@ def test_write_recordbatchreader(
     tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table
 ):
     batches = existing_table.to_pyarrow_dataset().to_batches()
-    reader = RecordBatchReader.from_batches(sample_data.schema, batches)
+    reader = RecordBatchReader.from_batches(
+        existing_table.to_pyarrow_dataset().schema, batches
+    )
 
     write_deltalake(tmp_path, reader, mode="overwrite")
     assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data
@@ -871,7 +873,7 @@ def comp():
         # concurrently, then this will fail.
         assert data.num_rows == sample_data.num_rows
         try:
-            write_deltalake(dt.table_uri, data, mode="overwrite")
+            write_deltalake(dt.table_uri, sample_data, mode="overwrite")
         except Exception as e:
             exception = e

diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index 7ebd6d2e79..b7983f76ba 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -1294,7 +1294,6 @@ pub(super) mod zorder {
             assert_eq!(result.null_count(), 0);
 
             let data: &BinaryArray = as_generic_binary_array(result.as_ref());
-            dbg!(data);
             assert_eq!(data.value_data().len(), 3 * 16 * 3);
             assert!(data.iter().all(|x| x.unwrap().len() == 3 * 16));

From 27bb16d1cc104287b11cb102b81825c25d55ca00 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Sun, 27 Aug 2023 17:02:22 -0700
Subject: [PATCH 02/10] try and optimize spark table

---
 rust/Cargo.toml                 |  1 +
 rust/src/delta.rs               |  7 ++++++-
 rust/src/operations/optimize.rs | 23 +++++++++++++++++++++++
 rust/src/table_state.rs         |  1 +
 4 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index e8f8de5b62..de0bea38fa 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -117,6 +117,7 @@ tempdir = "0"
 tempfile = "3"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 utime = "0.3"
+fs_extra = "1.2.0"
 
 [features]
 azure = ["object_store/azure"]

diff --git a/rust/src/delta.rs b/rust/src/delta.rs
index 7355becf6e..31ed0c7051 100644
--- a/rust/src/delta.rs
+++ b/rust/src/delta.rs
@@ -504,9 +504,11 @@ impl DeltaTable {
     /// loading the last checkpoint and incrementally applying each version since.
     #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub async fn update(&mut self) -> Result<(), DeltaTableError> {
+        dbg!("hello");
         match get_last_checkpoint(&self.storage).await {
             Ok(last_check_point) => {
                 debug!("update with latest checkpoint {last_check_point:?}");
+                dbg!(&last_check_point);
                 if Some(last_check_point) == self.last_check_point {
                     self.update_incremental(None).await
                 } else {
@@ -519,7 +521,10 @@ impl DeltaTable {
                 debug!("update without checkpoint");
                 self.update_incremental(None).await
             }
-            Err(err) => Err(DeltaTableError::from(err)),
+            Err(err) => {
+                dbg!(&err);
+                Err(DeltaTableError::from(err))
+            },
         }
     }

diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index b7983f76ba..9d4420018c 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -1254,6 +1254,8 @@ pub(super) mod zorder {
         };
         use arrow_schema::DataType;
 
+        use crate::DeltaOps;
+
         use super::*;
 
         #[test]
@@ -1318,5 +1320,26 @@ pub(super) mod zorder {
             assert_eq!(data.value(2)[0..1], [2u8]);
             assert_eq!(data.value(2)[1..], [0; (3 * 16) - 1]);
         }
+
+        #[tokio::test]
+        async fn works_on_spark_table() {
+            // Create a temporary directory
+            let tmp_dir = tempdir::TempDir::new("optimize-spark").unwrap();
+            let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
+
+            // Copy recursively from the test data directory to the temporary directory
+            let source_path = "tests/data/delta-1.2.1-only-struct-stats";
+            fs_extra::dir::copy(source_path, tmp_dir.path(), &Default::default())
+                .unwrap();
+
+            // Run optimize
+            let (_, metrics) = DeltaOps::try_from_uri(table_uri).await.unwrap()
+                .optimize()
+                .await
+                .unwrap();
+
+            // Verify it worked
+            assert_eq!(metrics.num_files_added, 1);
+        }
     }
 }

diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs
index a0fe4f19b3..e16a8a7771 100644
--- a/rust/src/table_state.rs
+++ b/rust/src/table_state.rs
@@ -258,6 +258,7 @@ impl DeltaTableState {
         require_tombstones: bool,
         require_files: bool,
     ) {
+        dbg!(&new_state);
         if !new_state.tombstones.is_empty() {
             self.files
                 .retain(|a| !new_state.tombstones.contains(a.path.as_str()));
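The new test drives optimize against a table written by Spark (delta 1.2.1), copying the fixture first so the operation can rewrite files in scratch space. For orientation, a hedged Python analogue; the fixture path relative to the repo root and the optimize entry point (DeltaTable.optimize.compact()) are assumptions, since the Python API for optimize has shifted between releases:

import pathlib
import shutil
import tempfile

from deltalake import DeltaTable

with tempfile.TemporaryDirectory() as tmp:
    # Copy the Spark-written fixture into scratch space, mirroring fs_extra.
    src = "rust/tests/data/delta-1.2.1-only-struct-stats"
    dst = pathlib.Path(tmp) / "delta-1.2.1-only-struct-stats"
    shutil.copytree(src, dst)

    # Compact the many small files the fixture contains.
    dt = DeltaTable(str(dst))
    metrics = dt.optimize.compact()
    print(metrics)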
From cd2beaf461faaa9819ca12d0dfe4fb90e387a538 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Tue, 19 Sep 2023 20:38:12 -0700
Subject: [PATCH 03/10] cargo fmt

---
 rust/src/delta.rs               | 2 +-
 rust/src/operations/optimize.rs | 7 ++++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/rust/src/delta.rs b/rust/src/delta.rs
index 45bf2c2ffb..95b8615bdd 100644
--- a/rust/src/delta.rs
+++ b/rust/src/delta.rs
@@ -543,7 +543,7 @@ impl DeltaTable {
             Err(err) => {
                 dbg!(&err);
                 Err(DeltaTableError::from(err))
-            },
+            }
         }
     }

diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index 866d082668..3525030e82 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -1370,11 +1370,12 @@ pub(super) mod zorder {
 
             // Copy recursively from the test data directory to the temporary directory
             let source_path = "tests/data/delta-1.2.1-only-struct-stats";
-            fs_extra::dir::copy(source_path, tmp_dir.path(), &Default::default())
-                .unwrap();
+            fs_extra::dir::copy(source_path, tmp_dir.path(), &Default::default()).unwrap();
 
             // Run optimize
-            let (_, metrics) = DeltaOps::try_from_uri(table_uri).await.unwrap()
+            let (_, metrics) = DeltaOps::try_from_uri(table_uri)
+                .await
+                .unwrap()
                 .optimize()
                 .await
                 .unwrap();

From 205bf134cc16b87806c06085789f40e638078e8d Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 19 Sep 2023 21:25:08 -0700
Subject: [PATCH 04/10] fix: allow pyarrow 13

---
 python/pyproject.toml   | 2 +-
 rust/src/delta.rs       | 7 +------
 rust/src/table_state.rs | 1 -
 3 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/python/pyproject.toml b/python/pyproject.toml
index 634b675434..35d69eb2c1 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -17,7 +17,7 @@ classifiers = [
     "Programming Language :: Python :: 3.11"
 ]
 dependencies = [
-    "pyarrow>=8,<=12",
+    "pyarrow>=8",
     'typing-extensions;python_version<"3.8"',
 ]

diff --git a/rust/src/delta.rs b/rust/src/delta.rs
index 95b8615bdd..d877b77191 100644
--- a/rust/src/delta.rs
+++ b/rust/src/delta.rs
@@ -523,11 +523,9 @@ impl DeltaTable {
     /// loading the last checkpoint and incrementally applying each version since.
     #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub async fn update(&mut self) -> Result<(), DeltaTableError> {
-        dbg!("hello");
         match get_last_checkpoint(&self.storage).await {
             Ok(last_check_point) => {
                 debug!("update with latest checkpoint {last_check_point:?}");
-                dbg!(&last_check_point);
                 if Some(last_check_point) == self.last_check_point {
                     self.update_incremental(None).await
                 } else {
@@ -540,10 +538,7 @@ impl DeltaTable {
                 debug!("update without checkpoint");
                 self.update_incremental(None).await
             }
-            Err(err) => {
-                dbg!(&err);
-                Err(DeltaTableError::from(err))
-            }
+            Err(err) => Err(DeltaTableError::from(err)),
         }
     }

diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs
index 8182cad041..9be2200d9e 100644
--- a/rust/src/table_state.rs
+++ b/rust/src/table_state.rs
@@ -263,7 +263,6 @@ impl DeltaTableState {
         require_tombstones: bool,
         require_files: bool,
     ) {
-        dbg!(&new_state);
         if !new_state.tombstones.is_empty() {
             self.files
                 .retain(|a| !new_state.tombstones.contains(a.path.as_str()));

From e70770b7079c82b2fe788355a685c2df7398a3b5 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 19 Sep 2023 21:42:45 -0700
Subject: [PATCH 05/10] cleanup

---
 rust/Cargo.toml                 |  1 -
 rust/src/operations/optimize.rs | 24 ------------------------
 2 files changed, 25 deletions(-)

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 71a1b8451e..72ad44bbac 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -120,7 +120,6 @@ tempdir = "0"
 tempfile = "3"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 utime = "0.3"
-fs_extra = "1.2.0"
 hyper = { version = "0.14", features = ["server"] }
 
 [features]

diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index 3525030e82..b0321aa434 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -1295,8 +1295,6 @@ pub(super) mod zorder {
         };
         use arrow_schema::DataType;
 
-        use crate::DeltaOps;
-
         use super::*;
 
         #[test]
@@ -1361,27 +1359,5 @@ pub(super) mod zorder {
             assert_eq!(data.value(2)[0..1], [2u8]);
             assert_eq!(data.value(2)[1..], [0; (3 * 16) - 1]);
         }
-
-        #[tokio::test]
-        async fn works_on_spark_table() {
-            // Create a temporary directory
-            let tmp_dir = tempdir::TempDir::new("optimize-spark").unwrap();
-            let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
-
-            // Copy recursively from the test data directory to the temporary directory
-            let source_path = "tests/data/delta-1.2.1-only-struct-stats";
-            fs_extra::dir::copy(source_path, tmp_dir.path(), &Default::default()).unwrap();
-
-            // Run optimize
-            let (_, metrics) = DeltaOps::try_from_uri(table_uri)
-                .await
-                .unwrap()
-                .optimize()
-                .await
-                .unwrap();
-
-            // Verify it worked
-            assert_eq!(metrics.num_files_added, 1);
-        }
     }
 }
From 4d4de1f2679c2752c356cefefe916697c8bee38b Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sat, 4 Nov 2023 23:09:57 +0100
Subject: [PATCH 06/10] write with non compliant types

---
 python/deltalake/writer.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 7dd271d0f2..7a3722d379 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -302,6 +302,10 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
         schema, (validate_batch(batch) for batch in batch_iter)
     )
 
+    if file_options is not None:
+        file_options.update(use_compliant_nested_type=False)
+    else:
+        file_options = pa.dataset.ParquetFileFormat().make_write_options(use_compliant_nested_type=False)
     ds.write_dataset(
         data,
         base_dir="/",

From c5d44eeb0bc719c02c382b953103024e38a21904 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sat, 4 Nov 2023 23:24:22 +0100
Subject: [PATCH 07/10] formatting

---
 python/deltalake/writer.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index ac8409cc38..8bd878e61a 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -346,7 +346,10 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
     if file_options is not None:
         file_options.update(use_compliant_nested_type=False)
     else:
-        file_options = pa.dataset.ParquetFileFormat().make_write_options(use_compliant_nested_type=False)
+        file_options = pa.dataset.ParquetFileFormat().make_write_options(
+            use_compliant_nested_type=False
+        )
+
     ds.write_dataset(
         data,
         base_dir="/",

From 69dfbed5b1ded401c5218b749c9e3f9d9b83f1c7 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sat, 4 Nov 2023 23:25:13 +0100
Subject: [PATCH 08/10] use module directly

---
 python/deltalake/writer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 8bd878e61a..688bd35fde 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -346,7 +346,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
     if file_options is not None:
         file_options.update(use_compliant_nested_type=False)
     else:
-        file_options = pa.dataset.ParquetFileFormat().make_write_options(
+        file_options = ds.ParquetFileFormat().make_write_options(
             use_compliant_nested_type=False
         )
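pyarrow 13 changed the ParquetWriter default for use_compliant_nested_type from False to True, so nested list fields started being written as "element" instead of "item"; pinning it back keeps new files matching what arrow-rs reads and writes. A standalone check of the option; the basename template is only there to give this sketch a predictable file name:

import tempfile

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

table = pa.table({"v": pa.array([[1, 2], [3]], type=pa.list_(pa.int64()))})

with tempfile.TemporaryDirectory() as tmp:
    opts = ds.ParquetFileFormat().make_write_options(
        use_compliant_nested_type=False
    )
    ds.write_dataset(
        table, tmp, format="parquet", file_options=opts,
        basename_template="part-{i}.parquet",
    )
    # The on-disk schema keeps pyarrow's legacy list<item: ...> naming.
    print(pq.read_schema(f"{tmp}/part-0.parquet").field("v").type)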
From 702027e92ea349a6b39465028eae4b4307b10c60 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sat, 4 Nov 2023 23:49:57 +0100
Subject: [PATCH 09/10] remove equal_batch logic

---
 python/deltalake/table.py  | 15 ---------------
 python/deltalake/writer.py |  4 ++--
 2 files changed, 2 insertions(+), 17 deletions(-)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 16b08acc2c..ad82a010fd 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -1435,18 +1435,3 @@ def z_order(
         )
         self.table.update_incremental()
         return json.loads(metrics)
-
-
-def _cast_to_equal_batch(
-    batch: pyarrow.RecordBatch, schema: pyarrow.Schema
-) -> pyarrow.RecordBatch:
-    """
-    Cast a batch to a schema, if it is already considered equal.
-
-    This is mostly for mapping things like list field names, which arrow-rs
-    checks when looking at schema equality, but pyarrow does not.
-    """
- """ - if batch.schema == schema: - return pyarrow.Table.from_batches([batch]).cast(schema).to_batches()[0] - else: - return batch diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 688bd35fde..db729bf0ee 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -40,7 +40,7 @@ from ._internal import batch_distinct from ._internal import write_new_deltalake as _write_new_deltalake from .exceptions import DeltaProtocolError, TableNotFoundError -from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable, _cast_to_equal_batch +from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable try: import pandas as pd # noqa: F811 @@ -320,7 +320,7 @@ def check_data_is_aligned_with_partition_filtering( ) def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: - batch = _cast_to_equal_batch(batch, schema) + # batch = _cast_to_equal_batch(batch, schema) checker.check_batch(batch) if mode == "overwrite" and partition_filters: From 0668c99d867f3a86980b9a9b7aeadb6af5d4b326 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 4 Nov 2023 23:58:24 +0100 Subject: [PATCH 10/10] remove line --- python/deltalake/writer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index db729bf0ee..ef4ae3a57b 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -320,7 +320,6 @@ def check_data_is_aligned_with_partition_filtering( ) def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: - # batch = _cast_to_equal_batch(batch, schema) checker.check_batch(batch) if mode == "overwrite" and partition_filters: