From ea25c40bc84a58ebd7cec1bf0ebab83dc7491a36 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:07:43 +0200 Subject: [PATCH 01/10] refactor: use polars parquet reader --- py-polars/polars/io/delta.py | 115 +++++++++++++++----------- py-polars/tests/unit/io/test_delta.py | 24 ------ 2 files changed, 68 insertions(+), 71 deletions(-) diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py index 9cb621fc5534..8cc4a7b65fad 100644 --- a/py-polars/polars/io/delta.py +++ b/py-polars/polars/io/delta.py @@ -5,14 +5,15 @@ from typing import TYPE_CHECKING, Any from urllib.parse import urlparse -from polars.convert import from_arrow +from polars.dataframe import DataFrame from polars.datatypes import Null, Time from polars.datatypes.convert import unpack_dtypes from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake -from polars.io.pyarrow_dataset import scan_pyarrow_dataset +from polars.io.parquet import scan_parquet +from polars.schema import Schema if TYPE_CHECKING: - from polars import DataFrame, DataType, LazyFrame + from polars import DataType, LazyFrame def read_delta( @@ -20,10 +21,8 @@ def read_delta( *, version: int | str | datetime | None = None, columns: list[str] | None = None, - rechunk: bool = False, storage_options: dict[str, Any] | None = None, delta_table_options: dict[str, Any] | None = None, - pyarrow_options: dict[str, Any] | None = None, ) -> DataFrame: """ Reads into a DataFrame from a Delta lake table. @@ -42,9 +41,6 @@ def read_delta( table is read. columns Columns to select. Accepts a list of column names. - rechunk - Make sure that all columns are contiguous in memory by - aggregating the chunks into a single array. storage_options Extra options for the storage backends supported by `deltalake`. For cloud storages, this may include configurations for authentication etc. @@ -53,8 +49,6 @@ def read_delta( `__. delta_table_options Additional keyword arguments while reading a Delta lake Table. - pyarrow_options - Keyword arguments while converting a Delta lake Table to pyarrow table. Returns ------- @@ -68,15 +62,6 @@ def read_delta( >>> table_path = "/path/to/delta-table/" >>> pl.read_delta(table_path) # doctest: +SKIP - Use the `pyarrow_options` parameter to read only certain partitions. - Note: This should be preferred over using an equivalent `.filter()` on the resulting - DataFrame, as this avoids reading the data at all. - - >>> pl.read_delta( # doctest: +SKIP - ... table_path, - ... pyarrow_options={"partitions": [("year", "=", "2021")]}, - ... ) - Reads a specific version of the Delta table from local filesystem. Note: This will fail if the provided version of the delta table does not exist. @@ -135,21 +120,15 @@ def read_delta( ... table_path, delta_table_options=delta_table_options ... 
) # doctest: +SKIP """ - if pyarrow_options is None: - pyarrow_options = {} - - resolved_uri = _resolve_delta_lake_uri(source) - - dl_tbl = _get_delta_lake_table( - table_path=resolved_uri, + df = scan_delta( + source=source, version=version, storage_options=storage_options, delta_table_options=delta_table_options, ) - - return from_arrow( - dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk - ) # type: ignore[return-value] + if columns is not None: + df = df.select(columns) + return df.collect() def scan_delta( @@ -158,7 +137,6 @@ def scan_delta( version: int | str | datetime | None = None, storage_options: dict[str, Any] | None = None, delta_table_options: dict[str, Any] | None = None, - pyarrow_options: dict[str, Any] | None = None, ) -> LazyFrame: """ Lazily read from a Delta lake table. @@ -183,10 +161,6 @@ def scan_delta( `__. delta_table_options Additional keyword arguments while reading a Delta lake Table. - pyarrow_options - Keyword arguments while converting a Delta lake Table to pyarrow table. - Use this parameter when filtering on partitioned columns or to read - from a 'fsspec' supported filesystem. Returns ------- @@ -200,13 +174,6 @@ def scan_delta( >>> table_path = "/path/to/delta-table/" >>> pl.scan_delta(table_path).collect() # doctest: +SKIP - Use the `pyarrow_options` parameter to read only certain partitions. - - >>> pl.scan_delta( # doctest: +SKIP - ... table_path, - ... pyarrow_options={"partitions": [("year", "=", "2021")]}, - ... ) - Creates a scan for a specific version of the Delta table from local filesystem. Note: This will fail if the provided version of the delta table does not exist. @@ -271,9 +238,6 @@ def scan_delta( ... table_path, delta_table_options=delta_table_options ... ).collect() # doctest: +SKIP """ - if pyarrow_options is None: - pyarrow_options = {} - resolved_uri = _resolve_delta_lake_uri(source) dl_tbl = _get_delta_lake_table( table_path=resolved_uri, @@ -281,9 +245,66 @@ def scan_delta( storage_options=storage_options, delta_table_options=delta_table_options, ) + import pyarrow as pa + from deltalake.exceptions import DeltaProtocolError + from deltalake.table import ( + MAX_SUPPORTED_READER_VERSION, + NOT_SUPPORTED_READER_VERSION, + SUPPORTED_READER_FEATURES, + ) - pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options) - return scan_pyarrow_dataset(pa_ds) + table_protocol = dl_tbl.protocol() + if ( + table_protocol.min_reader_version > MAX_SUPPORTED_READER_VERSION + or table_protocol.min_reader_version == NOT_SUPPORTED_READER_VERSION + ): + msg = ( + f"The table's minimum reader version is {table_protocol.min_reader_version} " + f"but deltalake only supports version 1 or {MAX_SUPPORTED_READER_VERSION} with these reader features: {SUPPORTED_READER_FEATURES}" + ) + raise DeltaProtocolError(msg) + if ( + table_protocol.min_reader_version >= 3 + and table_protocol.reader_features is not None + ): + missing_features = {*table_protocol.reader_features}.difference( + SUPPORTED_READER_FEATURES + ) + if len(missing_features) > 0: + msg = f"The table has set these reader features: {missing_features} but these are not yet supported by the deltalake reader." 
+ raise DeltaProtocolError(msg) + + delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True) + polars_schema = DataFrame(pa.Table.from_pylist([], delta_schema)).schema + partition_columns = dl_tbl.metadata().partition_columns + + def _split_schema( + schema: Schema, partition_columns: list[str] + ) -> tuple[Schema, Schema]: + if len(partition_columns) == 0: + return schema, Schema([]) + main_schema = [] + hive_schema = [] + + for name, dtype in schema.items(): + if name in partition_columns: + hive_schema.append((name, dtype)) + else: + main_schema.append((name, dtype)) + + return Schema(main_schema), Schema(hive_schema) + + # Required because main_schema cannot contain hive columns currently + main_schema, hive_schema = _split_schema(polars_schema, partition_columns) + + return scan_parquet( + dl_tbl.file_uris(), + schema=main_schema, + hive_schema=hive_schema, + allow_missing_columns=True, + hive_partitioning=len(partition_columns) > 0, + storage_options=storage_options, + ) def _resolve_delta_lake_uri(table_uri: str, *, strict: bool = True) -> str: diff --git a/py-polars/tests/unit/io/test_delta.py b/py-polars/tests/unit/io/test_delta.py index 33c6b052ffdf..99c80589b81a 100644 --- a/py-polars/tests/unit/io/test_delta.py +++ b/py-polars/tests/unit/io/test_delta.py @@ -69,18 +69,6 @@ def test_scan_delta_columns(delta_table_path: Path) -> None: assert_frame_equal(expected, ldf.collect(), check_dtypes=False) -def test_scan_delta_filesystem(delta_table_path: Path) -> None: - raw_filesystem = pyarrow.fs.LocalFileSystem() - fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem) - - ldf = pl.scan_delta( - str(delta_table_path), version=0, pyarrow_options={"filesystem": fs} - ) - - expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]}) - assert_frame_equal(expected, ldf.collect(), check_dtypes=False) - - def test_scan_delta_relative(delta_table_path: Path) -> None: rel_delta_table_path = str(delta_table_path / ".." / "delta-table") @@ -142,18 +130,6 @@ def test_read_delta_columns(delta_table_path: Path) -> None: assert_frame_equal(expected, df, check_dtypes=False) -def test_read_delta_filesystem(delta_table_path: Path) -> None: - raw_filesystem = pyarrow.fs.LocalFileSystem() - fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem) - - df = pl.read_delta( - str(delta_table_path), version=0, pyarrow_options={"filesystem": fs} - ) - - expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]}) - assert_frame_equal(expected, df, check_dtypes=False) - - def test_read_delta_relative(delta_table_path: Path) -> None: rel_delta_table_path = str(delta_table_path / ".." 
/ "delta-table") From 11723c22c1a282671f3c9f5774dfb164b13b5692 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:12:23 +0200 Subject: [PATCH 02/10] chore: update msg --- py-polars/polars/io/delta.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py index 8cc4a7b65fad..4b4cd982abb5 100644 --- a/py-polars/polars/io/delta.py +++ b/py-polars/polars/io/delta.py @@ -260,7 +260,7 @@ def scan_delta( ): msg = ( f"The table's minimum reader version is {table_protocol.min_reader_version} " - f"but deltalake only supports version 1 or {MAX_SUPPORTED_READER_VERSION} with these reader features: {SUPPORTED_READER_FEATURES}" + f"but polars delta scanner only supports version 1 or {MAX_SUPPORTED_READER_VERSION} with these reader features: {SUPPORTED_READER_FEATURES}" ) raise DeltaProtocolError(msg) if ( @@ -271,7 +271,7 @@ def scan_delta( SUPPORTED_READER_FEATURES ) if len(missing_features) > 0: - msg = f"The table has set these reader features: {missing_features} but these are not yet supported by the deltalake reader." + msg = f"The table has set these reader features: {missing_features} but these are not yet supported by the polars delta scanner." raise DeltaProtocolError(msg) delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True) From a051365819225eca345cfdc63261f8780c65e0f4 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:15:35 +0200 Subject: [PATCH 03/10] chore: use from_arrow --- py-polars/polars/io/delta.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py index 4b4cd982abb5..03f8b1953f72 100644 --- a/py-polars/polars/io/delta.py +++ b/py-polars/polars/io/delta.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Any from urllib.parse import urlparse -from polars.dataframe import DataFrame +from polars.convert import from_arrow from polars.datatypes import Null, Time from polars.datatypes.convert import unpack_dtypes from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake @@ -13,7 +13,7 @@ from polars.schema import Schema if TYPE_CHECKING: - from polars import DataType, LazyFrame + from polars import DataFrame, DataType, LazyFrame def read_delta( @@ -275,7 +275,7 @@ def scan_delta( raise DeltaProtocolError(msg) delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True) - polars_schema = DataFrame(pa.Table.from_pylist([], delta_schema)).schema + polars_schema = from_arrow(pa.Table.from_pylist([], delta_schema)).schema # type: ignore partition_columns = dl_tbl.metadata().partition_columns def _split_schema( From 699c4ffce8f6e331aa46190fbaebe7fa7229ac6c Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:18:22 +0200 Subject: [PATCH 04/10] chore: remove rechunk --- py-polars/tests/unit/io/test_delta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/tests/unit/io/test_delta.py b/py-polars/tests/unit/io/test_delta.py index 99c80589b81a..da4d58deaa05 100644 --- a/py-polars/tests/unit/io/test_delta.py +++ b/py-polars/tests/unit/io/test_delta.py @@ -490,5 +490,5 @@ def test_read_parquet_respects_rechunk_16982( df.write_delta(str(tmp_path), mode="append") rechunk, expected_chunks = rechunk_and_expected_chunks - result = pl.read_delta(str(tmp_path), rechunk=rechunk) + result = 
pl.read_delta(str(tmp_path)) assert result.n_chunks() == expected_chunks From 0103cdd8356cb7e4e0bc9865ac0b818a1fd321b8 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 5 Oct 2024 11:36:52 +0200 Subject: [PATCH 05/10] chore: keep deprecated pyarrow options behavior, issue warning --- py-polars/polars/io/delta.py | 56 +++++++++++++++++++++++++++ py-polars/tests/unit/io/test_delta.py | 23 +++-------- 2 files changed, 61 insertions(+), 18 deletions(-) diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py index 03f8b1953f72..bd948e75623b 100644 --- a/py-polars/polars/io/delta.py +++ b/py-polars/polars/io/delta.py @@ -5,11 +5,13 @@ from typing import TYPE_CHECKING, Any from urllib.parse import urlparse +from polars._utils.deprecation import issue_deprecation_warning from polars.convert import from_arrow from polars.datatypes import Null, Time from polars.datatypes.convert import unpack_dtypes from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake from polars.io.parquet import scan_parquet +from polars.io.pyarrow_dataset.functions import scan_pyarrow_dataset from polars.schema import Schema if TYPE_CHECKING: @@ -21,8 +23,10 @@ def read_delta( *, version: int | str | datetime | None = None, columns: list[str] | None = None, + rechunk: bool | None = None, storage_options: dict[str, Any] | None = None, delta_table_options: dict[str, Any] | None = None, + pyarrow_options: dict[str, Any] | None = None, ) -> DataFrame: """ Reads into a DataFrame from a Delta lake table. @@ -41,6 +45,12 @@ def read_delta( table is read. columns Columns to select. Accepts a list of column names. + rechunk + Make sure that all columns are contiguous in memory by + aggregating the chunks into a single array. + + .. deprecated:: 1.10.0 + Rechunk is automatically done in native reader storage_options Extra options for the storage backends supported by `deltalake`. For cloud storages, this may include configurations for authentication etc. @@ -49,6 +59,11 @@ def read_delta( `__. delta_table_options Additional keyword arguments while reading a Delta lake Table. + pyarrow_options + Keyword arguments while converting a Delta lake Table to pyarrow table. + + .. deprecated:: 1.10.0 + Remove pyarrow_options and use native polars filter, selection. Returns ------- @@ -120,6 +135,30 @@ def read_delta( ... table_path, delta_table_options=delta_table_options ... ) # doctest: +SKIP """ + if pyarrow_options is not None: + issue_deprecation_warning( + message="`pyarrow_options` are deprecated, polars native parquet reader is used when not passing pyarrow options.", + version="1.10", + ) + resolved_uri = _resolve_delta_lake_uri(source) + + dl_tbl = _get_delta_lake_table( + table_path=resolved_uri, + version=version, + storage_options=storage_options, + delta_table_options=delta_table_options, + ) + if rechunk is None: + rechunk = False + return from_arrow( + dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk + ) # type: ignore[return-value] + + if rechunk is not None: + issue_deprecation_warning( + message="`rechunk` is deprecated, this is automatically done now.", + version="1.10", + ) df = scan_delta( source=source, version=version, @@ -137,6 +176,7 @@ def scan_delta( version: int | str | datetime | None = None, storage_options: dict[str, Any] | None = None, delta_table_options: dict[str, Any] | None = None, + pyarrow_options: dict[str, Any] | None = None, ) -> LazyFrame: """ Lazily read from a Delta lake table. 
@@ -161,6 +201,13 @@ def scan_delta( `__. delta_table_options Additional keyword arguments while reading a Delta lake Table. + pyarrow_options + Keyword arguments while converting a Delta lake Table to pyarrow table. + Use this parameter when filtering on partitioned columns or to read + from a 'fsspec' supported filesystem. + + .. deprecated:: 1.10.0 + Remove pyarrow_options and use native polars filter, selection. Returns ------- @@ -245,6 +292,15 @@ def scan_delta( storage_options=storage_options, delta_table_options=delta_table_options, ) + + if pyarrow_options is not None: + issue_deprecation_warning( + message="PyArrow options are deprecated, polars native parquet scanner is used when not passing pyarrow options.", + version="1.10", + ) + pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options) + return scan_pyarrow_dataset(pa_ds) + import pyarrow as pa from deltalake.exceptions import DeltaProtocolError from deltalake.table import ( diff --git a/py-polars/tests/unit/io/test_delta.py b/py-polars/tests/unit/io/test_delta.py index da4d58deaa05..0276eec34525 100644 --- a/py-polars/tests/unit/io/test_delta.py +++ b/py-polars/tests/unit/io/test_delta.py @@ -184,7 +184,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None: assert v1.columns == pl_df_1.columns assert df_supported.shape == pl_df_partitioned.shape - assert df_supported.columns == pl_df_partitioned.columns + assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns) assert tbl.version() == 1 assert partitioned_tbl.version() == 0 @@ -216,7 +216,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None: assert partitioned_tbl.version() == 1 assert pl_df_partitioned.shape == (6, 14) # Rows are doubled - assert df_supported.columns == pl_df_partitioned.columns + assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns) df_supported.write_delta(partitioned_tbl_uri, mode="overwrite") @@ -470,25 +470,12 @@ def test_unsupported_dtypes(tmp_path: Path) -> None: df.write_delta(tmp_path / "time") +@pytest.mark.skip( + reason="upstream bug in delta-rs causing categorical to be written as categorical in parquet" +) @pytest.mark.write_disk def test_categorical_becomes_string(tmp_path: Path) -> None: df = pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Categorical}) df.write_delta(tmp_path) df2 = pl.read_delta(str(tmp_path)) assert_frame_equal(df2, pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Utf8})) - - -@pytest.mark.write_disk -@pytest.mark.parametrize("rechunk_and_expected_chunks", [(True, 1), (False, 3)]) -def test_read_parquet_respects_rechunk_16982( - rechunk_and_expected_chunks: tuple[bool, int], tmp_path: Path -) -> None: - # Create a delta lake table with 3 chunks: - df = pl.DataFrame({"a": [1]}) - df.write_delta(str(tmp_path)) - df.write_delta(str(tmp_path), mode="append") - df.write_delta(str(tmp_path), mode="append") - - rechunk, expected_chunks = rechunk_and_expected_chunks - result = pl.read_delta(str(tmp_path)) - assert result.n_chunks() == expected_chunks From e3ef45a70fa1c87ca7c2faeaf54cf7d33730f03d Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 5 Oct 2024 11:41:50 +0200 Subject: [PATCH 06/10] chore: ignore --- py-polars/polars/io/delta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py index bd948e75623b..33b6f3140696 100644 --- a/py-polars/polars/io/delta.py +++ b/py-polars/polars/io/delta.py @@ -331,7 +331,7 @@ def scan_delta( 
raise DeltaProtocolError(msg) delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True) - polars_schema = from_arrow(pa.Table.from_pylist([], delta_schema)).schema # type: ignore + polars_schema = from_arrow(pa.Table.from_pylist([], delta_schema)).schema # type: ignore[union-attr] partition_columns = dl_tbl.metadata().partition_columns def _split_schema( From 3aa60c869d627dbc6e59b642836d69eed2f9ced9 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 2 Nov 2024 11:08:27 +0100 Subject: [PATCH 07/10] chore: fmt --- py-polars/tests/unit/io/test_delta.py | 1 + 1 file changed, 1 insertion(+) diff --git a/py-polars/tests/unit/io/test_delta.py b/py-polars/tests/unit/io/test_delta.py index aeb349a054d8..d142c316c7d1 100644 --- a/py-polars/tests/unit/io/test_delta.py +++ b/py-polars/tests/unit/io/test_delta.py @@ -480,6 +480,7 @@ def test_categorical_becomes_string(tmp_path: Path) -> None: df2 = pl.read_delta(str(tmp_path)) assert_frame_equal(df2, pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Utf8})) + def test_scan_delta_DT_input(delta_table_path: Path) -> None: DT = DeltaTable(str(delta_table_path), version=0) ldf = pl.scan_delta(DT) From b2c4276222e9bccf34ae829962b8e16f4185a971 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Wed, 13 Nov 2024 19:57:44 +0100 Subject: [PATCH 08/10] chore: add use_pyarrow flag --- py-polars/polars/io/delta.py | 58 ++++++++++++++---------------------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py index 40212d160a48..a77e1175e09b 100644 --- a/py-polars/polars/io/delta.py +++ b/py-polars/polars/io/delta.py @@ -6,7 +6,6 @@ from typing import TYPE_CHECKING, Any from urllib.parse import urlparse -from polars._utils.deprecation import issue_deprecation_warning from polars.convert import from_arrow from polars.datatypes import Null, Time from polars.datatypes.convert import unpack_dtypes @@ -26,9 +25,10 @@ def read_delta( *, version: int | str | datetime | None = None, columns: list[str] | None = None, - rechunk: bool | None = None, + rechunk: bool = False, storage_options: dict[str, Any] | None = None, delta_table_options: dict[str, Any] | None = None, + use_pyarrow: bool = False, pyarrow_options: dict[str, Any] | None = None, ) -> DataFrame: """ @@ -62,12 +62,11 @@ def read_delta( `__. delta_table_options Additional keyword arguments while reading a Delta lake Table. + use_pyarrow + Flag to enable pyarrow dataset reads. pyarrow_options Keyword arguments while converting a Delta lake Table to pyarrow table. - .. deprecated:: 1.10.0 - Remove pyarrow_options and use native polars filter, selection. - Returns ------- DataFrame @@ -138,33 +137,14 @@ def read_delta( ... table_path, delta_table_options=delta_table_options ... 
) # doctest: +SKIP """ - if pyarrow_options is not None: - issue_deprecation_warning( - message="`pyarrow_options` are deprecated, polars native parquet reader is used when not passing pyarrow options.", - version="1.13", - ) - dl_tbl = _get_delta_lake_table( - table_path=source, - version=version, - storage_options=storage_options, - delta_table_options=delta_table_options, - ) - if rechunk is None: - rechunk = False - return from_arrow( - dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk - ) # type: ignore[return-value] - - if rechunk is not None: - issue_deprecation_warning( - message="`rechunk` is deprecated, this is automatically done now.", - version="1.13", - ) df = scan_delta( source=source, version=version, storage_options=storage_options, delta_table_options=delta_table_options, + use_pyarrow=use_pyarrow, + pyarrow_options=pyarrow_options, + rechunk=rechunk, ) if columns is not None: @@ -178,7 +158,9 @@ def scan_delta( version: int | str | datetime | None = None, storage_options: dict[str, Any] | None = None, delta_table_options: dict[str, Any] | None = None, + use_pyarrow: bool = False, pyarrow_options: dict[str, Any] | None = None, + rechunk: bool = False, ) -> LazyFrame: """ Lazily read from a Delta lake table. @@ -203,13 +185,15 @@ def scan_delta( `__. delta_table_options Additional keyword arguments while reading a Delta lake Table. + use_pyarrow + Flag to enable pyarrow dataset reads. pyarrow_options Keyword arguments while converting a Delta lake Table to pyarrow table. Use this parameter when filtering on partitioned columns or to read from a 'fsspec' supported filesystem. - - .. deprecated:: 1.10.0 - Remove pyarrow_options and use native polars filter, selection. + rechunk + Make sure that all columns are contiguous in memory by + aggregating the chunks into a single array. Returns ------- @@ -294,14 +278,15 @@ def scan_delta( delta_table_options=delta_table_options, ) - if pyarrow_options is not None: - issue_deprecation_warning( - message="PyArrow options are deprecated, polars native parquet scanner is used when not passing pyarrow options.", - version="1.13", - ) + if use_pyarrow: + pyarrow_options = pyarrow_options or {} pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options) return scan_pyarrow_dataset(pa_ds) + if pyarrow_options is not None: + msg = "To make use of pyarrow_options, set use_pyarrow to True" + raise ValueError(msg) + import pyarrow as pa from deltalake.exceptions import DeltaProtocolError from deltalake.table import ( @@ -331,6 +316,8 @@ def scan_delta( msg = f"The table has set these reader features: {missing_features} but these are not yet supported by the polars delta scanner." 
raise DeltaProtocolError(msg) + # Requires conversion through pyarrow table because there is no direct way yet to + # convert a delta schema into a polars schema delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True) polars_schema = from_arrow(pa.Table.from_pylist([], delta_schema)).schema # type: ignore[union-attr] partition_columns = dl_tbl.metadata().partition_columns @@ -361,6 +348,7 @@ def _split_schema( allow_missing_columns=True, hive_partitioning=len(partition_columns) > 0, storage_options=storage_options, + rechunk=rechunk, ) From 2ef113fd675ee55530a88f6fcb027ca80df808a8 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Wed, 13 Nov 2024 21:30:53 +0100 Subject: [PATCH 09/10] chore: typehint --- py-polars/polars/io/delta.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py index a77e1175e09b..635989fb8714 100644 --- a/py-polars/polars/io/delta.py +++ b/py-polars/polars/io/delta.py @@ -25,7 +25,7 @@ def read_delta( *, version: int | str | datetime | None = None, columns: list[str] | None = None, - rechunk: bool = False, + rechunk: bool | None = None, storage_options: dict[str, Any] | None = None, delta_table_options: dict[str, Any] | None = None, use_pyarrow: bool = False, @@ -160,7 +160,7 @@ def scan_delta( delta_table_options: dict[str, Any] | None = None, use_pyarrow: bool = False, pyarrow_options: dict[str, Any] | None = None, - rechunk: bool = False, + rechunk: bool | None = None, ) -> LazyFrame: """ Lazily read from a Delta lake table. @@ -348,7 +348,7 @@ def _split_schema( allow_missing_columns=True, hive_partitioning=len(partition_columns) > 0, storage_options=storage_options, - rechunk=rechunk, + rechunk=rechunk or False, ) From 6cdc5e79ed993974e8fa834c2d6b0c4ab74f3b44 Mon Sep 17 00:00:00 2001 From: ritchie Date: Sat, 16 Nov 2024 18:20:52 +0100 Subject: [PATCH 10/10] remove deprecation hint --- py-polars/polars/io/delta.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py index 635989fb8714..6e9ebc7c7433 100644 --- a/py-polars/polars/io/delta.py +++ b/py-polars/polars/io/delta.py @@ -51,9 +51,6 @@ def read_delta( rechunk Make sure that all columns are contiguous in memory by aggregating the chunks into a single array. - - .. deprecated:: 1.10.0 - Rechunk is automatically done in native reader storage_options Extra options for the storage backends supported by `deltalake`. For cloud storages, this may include configurations for authentication etc.
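
A usage sketch of the API as it stands after this series, in the same doctest style the module's docstrings use. The table path and the `year` partition filter are illustrative, and both calls assume a table whose reader protocol passes the version/feature checks added in patch 01:

>>> import polars as pl
>>> table_path = "/path/to/delta-table/"
>>> # Default path: the table's file URIs are handed to the native parquet
>>> # scanner, with partition columns resolved via hive_partitioning.
>>> pl.scan_delta(table_path).collect()  # doctest: +SKIP
>>> # Opt-in pyarrow path: `pyarrow_options` only take effect together with
>>> # `use_pyarrow=True`; per the guard in patch 08, passing them without
>>> # the flag raises a ValueError.
>>> pl.scan_delta(
...     table_path,
...     use_pyarrow=True,
...     pyarrow_options={"partitions": [("year", "=", "2021")]},
... ).collect()  # doctest: +SKIP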