Commit
chore: keep deprecated pyarrow options behavior, issue warning
ion-elgreco committed Oct 5, 2024
1 parent 699c4ff commit 0103cdd
Showing 2 changed files with 61 additions and 18 deletions.
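At a glance, the behavior this commit preserves: passing `pyarrow_options` (or `rechunk`) to `pl.read_delta` / `pl.scan_delta` still works but now emits a `DeprecationWarning`, while omitting them routes through the native polars reader. A minimal sketch of both call styles (the table path is hypothetical):

```python
import polars as pl

# Deprecated but still honored: any pyarrow_options switch read_delta back
# to the pyarrow-based reader and trigger a DeprecationWarning as of 1.10.
df = pl.read_delta("./my_delta_table", pyarrow_options={})

# Preferred: no pyarrow_options, so the native parquet reader is used and
# the result is rechunked automatically (making `rechunk` redundant).
df = pl.read_delta("./my_delta_table")
```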
56 changes: 56 additions & 0 deletions py-polars/polars/io/delta.py
@@ -5,11 +5,13 @@
 from typing import TYPE_CHECKING, Any
 from urllib.parse import urlparse
 
+from polars._utils.deprecation import issue_deprecation_warning
 from polars.convert import from_arrow
 from polars.datatypes import Null, Time
 from polars.datatypes.convert import unpack_dtypes
 from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake
 from polars.io.parquet import scan_parquet
+from polars.io.pyarrow_dataset.functions import scan_pyarrow_dataset
 from polars.schema import Schema
 
 if TYPE_CHECKING:
@@ -21,8 +23,10 @@ def read_delta(
     *,
     version: int | str | datetime | None = None,
     columns: list[str] | None = None,
+    rechunk: bool | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
+    pyarrow_options: dict[str, Any] | None = None,
 ) -> DataFrame:
     """
     Reads into a DataFrame from a Delta lake table.
@@ -41,6 +45,12 @@ def read_delta(
         table is read.
     columns
         Columns to select. Accepts a list of column names.
+    rechunk
+        Make sure that all columns are contiguous in memory by
+        aggregating the chunks into a single array.
+
+        .. deprecated:: 1.10.0
+            Rechunking is done automatically by the native reader.
     storage_options
         Extra options for the storage backends supported by `deltalake`.
         For cloud storages, this may include configurations for authentication etc.
@@ -49,6 +59,11 @@ def read_delta(
         <https://delta-io.github.io/delta-rs/usage/loading-table/>`__.
     delta_table_options
         Additional keyword arguments while reading a Delta lake Table.
+    pyarrow_options
+        Keyword arguments passed when converting a Delta lake Table to a pyarrow table.
+
+        .. deprecated:: 1.10.0
+            Remove `pyarrow_options` and use the native polars filter and selection instead.
 
     Returns
     -------
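To make the deprecation note concrete, here is one way the suggested migration can look for an eager read; the path and the `year` partition column are hypothetical:

```python
import polars as pl

# Before: partition pruning pushed through pyarrow (deprecated). The
# "partitions" keyword is forwarded to DeltaTable.to_pyarrow_table().
df = pl.read_delta(
    "./my_delta_table",
    pyarrow_options={"partitions": [("year", "=", "2021")]},
)

# After: filter natively. Going through scan_delta lets the native reader
# apply predicate pushdown before collecting.
df = pl.scan_delta("./my_delta_table").filter(pl.col("year") == "2021").collect()
```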
@@ -120,6 +135,30 @@ def read_delta(
     ...     table_path, delta_table_options=delta_table_options
     ... )  # doctest: +SKIP
     """
+    if pyarrow_options is not None:
+        issue_deprecation_warning(
+            message="`pyarrow_options` is deprecated; the native polars parquet reader is used when no pyarrow options are passed.",
+            version="1.10",
+        )
+        resolved_uri = _resolve_delta_lake_uri(source)
+
+        dl_tbl = _get_delta_lake_table(
+            table_path=resolved_uri,
+            version=version,
+            storage_options=storage_options,
+            delta_table_options=delta_table_options,
+        )
+        if rechunk is None:
+            rechunk = False
+        return from_arrow(
+            dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk
+        )  # type: ignore[return-value]
+
+    if rechunk is not None:
+        issue_deprecation_warning(
+            message="`rechunk` is deprecated; rechunking is now done automatically.",
+            version="1.10",
+        )
     df = scan_delta(
         source=source,
         version=version,
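Note the control flow above: when `pyarrow_options` is set, the function warns and returns early via `from_arrow`, so the `rechunk` warning only fires on the native path. A small sketch of observing the warning with the standard library (the table path is hypothetical):

```python
import warnings

import polars as pl

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # rechunk is deprecated on the native path; this should warn once.
    pl.read_delta("./my_delta_table", rechunk=True)

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```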
@@ -137,6 +176,7 @@ def scan_delta(
     version: int | str | datetime | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
+    pyarrow_options: dict[str, Any] | None = None,
 ) -> LazyFrame:
     """
     Lazily read from a Delta lake table.
@@ -161,6 +201,13 @@ def scan_delta(
         <https://delta-io.github.io/delta-rs/usage/loading-table/>`__.
     delta_table_options
         Additional keyword arguments while reading a Delta lake Table.
+    pyarrow_options
+        Keyword arguments passed when converting a Delta lake Table to a pyarrow table.
+        Use this parameter when filtering on partitioned columns or to read
+        from an 'fsspec'-supported filesystem.
+
+        .. deprecated:: 1.10.0
+            Remove `pyarrow_options` and use the native polars filter and selection instead.
 
     Returns
     -------
@@ -245,6 +292,15 @@ def scan_delta(
         storage_options=storage_options,
         delta_table_options=delta_table_options,
     )
+
+    if pyarrow_options is not None:
+        issue_deprecation_warning(
+            message="`pyarrow_options` is deprecated; the native polars parquet scanner is used when no pyarrow options are passed.",
+            version="1.10",
+        )
+        pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
+        return scan_pyarrow_dataset(pa_ds)
+
     import pyarrow as pa
     from deltalake.exceptions import DeltaProtocolError
     from deltalake.table import (
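The lazy path keeps the same escape hatch: with `pyarrow_options` it falls back to `scan_pyarrow_dataset`, otherwise the native scanner runs. A migration sketch mirroring the eager case (path and `year` column hypothetical):

```python
import polars as pl

# Deprecated: builds a pyarrow dataset and scans it.
lf = pl.scan_delta(
    "./my_delta_table",
    pyarrow_options={"partitions": [("year", "=", "2021")]},
)

# Preferred: the native scan supports predicate and projection pushdown,
# so a plain filter on the partition column can prune the same files.
lf = pl.scan_delta("./my_delta_table").filter(pl.col("year") == "2021")
df = lf.collect()
```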
23 changes: 5 additions & 18 deletions py-polars/tests/unit/io/test_delta.py
@@ -184,7 +184,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:
     assert v1.columns == pl_df_1.columns
 
     assert df_supported.shape == pl_df_partitioned.shape
-    assert df_supported.columns == pl_df_partitioned.columns
+    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)
 
     assert tbl.version() == 1
     assert partitioned_tbl.version() == 0
@@ -216,7 +216,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:
 
     assert partitioned_tbl.version() == 1
     assert pl_df_partitioned.shape == (6, 14)  # Rows are doubled
-    assert df_supported.columns == pl_df_partitioned.columns
+    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)
 
     df_supported.write_delta(partitioned_tbl_uri, mode="overwrite")
 
@@ -470,25 +470,12 @@ def test_unsupported_dtypes(tmp_path: Path) -> None:
     df.write_delta(tmp_path / "time")
 
 
-@pytest.mark.skip(
-    reason="upstream bug in delta-rs causing categorical to be written as categorical in parquet"
-)
-@pytest.mark.write_disk
-def test_categorical_becomes_string(tmp_path: Path) -> None:
-    df = pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Categorical})
-    df.write_delta(tmp_path)
-    df2 = pl.read_delta(str(tmp_path))
-    assert_frame_equal(df2, pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Utf8}))
-
-
 @pytest.mark.write_disk
-@pytest.mark.parametrize("rechunk_and_expected_chunks", [(True, 1), (False, 3)])
-def test_read_parquet_respects_rechunk_16982(
-    rechunk_and_expected_chunks: tuple[bool, int], tmp_path: Path
-) -> None:
+def test_read_parquet_respects_rechunk_16982(tmp_path: Path) -> None:
     # Create a delta lake table with 3 chunks:
     df = pl.DataFrame({"a": [1]})
     df.write_delta(str(tmp_path))
     df.write_delta(str(tmp_path), mode="append")
     df.write_delta(str(tmp_path), mode="append")
 
-    rechunk, expected_chunks = rechunk_and_expected_chunks
     result = pl.read_delta(str(tmp_path))
-    assert result.n_chunks() == expected_chunks
+    assert result.n_chunks() == 1

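The tests above cover the behavioral changes but not the new warnings themselves; a sketch of how such an assertion could look (hypothetical test, not part of this commit):

```python
from pathlib import Path

import pytest

import polars as pl


def test_pyarrow_options_deprecation_warns(tmp_path: Path) -> None:
    df = pl.DataFrame({"a": [1, 2, 3]})
    df.write_delta(str(tmp_path))

    # Passing any pyarrow_options should emit the new DeprecationWarning
    # while still returning the same data via the pyarrow reader.
    with pytest.warns(DeprecationWarning, match="pyarrow_options"):
        result = pl.read_delta(str(tmp_path), pyarrow_options={})
    assert result.shape == (3, 1)
```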