From 96355f04090e43e3900d6ce11e58d97fb481e7d2 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 7 Nov 2023 01:00:21 +0100 Subject: [PATCH] align compression levels --- python/deltalake/_internal.pyi | 4 ++-- python/deltalake/table.py | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 371cc11c6c..cf54a0c147 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -64,7 +64,7 @@ class RawDeltaTable: target_size: Optional[int], max_concurrent_tasks: Optional[int], min_commit_interval: Optional[int], - compression: Literal["snappy", "gzip", "brotli", "lz4", "zstd"], + compression: str, ) -> str: ... def z_order_optimize( self, @@ -74,7 +74,7 @@ class RawDeltaTable: max_concurrent_tasks: Optional[int], max_spill_size: Optional[int], min_commit_interval: Optional[int], - compression: Literal["snappy", "gzip", "brotli", "lz4", "zstd"], + compression: str, ) -> str: ... def restore( self, diff --git a/python/deltalake/table.py b/python/deltalake/table.py index c38d1fbdeb..808f68f462 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -109,6 +109,15 @@ class ProtocolVersions(NamedTuple): FilterType = Union[FilterConjunctionType, FilterDNFType] +def _get_parquet_rs_compression(compression: str): + """Converts the compression codec name into parquet-rs format, appending pyarrow's default compression level""" + if compression in ["gzip", "brotli", "zstd"]: + compression = ( + f"{compression}({pyarrow.Codec.default_compression_level(compression)})" + ) + return compression + + def _check_contains_null(value: Any) -> bool: """ Check if target contains nullish value. 
@@ -1377,12 +1386,14 @@ def compact( if isinstance(min_commit_interval, timedelta): min_commit_interval = int(min_commit_interval.total_seconds()) + compress = _get_parquet_rs_compression(compression) + metrics = self.table._table.compact_optimize( partition_filters, target_size, max_concurrent_tasks, min_commit_interval, - compression, + compress, ) self.table.update_incremental() return json.loads(metrics) @@ -1432,6 +1443,8 @@ def z_order( if isinstance(min_commit_interval, timedelta): min_commit_interval = int(min_commit_interval.total_seconds()) + compress = _get_parquet_rs_compression(compression) + metrics = self.table._table.z_order_optimize( list(columns), partition_filters, @@ -1439,7 +1452,7 @@ def z_order( max_concurrent_tasks, max_spill_size, min_commit_interval, - compression, + compress, ) self.table.update_incremental() return json.loads(metrics)