refactor(python): Use polars parquet reader for delta scan #19103

Merged
134 changes: 98 additions & 36 deletions py-polars/polars/io/delta.py
@@ -3,20 +3,21 @@
import warnings
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

from polars import DataFrame
from polars.convert import from_arrow
from polars.datatypes import Null, Time
from polars.datatypes.convert import unpack_dtypes
from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake
from polars.io.pyarrow_dataset import scan_pyarrow_dataset
from polars.io.parquet import scan_parquet
from polars.io.pyarrow_dataset.functions import scan_pyarrow_dataset
from polars.schema import Schema

if TYPE_CHECKING:
from deltalake import DeltaTable

from polars import DataType, LazyFrame
from polars import DataFrame, DataType, LazyFrame


def read_delta(
@@ -27,6 +28,7 @@ def read_delta(
rechunk: bool = False,
storage_options: dict[str, Any] | None = None,
delta_table_options: dict[str, Any] | None = None,
use_pyarrow: bool = False,
pyarrow_options: dict[str, Any] | None = None,
) -> DataFrame:
"""
@@ -49,6 +51,9 @@
rechunk
Make sure that all columns are contiguous in memory by
aggregating the chunks into a single array.

.. deprecated:: 1.10.0
Rechunking is done automatically by the native reader.
storage_options
Extra options for the storage backends supported by `deltalake`.
For cloud storages, this may include configurations for authentication etc.
@@ -57,6 +62,8 @@
<https://delta-io.github.io/delta-rs/usage/loading-table/>`__.
delta_table_options
Additional keyword arguments while reading a Delta lake Table.
use_pyarrow
Flag to enable pyarrow dataset reads.
pyarrow_options
Keyword arguments while converting a Delta lake Table to pyarrow table.

@@ -72,15 +79,6 @@
>>> table_path = "/path/to/delta-table/"
>>> pl.read_delta(table_path) # doctest: +SKIP

Use the `pyarrow_options` parameter to read only certain partitions.
Note: This should be preferred over using an equivalent `.filter()` on the resulting
DataFrame, as this avoids reading the data at all.

>>> pl.read_delta( # doctest: +SKIP
... table_path,
... pyarrow_options={"partitions": [("year", "=", "2021")]},
... )

Reads a specific version of the Delta table from local filesystem.
Note: This will fail if the provided version of the delta table does not exist.

@@ -139,22 +137,19 @@ def read_delta(
... table_path, delta_table_options=delta_table_options
... ) # doctest: +SKIP
"""
if pyarrow_options is None:
pyarrow_options = {}

dl_tbl = _get_delta_lake_table(
table_path=source,
df = scan_delta(
source=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
use_pyarrow=use_pyarrow,
pyarrow_options=pyarrow_options,
rechunk=rechunk,
)

return cast(
DataFrame,
from_arrow(
dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk
),
)
if columns is not None:
df = df.select(columns)
return df.collect()
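
As an aside for reviewers: the refactored `read_delta` above is now a thin wrapper over `scan_delta`; it builds the lazy scan, applies the optional column projection, and collects. A minimal usage sketch (the table path and column names are placeholders):

import polars as pl

table_path = "/path/to/delta-table/"  # placeholder path

# Eager read; the column projection is applied on the lazy scan before collect()
df = pl.read_delta(table_path, columns=["name", "age"])

# Equivalent lazy formulation, matching the new implementation above
df_lazy = pl.scan_delta(table_path).select(["name", "age"]).collect()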


def scan_delta(
@@ -163,7 +158,9 @@
version: int | str | datetime | None = None,
storage_options: dict[str, Any] | None = None,
delta_table_options: dict[str, Any] | None = None,
use_pyarrow: bool = False,
pyarrow_options: dict[str, Any] | None = None,
rechunk: bool = False,
) -> LazyFrame:
"""
Lazily read from a Delta lake table.
@@ -188,10 +185,15 @@
<https://delta-io.github.io/delta-rs/usage/loading-table/>`__.
delta_table_options
Additional keyword arguments while reading a Delta lake Table.
use_pyarrow
Flag to enable pyarrow dataset reads.
pyarrow_options
Keyword arguments while converting a Delta lake Table to pyarrow table.
Use this parameter when filtering on partitioned columns or to read
from a 'fsspec' supported filesystem.
rechunk
Make sure that all columns are contiguous in memory by
aggregating the chunks into a single array.

Returns
-------
@@ -205,13 +207,6 @@
>>> table_path = "/path/to/delta-table/"
>>> pl.scan_delta(table_path).collect() # doctest: +SKIP

Use the `pyarrow_options` parameter to read only certain partitions.

>>> pl.scan_delta( # doctest: +SKIP
... table_path,
... pyarrow_options={"partitions": [("year", "=", "2021")]},
... )

Creates a scan for a specific version of the Delta table from local filesystem.
Note: This will fail if the provided version of the delta table does not exist.

@@ -276,18 +271,85 @@
... table_path, delta_table_options=delta_table_options
... ).collect() # doctest: +SKIP
"""
if pyarrow_options is None:
pyarrow_options = {}

dl_tbl = _get_delta_lake_table(
table_path=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
)

pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
return scan_pyarrow_dataset(pa_ds)
if use_pyarrow:
pyarrow_options = pyarrow_options or {}
pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
return scan_pyarrow_dataset(pa_ds)

if pyarrow_options is not None:
msg = "To make use of pyarrow_options, set use_pyarrow to True"
raise ValueError(msg)

import pyarrow as pa
from deltalake.exceptions import DeltaProtocolError
from deltalake.table import (
MAX_SUPPORTED_READER_VERSION,
NOT_SUPPORTED_READER_VERSION,
SUPPORTED_READER_FEATURES,
)

table_protocol = dl_tbl.protocol()
if (
table_protocol.min_reader_version > MAX_SUPPORTED_READER_VERSION
or table_protocol.min_reader_version == NOT_SUPPORTED_READER_VERSION
):
msg = (
f"The table's minimum reader version is {table_protocol.min_reader_version} "
f"but polars delta scanner only supports version 1 or {MAX_SUPPORTED_READER_VERSION} with these reader features: {SUPPORTED_READER_FEATURES}"
)
raise DeltaProtocolError(msg)
if (
table_protocol.min_reader_version >= 3
and table_protocol.reader_features is not None
):
missing_features = {*table_protocol.reader_features}.difference(
SUPPORTED_READER_FEATURES
)
if len(missing_features) > 0:
msg = f"The table has set these reader features: {missing_features} but these are not yet supported by the polars delta scanner."
raise DeltaProtocolError(msg)

# Requires conversion through pyarrow table because there is no direct way yet to
# convert a delta schema into a polars schema
delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True)
polars_schema = from_arrow(pa.Table.from_pylist([], delta_schema)).schema # type: ignore[union-attr]
partition_columns = dl_tbl.metadata().partition_columns

def _split_schema(
schema: Schema, partition_columns: list[str]
) -> tuple[Schema, Schema]:
if len(partition_columns) == 0:
return schema, Schema([])
main_schema = []
hive_schema = []

for name, dtype in schema.items():
if name in partition_columns:
hive_schema.append((name, dtype))
else:
main_schema.append((name, dtype))

return Schema(main_schema), Schema(hive_schema)

# Required because main_schema cannot contain hive columns currently
main_schema, hive_schema = _split_schema(polars_schema, partition_columns)

return scan_parquet(
dl_tbl.file_uris(),
schema=main_schema,
hive_schema=hive_schema,
allow_missing_columns=True,
hive_partitioning=len(partition_columns) > 0,
storage_options=storage_options,
rechunk=rechunk,
)
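
To illustrate the `_split_schema` helper above: for a table partitioned by a hypothetical "year" column, the partition column is carved out of the main schema and handed to `scan_parquet` separately as `hive_schema`. A minimal sketch under that assumption:

import polars as pl
from polars.schema import Schema

# Hypothetical schema for a table partitioned by "year"
polars_schema = Schema([("name", pl.String), ("age", pl.Int64), ("year", pl.Int32)])
partition_columns = ["year"]

# Mirrors _split_schema: hive (partition) columns go into their own schema
main_schema = Schema(
    [(n, d) for n, d in polars_schema.items() if n not in partition_columns]
)
hive_schema = Schema(
    [(n, d) for n, d in polars_schema.items() if n in partition_columns]
)

assert list(main_schema) == ["name", "age"]
assert list(hive_schema) == ["year"]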


def _resolve_delta_lake_uri(table_uri: str, *, strict: bool = True) -> str:
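Summing up the behavior change in `scan_delta`: the native polars parquet reader is now the default, the pyarrow dataset path is kept behind `use_pyarrow=True`, and passing `pyarrow_options` without that flag raises a `ValueError`. An illustrative sketch (the table path and partition filter are placeholders, reusing the example that was removed from the docstring):

import polars as pl

table_path = "/path/to/delta-table/"  # placeholder path

# Default: native polars parquet scan over the table's file URIs
ldf = pl.scan_delta(table_path)

# Legacy pyarrow dataset scan, e.g. to push partition filters through deltalake
ldf_pa = pl.scan_delta(
    table_path,
    use_pyarrow=True,
    pyarrow_options={"partitions": [("year", "=", "2021")]},
)

# This now raises ValueError("To make use of pyarrow_options, set use_pyarrow to True"):
# pl.scan_delta(table_path, pyarrow_options={"partitions": [("year", "=", "2021")]})
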
47 changes: 5 additions & 42 deletions py-polars/tests/unit/io/test_delta.py
@@ -69,18 +69,6 @@ def test_scan_delta_columns(delta_table_path: Path) -> None:
assert_frame_equal(expected, ldf.collect(), check_dtypes=False)


def test_scan_delta_filesystem(delta_table_path: Path) -> None:
raw_filesystem = pyarrow.fs.LocalFileSystem()
fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem)

ldf = pl.scan_delta(
str(delta_table_path), version=0, pyarrow_options={"filesystem": fs}
)

expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
assert_frame_equal(expected, ldf.collect(), check_dtypes=False)


def test_scan_delta_relative(delta_table_path: Path) -> None:
rel_delta_table_path = str(delta_table_path / ".." / "delta-table")

@@ -142,18 +130,6 @@ def test_read_delta_columns(delta_table_path: Path) -> None:
assert_frame_equal(expected, df, check_dtypes=False)


def test_read_delta_filesystem(delta_table_path: Path) -> None:
raw_filesystem = pyarrow.fs.LocalFileSystem()
fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem)

df = pl.read_delta(
str(delta_table_path), version=0, pyarrow_options={"filesystem": fs}
)

expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
assert_frame_equal(expected, df, check_dtypes=False)


def test_read_delta_relative(delta_table_path: Path) -> None:
rel_delta_table_path = str(delta_table_path / ".." / "delta-table")

@@ -208,7 +184,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:
assert v1.columns == pl_df_1.columns

assert df_supported.shape == pl_df_partitioned.shape
assert df_supported.columns == pl_df_partitioned.columns
assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

assert tbl.version() == 1
assert partitioned_tbl.version() == 0
@@ -240,7 +216,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:

assert partitioned_tbl.version() == 1
assert pl_df_partitioned.shape == (6, 14) # Rows are doubled
assert df_supported.columns == pl_df_partitioned.columns
assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

df_supported.write_delta(partitioned_tbl_uri, mode="overwrite")

@@ -494,6 +470,9 @@ def test_unsupported_dtypes(tmp_path: Path) -> None:
df.write_delta(tmp_path / "time")


@pytest.mark.skip(
reason="upstream bug in delta-rs causing categorical to be written as categorical in parquet"
)
@pytest.mark.write_disk
def test_categorical_becomes_string(tmp_path: Path) -> None:
df = pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Categorical})
@@ -502,22 +481,6 @@ def test_categorical_becomes_string(tmp_path: Path) -> None:
assert_frame_equal(df2, pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Utf8}))


@pytest.mark.write_disk
@pytest.mark.parametrize("rechunk_and_expected_chunks", [(True, 1), (False, 3)])
def test_read_parquet_respects_rechunk_16982(
rechunk_and_expected_chunks: tuple[bool, int], tmp_path: Path
) -> None:
# Create a delta lake table with 3 chunks:
df = pl.DataFrame({"a": [1]})
df.write_delta(str(tmp_path))
df.write_delta(str(tmp_path), mode="append")
df.write_delta(str(tmp_path), mode="append")

rechunk, expected_chunks = rechunk_and_expected_chunks
result = pl.read_delta(str(tmp_path), rechunk=rechunk)
assert result.n_chunks() == expected_chunks


def test_scan_delta_DT_input(delta_table_path: Path) -> None:
DT = DeltaTable(str(delta_table_path), version=0)
ldf = pl.scan_delta(DT)