diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py index 9b0219fa8dff..b2b27c74a20f 100644 --- a/py-polars/polars/io/delta.py +++ b/py-polars/polars/io/delta.py @@ -1,10 +1,12 @@ from __future__ import annotations +import warnings from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from urllib.parse import urlparse +from polars import DataFrame from polars.convert import from_arrow from polars.datatypes import Null, Time from polars.datatypes.convert import unpack_dtypes @@ -12,11 +14,13 @@ from polars.io.pyarrow_dataset import scan_pyarrow_dataset if TYPE_CHECKING: - from polars import DataFrame, DataType, LazyFrame + from deltalake import DeltaTable + + from polars import DataType, LazyFrame def read_delta( - source: str, + source: str | DeltaTable, *, version: int | str | datetime | None = None, columns: list[str] | None = None, @@ -31,7 +35,7 @@ def read_delta( Parameters ---------- source - Path or URI to the root of the Delta lake table. + DeltaTable or a Path or URI to the root of the Delta lake table. Note: For Local filesystem, absolute and relative paths are supported but for the supported object storages - GCS, Azure and S3 full URI must be provided. @@ -138,22 +142,23 @@ def read_delta( if pyarrow_options is None: pyarrow_options = {} - resolved_uri = _resolve_delta_lake_uri(source) - dl_tbl = _get_delta_lake_table( - table_path=resolved_uri, + table_path=source, version=version, storage_options=storage_options, delta_table_options=delta_table_options, ) - return from_arrow( - dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk - ) # type: ignore[return-value] + return cast( + DataFrame, + from_arrow( + dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk + ), + ) def scan_delta( - source: str, + source: str | DeltaTable, *, version: int | str | datetime | None = None, storage_options: dict[str, Any] | None = None, @@ -166,7 +171,7 @@ def scan_delta( Parameters ---------- source - Path or URI to the root of the Delta lake table. + DeltaTable or a Path or URI to the root of the Delta lake table. Note: For Local filesystem, absolute and relative paths are supported but for the supported object storages - GCS, Azure and S3 full URI must be provided. @@ -274,9 +279,8 @@ def scan_delta( if pyarrow_options is None: pyarrow_options = {} - resolved_uri = _resolve_delta_lake_uri(source) dl_tbl = _get_delta_lake_table( - table_path=resolved_uri, + table_path=source, version=version, storage_options=storage_options, delta_table_options=delta_table_options, @@ -299,7 +303,7 @@ def _resolve_delta_lake_uri(table_uri: str, *, strict: bool = True) -> str: def _get_delta_lake_table( - table_path: str, + table_path: str | DeltaTable, version: int | str | datetime | None = None, storage_options: dict[str, Any] | None = None, delta_table_options: dict[str, Any] | None = None, @@ -314,12 +318,27 @@ def _get_delta_lake_table( """ _check_if_delta_available() + if isinstance(table_path, deltalake.DeltaTable): + if any( + [ + version is not None, + storage_options is not None, + delta_table_options is not None, + ] + ): + warnings.warn( + """When supplying a DeltaTable directly, `version`, `storage_options`, and `delta_table_options` are ignored. + To silence this warning, don't supply those parameters.""", + RuntimeWarning, + stacklevel=1, + ) + return table_path if delta_table_options is None: delta_table_options = {} - + resolved_uri = _resolve_delta_lake_uri(table_path) if not isinstance(version, (str, datetime)): dl_tbl = deltalake.DeltaTable( - table_path, + resolved_uri, version=version, storage_options=storage_options, **delta_table_options, diff --git a/py-polars/tests/unit/io/test_delta.py b/py-polars/tests/unit/io/test_delta.py index 33c6b052ffdf..213988964de3 100644 --- a/py-polars/tests/unit/io/test_delta.py +++ b/py-polars/tests/unit/io/test_delta.py @@ -516,3 +516,11 @@ def test_read_parquet_respects_rechunk_16982( rechunk, expected_chunks = rechunk_and_expected_chunks result = pl.read_delta(str(tmp_path), rechunk=rechunk) assert result.n_chunks() == expected_chunks + + +def test_scan_delta_DT_input(delta_table_path: Path) -> None: + DT = DeltaTable(str(delta_table_path), version=0) + ldf = pl.scan_delta(DT) + + expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]}) + assert_frame_equal(expected, ldf.collect(), check_dtypes=False)