From 00ebdfcd229c9a35d52889fdc9ee1b5a54c44355 Mon Sep 17 00:00:00 2001 From: helanto Date: Thu, 29 Aug 2024 14:41:58 +0300 Subject: [PATCH] feat: Python binding support --- python/deltalake/_internal.pyi | 14 ++++ python/deltalake/table.py | 38 +++++++++- python/deltalake/writer.py | 7 ++ python/src/lib.rs | 133 +++++++++++++++++++++++---------- python/src/merge.rs | 9 ++- 5 files changed, 157 insertions(+), 44 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 1e50d9bdf8..c5814769b3 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -59,6 +59,7 @@ class RawDeltaTable: enforce_retention_duration: bool, custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> List[str]: ... def compact_optimize( self, @@ -69,6 +70,7 @@ class RawDeltaTable: writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> str: ... def z_order_optimize( self, @@ -81,18 +83,21 @@ class RawDeltaTable: writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> str: ... def add_columns( self, fields: List[Field], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> None: ... def add_constraints( self, constraints: Dict[str, str], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> None: ... 
def drop_constraints( self, @@ -100,12 +105,14 @@ class RawDeltaTable: raise_if_not_exists: bool, custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> None: ... def set_table_properties( self, properties: Dict[str, str], raise_if_not_exists: bool, custom_metadata: Optional[Dict[str, str]], + max_commit_retries: Optional[int], ) -> None: ... def restore( self, @@ -113,6 +120,7 @@ class RawDeltaTable: ignore_missing_files: bool, protocol_downgrade_allowed: bool, custom_metadata: Optional[Dict[str, str]], + max_commit_retries: Optional[int], ) -> str: ... def history(self, limit: Optional[int]) -> List[str]: ... def update_incremental(self) -> None: ... @@ -127,12 +135,14 @@ class RawDeltaTable: writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> str: ... def repair( self, dry_run: bool, custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> str: ... def update( self, @@ -142,6 +152,7 @@ class RawDeltaTable: safe_cast: bool, custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> str: ... def create_merge_builder( self, @@ -153,6 +164,7 @@ class RawDeltaTable: custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], safe_cast: bool, + max_commit_retries: Optional[int], ) -> PyMergeBuilder: ... def merge_execute(self, merge_builder: PyMergeBuilder) -> str: ... 
def get_active_partitions( @@ -167,6 +179,7 @@ class RawDeltaTable: partitions_filters: Optional[FilterType], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> None: ... def cleanup_metadata(self) -> None: ... def check_can_write_timestamp_ntz(self, schema: pyarrow.Schema) -> None: ... @@ -207,6 +220,7 @@ def write_to_deltalake( writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], + max_commit_retries: Optional[int], ) -> None: ... def convert_to_deltalake( uri: str, diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 3aba38bff0..ebee6005a9 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -726,6 +726,7 @@ def vacuum( enforce_retention_duration: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> List[str]: """ Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold. @@ -736,6 +737,7 @@ def vacuum( enforce_retention_duration: when disabled, accepts retention hours smaller than the value from `delta.deletedFileRetentionDuration`. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the list of files no longer referenced by the Delta Table and are older than the retention threshold. 
""" @@ -749,6 +751,7 @@ def vacuum( enforce_retention_duration, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries, ) def update( @@ -762,6 +765,7 @@ def update( error_on_type_mismatch: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """`UPDATE` records in the Delta Table that matches an optional predicate. Either updates or new_values needs to be passed for it to execute. @@ -774,6 +778,7 @@ def update( error_on_type_mismatch: specify if update will return error if data types are mismatching :default = True custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from update @@ -855,6 +860,7 @@ def update( post_commithook_properties=post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries=max_commit_retries, ) return json.loads(metrics) @@ -897,6 +903,7 @@ def merge( large_dtypes: Optional[bool] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> "TableMerger": """Pass the source data which you want to merge on the target delta table, providing a predicate in SQL query like format. You can also specify on what to do when the underlying data types do not @@ -913,6 +920,7 @@ def merge( arrow_schema_conversion_mode: Large converts all types of data schema into Large Arrow types, passthrough keeps string/binary/list types untouched custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. 
If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: TableMerger: TableMerger Object @@ -964,6 +972,7 @@ def merge( post_commithook_properties=post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries=max_commit_retries, ) return TableMerger(py_merge_builder, self._table) @@ -974,6 +983,7 @@ def restore( ignore_missing_files: bool = False, protocol_downgrade_allowed: bool = False, custom_metadata: Optional[Dict[str, str]] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """ Run the Restore command on the Delta Table: restore table to a given version or datetime. @@ -983,6 +993,7 @@ def restore( ignore_missing_files: whether the operation carry on when some data files missing. protocol_downgrade_allowed: whether the operation when protocol version upgraded. custom_metadata: custom metadata that will be added to the transaction commit. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from restore. @@ -993,6 +1004,7 @@ def restore( ignore_missing_files=ignore_missing_files, protocol_downgrade_allowed=protocol_downgrade_allowed, custom_metadata=custom_metadata, + max_commit_retries=max_commit_retries, ) else: metrics = self._table.restore( @@ -1000,6 +1012,7 @@ def restore( ignore_missing_files=ignore_missing_files, protocol_downgrade_allowed=protocol_downgrade_allowed, custom_metadata=custom_metadata, + max_commit_retries=max_commit_retries, ) return json.loads(metrics) @@ -1228,6 +1241,7 @@ def delete( writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """Delete records from a Delta Table that statisfy a predicate. 
@@ -1241,6 +1255,7 @@ def delete( writer_properties: Pass writer properties to the Rust parquet writer. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from delete. @@ -1250,6 +1265,7 @@ def delete( writer_properties, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries, ) return json.loads(metrics) @@ -1258,6 +1274,7 @@ def repair( dry_run: bool = False, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """Repair the Delta Table by auditing active files that do not exist in the underlying filesystem and removes them. This can be useful when there are accidental deletions or corrupted files. @@ -1270,6 +1287,7 @@ def repair( dry_run: when activated, list only the files, otherwise add remove actions to transaction log. Defaults to False. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: The metrics from repair (FSCK) action. 
@@ -1289,6 +1307,7 @@ def repair( dry_run, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries, ) return json.loads(metrics) @@ -1680,6 +1699,7 @@ def add_columns( fields: Union[DeltaField, List[DeltaField]], custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> None: """Add new columns and/or update the fields of a stuctcolumn @@ -1687,6 +1707,7 @@ def add_columns( fields: fields to merge into schema custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Example: ```python @@ -1709,6 +1730,7 @@ def add_columns( fields, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries, ) def add_constraint( @@ -1716,6 +1738,7 @@ def add_constraint( constraints: Dict[str, str], custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> None: """ Add constraints to the table. Limited to `single constraint` at once. @@ -1724,6 +1747,7 @@ def add_constraint( constraints: mapping of constraint name to SQL-expression to evaluate on write custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. 
Example: ```python @@ -1750,6 +1774,7 @@ def add_constraint( constraints, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries, ) def drop_constraint( @@ -1758,6 +1783,7 @@ def drop_constraint( raise_if_not_exists: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> None: """ Drop constraints from a table. Limited to `single constraint` at once. @@ -1767,6 +1793,7 @@ def drop_constraint( raise_if_not_exists: set if should raise if not exists. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Example: ```python @@ -1792,6 +1819,7 @@ def drop_constraint( raise_if_not_exists, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries, ) def set_table_properties( @@ -1799,6 +1827,7 @@ def set_table_properties( properties: Dict[str, str], raise_if_not_exists: bool = True, custom_metadata: Optional[Dict[str, str]] = None, + max_commit_retries: Optional[int] = None, ) -> None: """ Set properties from the table. @@ -1807,6 +1836,7 @@ def set_table_properties( properties: properties which set raise_if_not_exists: set if should raise if not exists. custom_metadata: custom metadata that will be added to the transaction commit. + max_commit_retries: maximum number of times to retry the transaction commit. 
Example: ```python @@ -1824,7 +1854,7 @@ def set_table_properties( ``` """ self.table._table.set_table_properties( - properties, raise_if_not_exists, custom_metadata + properties, raise_if_not_exists, custom_metadata, max_commit_retries ) @@ -1843,6 +1873,7 @@ def compact( writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """ Compacts small files to reduce the total number of files in the table. @@ -1867,6 +1898,7 @@ def compact( writer_properties: Pass writer properties to the Rust parquet writer. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from optimize @@ -1898,6 +1930,7 @@ def compact( writer_properties, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries, ) self.table.update_incremental() return json.loads(metrics) @@ -1913,6 +1946,7 @@ def z_order( writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """ Reorders the data using a Z-order curve to improve data skipping. @@ -1935,6 +1969,7 @@ def z_order( writer_properties: Pass writer properties to the Rust parquet writer. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. 
Returns: the metrics from optimize @@ -1968,6 +2003,7 @@ def z_order( writer_properties, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries, ) self.table.update_incremental() return json.loads(metrics) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 99b915183f..b38095cfb8 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -124,6 +124,7 @@ def write_deltalake( engine: Literal["pyarrow"] = ..., custom_metadata: Optional[Dict[str, str]] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., + max_commit_retries: Optional[int] = ..., ) -> None: ... @@ -153,6 +154,7 @@ def write_deltalake( writer_properties: WriterProperties = ..., custom_metadata: Optional[Dict[str, str]] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., + max_commit_retries: Optional[int] = ..., ) -> None: ... @@ -183,6 +185,7 @@ def write_deltalake( writer_properties: WriterProperties = ..., custom_metadata: Optional[Dict[str, str]] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., + max_commit_retries: Optional[int] = ..., ) -> None: ... @@ -219,6 +222,7 @@ def write_deltalake( writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> None: """Write to a Delta Lake table @@ -274,6 +278,7 @@ def write_deltalake( writer_properties: Pass writer properties to the Rust parquet writer. custom_metadata: Custom metadata to add to the commitInfo. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. 
""" table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) if table is not None: @@ -317,6 +322,7 @@ def write_deltalake( post_commithook_properties=post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries=max_commit_retries, ) if table: table.update_incremental() @@ -552,6 +558,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: post_commithook_properties=post_commithook_properties.__dict__ if post_commithook_properties else None, + max_commit_retries=max_commit_retries, ) table.update_incremental() else: diff --git a/python/src/lib.rs b/python/src/lib.rs index 45c3d20bd2..841a2bf862 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -321,7 +321,7 @@ impl RawDeltaTable { /// Run the Vacuum command on the Delta Table: list and delete files no longer referenced /// by the Delta table and are older than the retention threshold. - #[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn vacuum( &mut self, py: Python, @@ -330,6 +330,7 @@ impl RawDeltaTable { enforce_retention_duration: bool, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult> { let (table, metrics) = py.allow_threads(|| { let mut cmd = VacuumBuilder::new( @@ -342,9 +343,11 @@ impl RawDeltaTable { cmd = cmd.with_retention_period(Duration::hours(retention_period as i64)); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } 
rt().block_on(cmd.into_future()).map_err(PythonError::from) @@ -354,7 +357,7 @@ impl RawDeltaTable { } /// Run the UPDATE command on the Delta Table - #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, custom_metadata = None, post_commithook_properties=None))] + #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, custom_metadata = None, post_commithook_properties=None, max_commit_retries=None))] #[allow(clippy::too_many_arguments)] pub fn update( &mut self, @@ -365,6 +368,7 @@ impl RawDeltaTable { safe_cast: bool, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = UpdateBuilder::new( @@ -387,9 +391,11 @@ impl RawDeltaTable { cmd = cmd.with_predicate(update_predicate); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -408,7 +414,8 @@ impl RawDeltaTable { min_commit_interval = None, writer_properties=None, custom_metadata=None, - post_commithook_properties=None + post_commithook_properties=None, + max_commit_retries=None, ))] #[allow(clippy::too_many_arguments)] pub fn compact_optimize( @@ -421,6 +428,7 @@ impl RawDeltaTable { writer_properties: Option, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = OptimizeBuilder::new( @@ -441,9 +449,11 @@ impl RawDeltaTable { ); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + 
post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -468,7 +478,8 @@ impl RawDeltaTable { min_commit_interval = None, writer_properties=None, custom_metadata=None, - post_commithook_properties=None))] + post_commithook_properties=None, + max_commit_retries=None))] pub fn z_order_optimize( &mut self, py: Python, @@ -481,6 +492,7 @@ impl RawDeltaTable { writer_properties: Option, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = OptimizeBuilder::new( @@ -503,9 +515,11 @@ impl RawDeltaTable { ); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -520,13 +534,14 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } - #[pyo3(signature = (fields, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (fields, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn add_columns( &mut self, py: Python, fields: Vec, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = AddColumnBuilder::new( @@ -541,9 +556,11 @@ impl RawDeltaTable { cmd = cmd.with_fields(new_fields); - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -553,13 +570,14 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (constraints, 
custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (constraints, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn add_constraints( &mut self, py: Python, constraints: HashMap, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = ConstraintBuilder::new( @@ -571,9 +589,11 @@ impl RawDeltaTable { cmd = cmd.with_constraint(col_name.clone(), expression.clone()); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -583,7 +603,7 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (name, raise_if_not_exists, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (name, raise_if_not_exists, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn drop_constraints( &mut self, py: Python, @@ -591,6 +611,7 @@ impl RawDeltaTable { raise_if_not_exists: bool, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = DropConstraintBuilder::new( @@ -600,9 +621,11 @@ impl RawDeltaTable { .with_constraint(name) .with_raise_if_not_exists(raise_if_not_exists); - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -689,6 +712,7 @@ impl RawDeltaTable { writer_properties = None, post_commithook_properties = None, custom_metadata = 
None, + max_commit_retries=None, ))] pub fn create_merge_builder( &self, @@ -701,6 +725,7 @@ impl RawDeltaTable { writer_properties: Option, post_commithook_properties: Option>>, custom_metadata: Option>, + max_commit_retries: Option, ) -> PyResult { py.allow_threads(|| { Ok(PyMergeBuilder::new( @@ -714,6 +739,7 @@ impl RawDeltaTable { writer_properties, post_commithook_properties, custom_metadata, + max_commit_retries, ) .map_err(PythonError::from)?) }) @@ -735,13 +761,14 @@ impl RawDeltaTable { } // Run the restore command on the Delta Table: restore table to a given version or datetime - #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, custom_metadata=None))] + #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, custom_metadata=None, max_commit_retries=None))] pub fn restore( &mut self, target: Option<&Bound<'_, PyAny>>, ignore_missing_files: bool, protocol_downgrade_allowed: bool, custom_metadata: Option>, + max_commit_retries: Option, ) -> PyResult { let mut cmd = RestoreBuilder::new( self._table.log_store(), @@ -763,7 +790,9 @@ impl RawDeltaTable { cmd = cmd.with_ignore_missing_files(ignore_missing_files); cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed); - if let Some(commit_properties) = maybe_create_commit_properties(custom_metadata, None) { + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, max_commit_retries, None) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -925,6 +954,7 @@ impl RawDeltaTable { schema: PyArrowType, partitions_filters: Option>, custom_metadata: Option>, + max_commit_retries: Option, post_commithook_properties: Option>>, ) -> PyResult<()> { py.allow_threads(|| { @@ -1016,6 +1046,10 @@ impl RawDeltaTable { commit_properties = commit_properties.with_metadata(json_metadata); }; + if let Some(max_retries) = max_commit_retries { + commit_properties = 
commit_properties.with_max_retries(max_retries); + }; + if let Some(post_commit_hook_props) = post_commithook_properties { commit_properties = set_post_commithook_properties(commit_properties, post_commit_hook_props) @@ -1088,7 +1122,7 @@ impl RawDeltaTable { .collect::>()) } /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. - #[pyo3(signature = (predicate = None, writer_properties=None, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (predicate = None, writer_properties=None, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn delete( &mut self, py: Python, @@ -1096,6 +1130,7 @@ impl RawDeltaTable { writer_properties: Option, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = DeleteBuilder::new( @@ -1110,9 +1145,11 @@ impl RawDeltaTable { set_writer_properties(writer_props).map_err(PythonError::from)?, ); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -1122,12 +1159,13 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } - #[pyo3(signature = (properties, raise_if_not_exists, custom_metadata=None))] + #[pyo3(signature = (properties, raise_if_not_exists, custom_metadata=None, max_commit_retries=None))] pub fn set_table_properties( &mut self, properties: HashMap, raise_if_not_exists: bool, custom_metadata: Option>, + max_commit_retries: Option, ) -> PyResult<()> { let mut cmd = SetTablePropertiesBuilder::new( self._table.log_store(), @@ -1136,7 +1174,9 @@ impl RawDeltaTable { .with_properties(properties) 
.with_raise_if_not_exists(raise_if_not_exists); - if let Some(commit_properties) = maybe_create_commit_properties(custom_metadata, None) { + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, max_commit_retries, None) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -1149,12 +1189,13 @@ impl RawDeltaTable { /// Execute the File System Check command (FSCK) on the delta table: removes old reference to files that /// have been deleted or are malformed - #[pyo3(signature = (dry_run = true, custom_metadata = None, post_commithook_properties=None))] + #[pyo3(signature = (dry_run = true, custom_metadata = None, post_commithook_properties=None, max_commit_retries=None))] pub fn repair( &mut self, dry_run: bool, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult { let mut cmd = FileSystemCheckBuilder::new( self._table.log_store(), @@ -1162,9 +1203,11 @@ impl RawDeltaTable { ) .with_dry_run(dry_run); - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -1305,6 +1348,7 @@ fn convert_partition_filters( fn maybe_create_commit_properties( custom_metadata: Option>, + max_commit_retries: Option, post_commithook_properties: Option>>, ) -> Option { - if custom_metadata.is_none() && post_commithook_properties.is_none() { + if custom_metadata.is_none() && max_commit_retries.is_none() && post_commithook_properties.is_none() { @@ -1317,6 +1361,10 @@ fn maybe_create_commit_properties( commit_properties = commit_properties.with_metadata(json_metadata); }; + if let Some(max_retries) = max_commit_retries { + commit_properties = commit_properties.with_max_retries(max_retries); + }; + if let Some(post_commit_hook_props) = post_commithook_properties { commit_properties = set_post_commithook_properties(commit_properties, 
post_commit_hook_props) @@ -1605,6 +1653,7 @@ fn write_to_deltalake( writer_properties: Option, custom_metadata: Option>, post_commithook_properties: Option>>, + max_commit_retries: Option, ) -> PyResult<()> { py.allow_threads(|| { let batches = data.0.map(|batch| batch.unwrap()).collect::>(); @@ -1658,6 +1707,10 @@ fn write_to_deltalake( commit_properties = commit_properties.with_metadata(json_metadata); }; + if let Some(max_retries) = max_commit_retries { + commit_properties = commit_properties.with_max_retries(max_retries); + }; + if let Some(post_commit_hook_props) = post_commithook_properties { commit_properties = set_post_commithook_properties(commit_properties, post_commit_hook_props) diff --git a/python/src/merge.rs b/python/src/merge.rs index e1bb1bf3a6..42ff8e2874 100644 --- a/python/src/merge.rs +++ b/python/src/merge.rs @@ -40,6 +40,7 @@ impl PyMergeBuilder { writer_properties: Option, post_commithook_properties: Option>>, custom_metadata: Option>, + max_commit_retries: Option, ) -> DeltaResult { let ctx = SessionContext::new(); let schema = source.schema(); @@ -63,9 +64,11 @@ impl PyMergeBuilder { cmd = cmd.with_writer_properties(set_writer_properties(writer_props)?); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } Ok(Self {