Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): Allow DeltaTable input to scan_delta and read_delta #19229

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions py-polars/polars/io/delta.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
from __future__ import annotations

import warnings
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast
from urllib.parse import urlparse

from polars import DataFrame
from polars.convert import from_arrow
from polars.datatypes import Null, Time
from polars.datatypes.convert import unpack_dtypes
from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake
from polars.io.pyarrow_dataset import scan_pyarrow_dataset

if TYPE_CHECKING:
from polars import DataFrame, DataType, LazyFrame
from deltalake import DeltaTable

from polars import DataType, LazyFrame

deanm0000 marked this conversation as resolved.
Show resolved Hide resolved

def read_delta(
source: str,
source: str | DeltaTable,
*,
version: int | str | datetime | None = None,
columns: list[str] | None = None,
Expand All @@ -31,7 +35,7 @@ def read_delta(
Parameters
----------
source
Path or URI to the root of the Delta lake table.
DeltaTable or a Path or URI to the root of the Delta lake table.

Note: For Local filesystem, absolute and relative paths are supported but
for the supported object storages - GCS, Azure and S3 full URI must be provided.
Expand Down Expand Up @@ -138,22 +142,23 @@ def read_delta(
if pyarrow_options is None:
pyarrow_options = {}

resolved_uri = _resolve_delta_lake_uri(source)

dl_tbl = _get_delta_lake_table(
table_path=resolved_uri,
table_path=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
)

return from_arrow(
dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk
) # type: ignore[return-value]
return cast(
DataFrame,
from_arrow(
dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk
),
)


def scan_delta(
source: str,
source: str | DeltaTable,
*,
version: int | str | datetime | None = None,
storage_options: dict[str, Any] | None = None,
Expand All @@ -166,7 +171,7 @@ def scan_delta(
Parameters
----------
source
Path or URI to the root of the Delta lake table.
DeltaTable or a Path or URI to the root of the Delta lake table.

Note: For Local filesystem, absolute and relative paths are supported but
for the supported object storages - GCS, Azure and S3 full URI must be provided.
Expand Down Expand Up @@ -274,9 +279,8 @@ def scan_delta(
if pyarrow_options is None:
pyarrow_options = {}

resolved_uri = _resolve_delta_lake_uri(source)
dl_tbl = _get_delta_lake_table(
table_path=resolved_uri,
table_path=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
Expand All @@ -299,7 +303,7 @@ def _resolve_delta_lake_uri(table_uri: str, *, strict: bool = True) -> str:


def _get_delta_lake_table(
table_path: str,
table_path: str | DeltaTable,
version: int | str | datetime | None = None,
storage_options: dict[str, Any] | None = None,
delta_table_options: dict[str, Any] | None = None,
Expand All @@ -314,12 +318,27 @@ def _get_delta_lake_table(
"""
_check_if_delta_available()

if isinstance(table_path, deltalake.DeltaTable):
if any(
[
version is not None,
storage_options is not None,
delta_table_options is not None,
]
):
warnings.warn(
"""When supplying a DeltaTable directly, `version`, `storage_options`, and `delta_table_options` are ignored.
To silence this warning, don't supply those parameters.""",
RuntimeWarning,
stacklevel=1,
)
return table_path
if delta_table_options is None:
delta_table_options = {}

resolved_uri = _resolve_delta_lake_uri(table_path)
if not isinstance(version, (str, datetime)):
dl_tbl = deltalake.DeltaTable(
table_path,
resolved_uri,
version=version,
storage_options=storage_options,
**delta_table_options,
Expand Down
8 changes: 8 additions & 0 deletions py-polars/tests/unit/io/test_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,11 @@ def test_read_parquet_respects_rechunk_16982(
rechunk, expected_chunks = rechunk_and_expected_chunks
result = pl.read_delta(str(tmp_path), rechunk=rechunk)
assert result.n_chunks() == expected_chunks


def test_scan_delta_DT_input(delta_table_path: Path) -> None:
    """Check that `scan_delta` accepts an already-constructed `DeltaTable`.

    The table is opened at version 0 and handed directly to `pl.scan_delta`
    instead of a path/URI; the collected frame is compared against the
    expected rows, ignoring dtype differences.
    """
    delta_tbl = DeltaTable(str(delta_table_path), version=0)
    result = pl.scan_delta(delta_tbl).collect()

    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, result, check_dtypes=False)
Loading