refactor(python): Use polars parquet reader for delta scan #19103

Merged
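Not stated in the PR body above, but implied by the diff: `scan_delta` now builds a native `scan_parquet` plan over the table's data files instead of going through a pyarrow dataset, and `pyarrow_options` is deprecated. A minimal migration sketch, assuming a local table path and the `year`, `name`, and `age` columns used in the docstring and test examples below:

import polars as pl

table_path = "/path/to/delta-table/"  # hypothetical path, as in the docstring examples

# Before (now deprecated): partition pruning via pyarrow dataset options.
# pl.scan_delta(table_path, pyarrow_options={"partitions": [("year", "=", "2021")]})

# After: filter and select on the LazyFrame; the native parquet scanner reads the
# hive-partitioned Delta files and pushes these operations down.
df = (
    pl.scan_delta(table_path)
    .filter(pl.col("year") == "2021")
    .select(["name", "age"])
    .collect()
)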
146 changes: 110 additions & 36 deletions py-polars/polars/io/delta.py
@@ -3,28 +3,30 @@
import warnings
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

from polars import DataFrame
from polars._utils.deprecation import issue_deprecation_warning
from polars.convert import from_arrow
from polars.datatypes import Null, Time
from polars.datatypes.convert import unpack_dtypes
from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake
from polars.io.pyarrow_dataset import scan_pyarrow_dataset
from polars.io.parquet import scan_parquet
from polars.io.pyarrow_dataset.functions import scan_pyarrow_dataset
from polars.schema import Schema

if TYPE_CHECKING:
from deltalake import DeltaTable

from polars import DataType, LazyFrame
from polars import DataFrame, DataType, LazyFrame


def read_delta(
source: str | DeltaTable,
*,
version: int | str | datetime | None = None,
columns: list[str] | None = None,
rechunk: bool = False,
rechunk: bool | None = None,
storage_options: dict[str, Any] | None = None,
delta_table_options: dict[str, Any] | None = None,
pyarrow_options: dict[str, Any] | None = None,
@@ -49,6 +51,9 @@ def read_delta(
rechunk
Make sure that all columns are contiguous in memory by
aggregating the chunks into a single array.

.. deprecated:: 1.10.0
Rechunking is done automatically by the native reader.
storage_options
Extra options for the storage backends supported by `deltalake`.
For cloud storages, this may include configurations for authentication etc.
@@ -60,6 +65,9 @@ def read_delta(
pyarrow_options
Keyword arguments used when converting a Delta Lake table to a pyarrow table.

.. deprecated:: 1.10.0
Remove `pyarrow_options` and use native polars filtering and column selection instead.

Returns
-------
DataFrame
@@ -72,15 +80,6 @@ def read_delta(
>>> table_path = "/path/to/delta-table/"
>>> pl.read_delta(table_path) # doctest: +SKIP

Use the `pyarrow_options` parameter to read only certain partitions.
Note: This should be preferred over using an equivalent `.filter()` on the resulting
DataFrame, as this avoids reading the data at all.

>>> pl.read_delta( # doctest: +SKIP
... table_path,
... pyarrow_options={"partitions": [("year", "=", "2021")]},
... )

Reads a specific version of the Delta table from local filesystem.
Note: This will fail if the provided version of the delta table does not exist.

@@ -139,22 +138,38 @@ def read_delta(
... table_path, delta_table_options=delta_table_options
... ) # doctest: +SKIP
"""
if pyarrow_options is None:
pyarrow_options = {}
if pyarrow_options is not None:
issue_deprecation_warning(
message="`pyarrow_options` is deprecated; the native polars parquet reader is used when pyarrow options are not passed.",
version="1.13",
)
dl_tbl = _get_delta_lake_table(
table_path=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
)
if rechunk is None:
rechunk = False
return from_arrow(
dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk
) # type: ignore[return-value]

dl_tbl = _get_delta_lake_table(
table_path=source,
if rechunk is not None:
issue_deprecation_warning(
message="`rechunk` is deprecated; rechunking is now done automatically.",
version="1.13",
)
df = scan_delta(
source=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
)

return cast(
DataFrame,
from_arrow(
dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk
),
)
if columns is not None:
df = df.select(columns)
return df.collect()
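# Illustration, not part of this diff: with the delegation above, read_delta is a
# thin wrapper around scan_delta when no pyarrow options are passed, so these two
# calls are expected to be equivalent:
#
#     pl.read_delta(table_path, columns=["name"])
#     pl.scan_delta(table_path).select(["name"]).collect()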


def scan_delta(
@@ -193,6 +208,9 @@ def scan_delta(
Use this parameter when filtering on partitioned columns or to read
from a 'fsspec' supported filesystem.

.. deprecated:: 1.10.0
Remove `pyarrow_options` and use native polars filtering and column selection instead.

Returns
-------
LazyFrame
@@ -205,13 +223,6 @@ def scan_delta(
>>> table_path = "/path/to/delta-table/"
>>> pl.scan_delta(table_path).collect() # doctest: +SKIP

Use the `pyarrow_options` parameter to read only certain partitions.

>>> pl.scan_delta( # doctest: +SKIP
... table_path,
... pyarrow_options={"partitions": [("year", "=", "2021")]},
... )

Creates a scan for a specific version of the Delta table from local filesystem.
Note: This will fail if the provided version of the delta table does not exist.

@@ -276,18 +287,81 @@ def scan_delta(
... table_path, delta_table_options=delta_table_options
... ).collect() # doctest: +SKIP
"""
if pyarrow_options is None:
pyarrow_options = {}

dl_tbl = _get_delta_lake_table(
table_path=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
)

pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
return scan_pyarrow_dataset(pa_ds)
if pyarrow_options is not None:
issue_deprecation_warning(
message="`pyarrow_options` is deprecated; the native polars parquet scanner is used when pyarrow options are not passed.",
version="1.13",
)
pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
return scan_pyarrow_dataset(pa_ds)

import pyarrow as pa
from deltalake.exceptions import DeltaProtocolError
from deltalake.table import (
MAX_SUPPORTED_READER_VERSION,
NOT_SUPPORTED_READER_VERSION,
SUPPORTED_READER_FEATURES,
)

table_protocol = dl_tbl.protocol()
if (
table_protocol.min_reader_version > MAX_SUPPORTED_READER_VERSION
or table_protocol.min_reader_version == NOT_SUPPORTED_READER_VERSION
):
msg = (
f"The table's minimum reader version is {table_protocol.min_reader_version} "
f"but polars delta scanner only supports version 1 or {MAX_SUPPORTED_READER_VERSION} with these reader features: {SUPPORTED_READER_FEATURES}"
)
raise DeltaProtocolError(msg)
if (
table_protocol.min_reader_version >= 3
and table_protocol.reader_features is not None
):
missing_features = {*table_protocol.reader_features}.difference(
SUPPORTED_READER_FEATURES
)
if len(missing_features) > 0:
msg = f"The table has set these reader features: {missing_features} but these are not yet supported by the polars delta scanner."
raise DeltaProtocolError(msg)

delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True)
polars_schema = from_arrow(pa.Table.from_pylist([], delta_schema)).schema # type: ignore[union-attr]
partition_columns = dl_tbl.metadata().partition_columns

def _split_schema(
schema: Schema, partition_columns: list[str]
) -> tuple[Schema, Schema]:
if len(partition_columns) == 0:
return schema, Schema([])
main_schema = []
hive_schema = []

for name, dtype in schema.items():
if name in partition_columns:
hive_schema.append((name, dtype))
else:
main_schema.append((name, dtype))

return Schema(main_schema), Schema(hive_schema)

# Required because main_schema cannot contain hive columns currently
main_schema, hive_schema = _split_schema(polars_schema, partition_columns)

return scan_parquet(
dl_tbl.file_uris(),
schema=main_schema,
hive_schema=hive_schema,
allow_missing_columns=True,
hive_partitioning=len(partition_columns) > 0,
storage_options=storage_options,
)


def _resolve_delta_lake_uri(table_uri: str, *, strict: bool = True) -> str:
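Before moving on to the test changes: the scan path above splits the Delta schema into a main schema and a hive (partition) schema before calling `scan_parquet`. A minimal sketch of what that split produces, assuming a table partitioned on a `year` column (the column names are illustrative, not taken from the PR):

import polars as pl
from polars.schema import Schema

full_schema = Schema({"name": pl.String, "age": pl.Int64, "year": pl.String})
partition_columns = ["year"]  # assumed partition column

# Same split as the _split_schema helper in the diff above.
main_schema = Schema(
    {name: dtype for name, dtype in full_schema.items() if name not in partition_columns}
)
hive_schema = Schema(
    {name: dtype for name, dtype in full_schema.items() if name in partition_columns}
)

# main_schema == Schema({"name": String, "age": Int64})
# hive_schema == Schema({"year": String})
# scan_parquet then receives main_schema as `schema`, hive_schema as `hive_schema`,
# and hive_partitioning=True because the table has partition columns.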
47 changes: 5 additions & 42 deletions py-polars/tests/unit/io/test_delta.py
@@ -69,18 +69,6 @@ def test_scan_delta_columns(delta_table_path: Path) -> None:
assert_frame_equal(expected, ldf.collect(), check_dtypes=False)


def test_scan_delta_filesystem(delta_table_path: Path) -> None:
raw_filesystem = pyarrow.fs.LocalFileSystem()
fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem)

ldf = pl.scan_delta(
str(delta_table_path), version=0, pyarrow_options={"filesystem": fs}
)

expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
assert_frame_equal(expected, ldf.collect(), check_dtypes=False)


def test_scan_delta_relative(delta_table_path: Path) -> None:
rel_delta_table_path = str(delta_table_path / ".." / "delta-table")

@@ -142,18 +130,6 @@ def test_read_delta_columns(delta_table_path: Path) -> None:
assert_frame_equal(expected, df, check_dtypes=False)


def test_read_delta_filesystem(delta_table_path: Path) -> None:
raw_filesystem = pyarrow.fs.LocalFileSystem()
fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem)

df = pl.read_delta(
str(delta_table_path), version=0, pyarrow_options={"filesystem": fs}
)

expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
assert_frame_equal(expected, df, check_dtypes=False)


def test_read_delta_relative(delta_table_path: Path) -> None:
rel_delta_table_path = str(delta_table_path / ".." / "delta-table")

@@ -208,7 +184,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:
assert v1.columns == pl_df_1.columns

assert df_supported.shape == pl_df_partitioned.shape
assert df_supported.columns == pl_df_partitioned.columns
assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

assert tbl.version() == 1
assert partitioned_tbl.version() == 0
@@ -240,7 +216,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:

assert partitioned_tbl.version() == 1
assert pl_df_partitioned.shape == (6, 14) # Rows are doubled
assert df_supported.columns == pl_df_partitioned.columns
assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

df_supported.write_delta(partitioned_tbl_uri, mode="overwrite")

@@ -494,6 +470,9 @@ def test_unsupported_dtypes(tmp_path: Path) -> None:
df.write_delta(tmp_path / "time")


@pytest.mark.skip(
reason="upstream bug in delta-rs causing categorical to be written as categorical in parquet"
)
@pytest.mark.write_disk
def test_categorical_becomes_string(tmp_path: Path) -> None:
df = pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Categorical})
@@ -502,22 +481,6 @@ def test_categorical_becomes_string(tmp_path: Path) -> None:
assert_frame_equal(df2, pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Utf8}))


@pytest.mark.write_disk
@pytest.mark.parametrize("rechunk_and_expected_chunks", [(True, 1), (False, 3)])
def test_read_parquet_respects_rechunk_16982(
rechunk_and_expected_chunks: tuple[bool, int], tmp_path: Path
) -> None:
# Create a delta lake table with 3 chunks:
df = pl.DataFrame({"a": [1]})
df.write_delta(str(tmp_path))
df.write_delta(str(tmp_path), mode="append")
df.write_delta(str(tmp_path), mode="append")

rechunk, expected_chunks = rechunk_and_expected_chunks
result = pl.read_delta(str(tmp_path), rechunk=rechunk)
assert result.n_chunks() == expected_chunks


def test_scan_delta_DT_input(delta_table_path: Path) -> None:
DT = DeltaTable(str(delta_table_path), version=0)
ldf = pl.scan_delta(DT)
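For completeness, a hedged sketch of a stand-alone pre-flight check that mirrors the reader-protocol gate added to `scan_delta` in this PR. It assumes the same `deltalake` constants imported in the diff; their exact module location may differ across deltalake versions, and the helper name is hypothetical:

from deltalake import DeltaTable
from deltalake.table import (
    MAX_SUPPORTED_READER_VERSION,
    NOT_SUPPORTED_READER_VERSION,
    SUPPORTED_READER_FEATURES,
)


def polars_can_scan(table_uri: str) -> bool:
    # Reimplements the version/feature checks from scan_delta as a boolean probe.
    protocol = DeltaTable(table_uri).protocol()
    version = protocol.min_reader_version
    if version > MAX_SUPPORTED_READER_VERSION or version == NOT_SUPPORTED_READER_VERSION:
        return False
    if version >= 3 and protocol.reader_features is not None:
        return set(protocol.reader_features).issubset(SUPPORTED_READER_FEATURES)
    return True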