diff --git a/docs/usage/optimize/small-file-compaction-with-optimize.md b/docs/usage/optimize/small-file-compaction-with-optimize.md index 78d8778ff5..d86bba54f7 100644 --- a/docs/usage/optimize/small-file-compaction-with-optimize.md +++ b/docs/usage/optimize/small-file-compaction-with-optimize.md @@ -104,7 +104,7 @@ Let’s run the optimize command to compact the existing small files into larger ```python dt = DeltaTable("observation_data") -dt.optimize() +dt.optimize.compact() ``` Here’s the output of the command: diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index b6e0af3304..440f7a9eec 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -48,8 +48,10 @@ class RawDeltaTable: def protocol_versions(self) -> List[Any]: ... def load_version(self, version: int) -> None: ... def load_with_datetime(self, ds: str) -> None: ... - def files(self, partition_filters: Optional[FilterType]) -> List[str]: ... - def file_uris(self, partition_filters: Optional[FilterType]) -> List[str]: ... + def files(self, partition_filters: Optional[PartitionFilterType]) -> List[str]: ... + def file_uris( + self, partition_filters: Optional[PartitionFilterType] + ) -> List[str]: ... def vacuum( self, dry_run: bool, @@ -60,7 +62,7 @@ class RawDeltaTable: ) -> List[str]: ... def compact_optimize( self, - partition_filters: Optional[FilterType], + partition_filters: Optional[PartitionFilterType], target_size: Optional[int], max_concurrent_tasks: Optional[int], min_commit_interval: Optional[int], @@ -71,7 +73,7 @@ class RawDeltaTable: def z_order_optimize( self, z_order_columns: List[str], - partition_filters: Optional[FilterType], + partition_filters: Optional[PartitionFilterType], target_size: Optional[int], max_concurrent_tasks: Optional[int], max_spill_size: Optional[int], @@ -115,7 +117,7 @@ class RawDeltaTable: def history(self, limit: Optional[int]) -> List[str]: ... def update_incremental(self) -> None: ... def dataset_partitions( - self, schema: pyarrow.Schema, partition_filters: Optional[FilterType] + self, schema: pyarrow.Schema, partition_filters: Optional[FilterConjunctionType] ) -> List[Any]: ... def create_checkpoint(self) -> None: ... def get_add_actions(self, flatten: bool) -> pyarrow.RecordBatch: ... @@ -848,3 +850,4 @@ FilterLiteralType = Tuple[str, str, Any] FilterConjunctionType = List[FilterLiteralType] FilterDNFType = List[FilterConjunctionType] FilterType = Union[FilterConjunctionType, FilterDNFType] +PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]] diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 18291467cc..9628bff104 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -38,7 +38,9 @@ if TYPE_CHECKING: import os -from deltalake._internal import RawDeltaTable +from deltalake._internal import ( + RawDeltaTable, +) from deltalake._internal import create_deltalake as _create_deltalake from deltalake._util import encode_partition_value from deltalake.data_catalog import DataCatalog @@ -63,6 +65,12 @@ NOT_SUPPORTED_READER_VERSION = 2 SUPPORTED_READER_FEATURES = {"timestampNtz"} +FilterLiteralType = Tuple[str, str, Any] +FilterConjunctionType = List[FilterLiteralType] +FilterDNFType = List[FilterConjunctionType] +FilterType = Union[FilterConjunctionType, FilterDNFType] +PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]] + class Compression(Enum): UNCOMPRESSED = "UNCOMPRESSED" @@ -336,15 +344,6 @@ class ProtocolVersions(NamedTuple): reader_features: Optional[List[str]] -FilterLiteralType = Tuple[str, str, Any] - -FilterConjunctionType = List[FilterLiteralType] - -FilterDNFType = List[FilterConjunctionType] - -FilterType = Union[FilterConjunctionType, FilterDNFType] - - @dataclass(init=False) class DeltaTable: """Represents a Delta Table""" @@ -544,10 +543,10 @@ def files( ("z", "not in", ["a","b"]) ``` """ - return self._table.files(self.__stringify_partition_values(partition_filters)) + return self._table.files(self._stringify_partition_values(partition_filters)) def file_uris( - self, partition_filters: Optional[List[Tuple[str, str, Any]]] = None + self, partition_filters: Optional[FilterConjunctionType] = None ) -> List[str]: """ Get the list of files as absolute URIs, including the scheme (e.g. "s3://"). @@ -582,7 +581,7 @@ def file_uris( ``` """ return self._table.file_uris( - self.__stringify_partition_values(partition_filters) + self._stringify_partition_values(partition_filters) ) file_uris.__doc__ = "" @@ -629,48 +628,6 @@ def load_as_version(self, version: Union[int, str, datetime]) -> None: "Invalid datatype provided for version, only int, str or datetime are accepted." ) - def load_version(self, version: int) -> None: - """ - Load a DeltaTable with a specified version. - - !!! warning "Deprecated" - Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`. - - Args: - version: the identifier of the version of the DeltaTable to load - """ - warnings.warn( - "Call to deprecated method DeltaTable.load_version. Use DeltaTable.load_as_version() instead.", - category=DeprecationWarning, - stacklevel=2, - ) - self._table.load_version(version) - - def load_with_datetime(self, datetime_string: str) -> None: - """ - Time travel Delta table to the latest version that's created at or before provided `datetime_string` argument. - The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string. - - !!! warning "Deprecated" - Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`. - - Args: - datetime_string: the identifier of the datetime point of the DeltaTable to load - - Example: - ``` - "2018-01-26T18:30:09Z" - "2018-12-19T16:39:57-08:00" - "2018-01-26T18:30:09.453+00:00" - ``` - """ - warnings.warn( - "Call to deprecated method DeltaTable.load_with_datetime. Use DeltaTable.load_as_version() instead.", - category=DeprecationWarning, - stacklevel=2, - ) - self._table.load_with_datetime(datetime_string) - def load_cdf( self, starting_version: int = 0, @@ -700,7 +657,7 @@ def schema(self) -> DeltaSchema: """ return self._table.schema - def files_by_partitions(self, partition_filters: FilterType) -> List[str]: + def files_by_partitions(self, partition_filters: PartitionFilterType) -> List[str]: """ Get the files for each partition @@ -711,7 +668,7 @@ def files_by_partitions(self, partition_filters: FilterType) -> List[str]: stacklevel=2, ) - return self.files(partition_filters) # type: ignore + return self.files(partition_filters) def metadata(self) -> Metadata: """ @@ -1045,7 +1002,7 @@ def restore( def to_pyarrow_dataset( self, - partitions: Optional[List[Tuple[str, str, Any]]] = None, + partitions: Optional[FilterConjunctionType] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, parquet_read_options: Optional[ParquetReadOptions] = None, schema: Optional[pyarrow.Schema] = None, @@ -1206,9 +1163,9 @@ def cleanup_metadata(self) -> None: """ self._table.cleanup_metadata() - def __stringify_partition_values( - self, partition_filters: Optional[List[Tuple[str, str, Any]]] - ) -> Optional[List[Tuple[str, str, Union[str, List[str]]]]]: + def _stringify_partition_values( + self, partition_filters: Optional[FilterConjunctionType] + ) -> Optional[PartitionFilterType]: if partition_filters is None: return partition_filters out = [] @@ -1370,45 +1327,6 @@ def __init__( self.not_matched_by_source_delete_predicate: Optional[List[str]] = None self.not_matched_by_source_delete_all: Optional[bool] = None - def with_writer_properties( - self, - data_page_size_limit: Optional[int] = None, - dictionary_page_size_limit: Optional[int] = None, - data_page_row_count_limit: Optional[int] = None, - write_batch_size: Optional[int] = None, - max_row_group_size: Optional[int] = None, - ) -> "TableMerger": - """ - !!! warning "Deprecated" - Use `.merge(writer_properties = WriterProperties())` instead - Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html: - - Args: - data_page_size_limit: Limit DataPage size to this in bytes. - dictionary_page_size_limit: Limit the size of each DataPage to store dicts to this amount in bytes. - data_page_row_count_limit: Limit the number of rows in each DataPage. - write_batch_size: Splits internally to smaller batch size. - max_row_group_size: Max number of rows in row group. - - Returns: - TableMerger: TableMerger Object - """ - warnings.warn( - "Call to deprecated method TableMerger.with_writer_properties. Use DeltaTable.merge(writer_properties=WriterProperties()) instead.", - category=DeprecationWarning, - stacklevel=2, - ) - - writer_properties: Dict[str, Any] = { - "data_page_size_limit": data_page_size_limit, - "dictionary_page_size_limit": dictionary_page_size_limit, - "data_page_row_count_limit": data_page_row_count_limit, - "write_batch_size": write_batch_size, - "max_row_group_size": max_row_group_size, - } - self.writer_properties = WriterProperties(**writer_properties) - return self - def when_matched_update( self, updates: Dict[str, str], predicate: Optional[str] = None ) -> "TableMerger": @@ -2003,28 +1921,9 @@ class TableOptimizer: def __init__(self, table: DeltaTable): self.table = table - def __call__( - self, - partition_filters: Optional[FilterType] = None, - target_size: Optional[int] = None, - max_concurrent_tasks: Optional[int] = None, - ) -> Dict[str, Any]: - """ - !!! warning "DEPRECATED 0.10.0" - Use [compact][deltalake.table.DeltaTable.compact] instead, which has the same signature. - """ - - warnings.warn( - "Call to deprecated method DeltaTable.optimize. Use DeltaTable.optimize.compact() instead.", - category=DeprecationWarning, - stacklevel=2, - ) - - return self.compact(partition_filters, target_size, max_concurrent_tasks) - def compact( self, - partition_filters: Optional[FilterType] = None, + partition_filters: Optional[FilterConjunctionType] = None, target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, min_commit_interval: Optional[Union[int, timedelta]] = None, @@ -2079,7 +1978,7 @@ def compact( min_commit_interval = int(min_commit_interval.total_seconds()) metrics = self.table._table.compact_optimize( - partition_filters, + self.table._stringify_partition_values(partition_filters), target_size, max_concurrent_tasks, min_commit_interval, @@ -2093,7 +1992,7 @@ def compact( def z_order( self, columns: Iterable[str], - partition_filters: Optional[FilterType] = None, + partition_filters: Optional[FilterConjunctionType] = None, target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, max_spill_size: int = 20 * 1024 * 1024 * 1024, @@ -2148,7 +2047,7 @@ def z_order( metrics = self.table._table.z_order_optimize( list(columns), - partition_filters, + self.table._stringify_partition_values(partition_filters), target_size, max_concurrent_tasks, max_spill_size,