From 8c6c1b05944f87f0cdf12c2300706c1c9d786d7a Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 24 Aug 2024 11:40:54 +0200 Subject: [PATCH] refactor: post_commit_hook_properties --- python/deltalake/_internal.pyi | 26 ++++++++++---------- python/deltalake/table.py | 36 ++++++++-------------------- python/deltalake/writer.py | 8 ++----- python/src/lib.rs | 44 ++++++++++++++++++---------------- python/src/merge.rs | 7 ++++-- 5 files changed, 54 insertions(+), 67 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 1e50d9bdf8..5aedd5e162 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Union import pyarrow import pyarrow.fs as fs -from deltalake.writer import AddAction, WriterProperties +from deltalake.writer import AddAction, PostCommitHookProperties, WriterProperties __version__: str @@ -58,7 +58,7 @@ class RawDeltaTable: retention_hours: Optional[int], enforce_retention_duration: bool, custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> List[str]: ... def compact_optimize( self, @@ -68,7 +68,7 @@ class RawDeltaTable: min_commit_interval: Optional[int], writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> str: ... def z_order_optimize( self, @@ -80,26 +80,26 @@ class RawDeltaTable: min_commit_interval: Optional[int], writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> str: ... def add_columns( self, fields: List[Field], custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> None: ... def add_constraints( self, constraints: Dict[str, str], custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> None: ... def drop_constraints( self, name: str, raise_if_not_exists: bool, custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> None: ... def set_table_properties( self, @@ -126,13 +126,13 @@ class RawDeltaTable: predicate: Optional[str], writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> str: ... def repair( self, dry_run: bool, custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> str: ... def update( self, @@ -141,7 +141,7 @@ class RawDeltaTable: writer_properties: Optional[WriterProperties], safe_cast: bool, custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> str: ... def create_merge_builder( self, @@ -151,7 +151,7 @@ class RawDeltaTable: target_alias: Optional[str], writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], safe_cast: bool, ) -> PyMergeBuilder: ... def merge_execute(self, merge_builder: PyMergeBuilder) -> str: ... @@ -166,7 +166,7 @@ class RawDeltaTable: schema: pyarrow.Schema, partitions_filters: Optional[FilterType], custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> None: ... def cleanup_metadata(self) -> None: ... def check_can_write_timestamp_ntz(self, schema: pyarrow.Schema) -> None: ... @@ -206,7 +206,7 @@ def write_to_deltalake( storage_options: Optional[Dict[str, str]], writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], - post_commithook_properties: Optional[Dict[str, Optional[bool]]], + post_commithook_properties: Optional[PostCommitHookProperties], ) -> None: ... def convert_to_deltalake( uri: str, diff --git a/python/deltalake/table.py b/python/deltalake/table.py index bccd3a2d7e..123c231593 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -748,7 +748,7 @@ def vacuum( retention_hours, enforce_retention_duration, custom_metadata, - post_commithook_properties.__dict__ if post_commithook_properties else None, + post_commithook_properties, ) def update( @@ -852,9 +852,7 @@ def update( writer_properties, safe_cast=not error_on_type_mismatch, custom_metadata=custom_metadata, - post_commithook_properties=post_commithook_properties.__dict__ - if post_commithook_properties - else None, + post_commithook_properties=post_commithook_properties, ) return json.loads(metrics) @@ -961,9 +959,7 @@ def merge( safe_cast=not error_on_type_mismatch, writer_properties=writer_properties, custom_metadata=custom_metadata, - post_commithook_properties=post_commithook_properties.__dict__ - if post_commithook_properties - else None, + post_commithook_properties=post_commithook_properties, ) return TableMerger(py_merge_builder, self._table) @@ -1246,10 +1242,7 @@ def delete( the metrics from delete. """ metrics = self._table.delete( - predicate, - writer_properties, - custom_metadata, - post_commithook_properties.__dict__ if post_commithook_properties else None, + predicate, writer_properties, custom_metadata, post_commithook_properties ) return json.loads(metrics) @@ -1286,9 +1279,7 @@ def repair( ``` """ metrics = self._table.repair( - dry_run, - custom_metadata, - post_commithook_properties.__dict__ if post_commithook_properties else None, + dry_run, custom_metadata, post_commithook_properties ) return json.loads(metrics) @@ -1706,9 +1697,7 @@ def add_columns( fields = [fields] self.table._table.add_columns( - fields, - custom_metadata, - post_commithook_properties.__dict__ if post_commithook_properties else None, + fields, custom_metadata, post_commithook_properties ) def add_constraint( @@ -1747,9 +1736,7 @@ def add_constraint( ) self.table._table.add_constraints( - constraints, - custom_metadata, - post_commithook_properties.__dict__ if post_commithook_properties else None, + constraints, custom_metadata, post_commithook_properties ) def drop_constraint( @@ -1788,10 +1775,7 @@ def drop_constraint( ``` """ self.table._table.drop_constraints( - name, - raise_if_not_exists, - custom_metadata, - post_commithook_properties.__dict__ if post_commithook_properties else None, + name, raise_if_not_exists, custom_metadata, post_commithook_properties ) def set_table_properties( @@ -1897,7 +1881,7 @@ def compact( min_commit_interval, writer_properties, custom_metadata, - post_commithook_properties.__dict__ if post_commithook_properties else None, + post_commithook_properties, ) self.table.update_incremental() return json.loads(metrics) @@ -1967,7 +1951,7 @@ def z_order( min_commit_interval, writer_properties, custom_metadata, - post_commithook_properties.__dict__ if post_commithook_properties else None, + post_commithook_properties, ) self.table.update_incremental() return json.loads(metrics) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 99b915183f..95368dbf79 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -314,9 +314,7 @@ def write_deltalake( storage_options=storage_options, writer_properties=writer_properties, custom_metadata=custom_metadata, - post_commithook_properties=post_commithook_properties.__dict__ - if post_commithook_properties - else None, + post_commithook_properties=post_commithook_properties, ) if table: table.update_incremental() @@ -549,9 +547,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: schema, partition_filters, custom_metadata, - post_commithook_properties=post_commithook_properties.__dict__ - if post_commithook_properties - else None, + post_commithook_properties=post_commithook_properties, ) table.update_incremental() else: diff --git a/python/src/lib.rs b/python/src/lib.rs index 45c3d20bd2..787d321d08 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -329,7 +329,7 @@ impl RawDeltaTable { retention_hours: Option, enforce_retention_duration: bool, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult> { let (table, metrics) = py.allow_threads(|| { let mut cmd = VacuumBuilder::new( @@ -364,7 +364,7 @@ impl RawDeltaTable { writer_properties: Option, safe_cast: bool, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = UpdateBuilder::new( @@ -420,7 +420,7 @@ impl RawDeltaTable { min_commit_interval: Option, writer_properties: Option, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = OptimizeBuilder::new( @@ -480,7 +480,7 @@ impl RawDeltaTable { min_commit_interval: Option, writer_properties: Option, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = OptimizeBuilder::new( @@ -526,7 +526,7 @@ impl RawDeltaTable { py: Python, fields: Vec, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = AddColumnBuilder::new( @@ -559,7 +559,7 @@ impl RawDeltaTable { py: Python, constraints: HashMap, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = ConstraintBuilder::new( @@ -590,7 +590,7 @@ impl RawDeltaTable { name: String, raise_if_not_exists: bool, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = DropConstraintBuilder::new( @@ -699,7 +699,7 @@ impl RawDeltaTable { target_alias: Option, safe_cast: bool, writer_properties: Option, - post_commithook_properties: Option>>, + post_commithook_properties: Option, custom_metadata: Option>, ) -> PyResult { py.allow_threads(|| { @@ -925,7 +925,7 @@ impl RawDeltaTable { schema: PyArrowType, partitions_filters: Option>, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult<()> { py.allow_threads(|| { let mode = mode.parse().map_err(PythonError::from)?; @@ -1095,7 +1095,7 @@ impl RawDeltaTable { predicate: Option, writer_properties: Option, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = DeleteBuilder::new( @@ -1154,7 +1154,7 @@ impl RawDeltaTable { &mut self, dry_run: bool, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult { let mut cmd = FileSystemCheckBuilder::new( self._table.log_store(), @@ -1178,14 +1178,12 @@ impl RawDeltaTable { fn set_post_commithook_properties( mut commit_properties: CommitProperties, - post_commithook_properties: HashMap>, + post_commithook_properties: PyPostCommitHookProperties, ) -> CommitProperties { - if let Some(Some(create_checkpoint)) = post_commithook_properties.get("create_checkpoint") { - commit_properties = commit_properties.with_create_checkpoint(*create_checkpoint) - } - if let Some(cleanup_expired_logs) = post_commithook_properties.get("cleanup_expired_logs") { - commit_properties = commit_properties.with_cleanup_expired_logs(*cleanup_expired_logs) - } + commit_properties = + commit_properties.with_create_checkpoint(post_commithook_properties.create_checkpoint); + commit_properties = commit_properties + .with_cleanup_expired_logs(post_commithook_properties.cleanup_expired_logs); commit_properties } @@ -1305,7 +1303,7 @@ fn convert_partition_filters( fn maybe_create_commit_properties( custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> Option { if custom_metadata.is_none() && post_commithook_properties.is_none() { return None; @@ -1587,6 +1585,12 @@ pub struct PyWriterProperties { column_properties: Option>>, } +#[derive(FromPyObject)] +pub struct PyPostCommitHookProperties { + create_checkpoint: bool, + cleanup_expired_logs: Option, +} + #[pyfunction] #[allow(clippy::too_many_arguments)] fn write_to_deltalake( @@ -1604,7 +1608,7 @@ fn write_to_deltalake( storage_options: Option>, writer_properties: Option, custom_metadata: Option>, - post_commithook_properties: Option>>, + post_commithook_properties: Option, ) -> PyResult<()> { py.allow_threads(|| { let batches = data.0.map(|batch| batch.unwrap()).collect::>(); diff --git a/python/src/merge.rs b/python/src/merge.rs index e1bb1bf3a6..cdf997a935 100644 --- a/python/src/merge.rs +++ b/python/src/merge.rs @@ -16,7 +16,10 @@ use std::sync::Arc; use crate::error::PythonError; use crate::utils::rt; -use crate::{maybe_create_commit_properties, set_writer_properties, PyWriterProperties}; +use crate::{ + maybe_create_commit_properties, set_writer_properties, PyPostCommitHookProperties, + PyWriterProperties, +}; #[pyclass(module = "deltalake._internal")] pub(crate) struct PyMergeBuilder { @@ -38,7 +41,7 @@ impl PyMergeBuilder { target_alias: Option, safe_cast: bool, writer_properties: Option, - post_commithook_properties: Option>>, + post_commithook_properties: Option, custom_metadata: Option>, ) -> DeltaResult { let ctx = SessionContext::new();