From e0123c3a49247ecbef6cdb2b21e80ef96ff2f6dc Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 12 Jul 2024 19:48:13 +1000
Subject: [PATCH] Replace `PartitionedWriteOptions` with a `partition_by`
 keyword argument on `write_parquet`

---
 docs/_build/API_REFERENCE_LINKS.yml    |  1 -
 docs/src/python/user-guide/io/hive.py  |  2 +-
 docs/user-guide/io/hive.md             |  2 +-
 py-polars/docs/source/reference/io.rst |  7 ---
 py-polars/polars/__init__.py           |  2 -
 py-polars/polars/dataframe/frame.py    | 36 +++++++++++-----
 py-polars/polars/io/__init__.py        |  2 -
 py-polars/polars/io/hive.py            | 59 --------------------------
 py-polars/src/dataframe/io.rs          | 16 ++++---
 py-polars/src/io/hive.rs               | 25 -----------
 py-polars/src/io/mod.rs                |  1 -
 py-polars/src/lib.rs                   |  2 -
 py-polars/tests/unit/io/test_hive.py   | 10 ++---
 13 files changed, 41 insertions(+), 124 deletions(-)
 delete mode 100644 py-polars/polars/io/hive.py
 delete mode 100644 py-polars/src/io/hive.rs
 delete mode 100644 py-polars/src/io/mod.rs

diff --git a/docs/_build/API_REFERENCE_LINKS.yml b/docs/_build/API_REFERENCE_LINKS.yml
index 4762e8b8a5aa..4cd8c18a3b05 100644
--- a/docs/_build/API_REFERENCE_LINKS.yml
+++ b/docs/_build/API_REFERENCE_LINKS.yml
@@ -83,7 +83,6 @@ python:
   lazy: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.lazy.html
   explain: https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.explain.html
   fetch: https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.fetch.html
-  PartitionedWriteOptions: https://docs.pola.rs/api/python/stable/reference/api/polars.PartitionedWriteOptions.html
   SQLContext: https://docs.pola.rs/api/python/stable/reference/sql
   SQLregister:
     name: register
diff --git a/docs/src/python/user-guide/io/hive.py b/docs/src/python/user-guide/io/hive.py
index ef8885d2f53a..072b0fb3c34a 100644
--- a/docs/src/python/user-guide/io/hive.py
+++ b/docs/src/python/user-guide/io/hive.py
@@ -124,7 +124,7 @@ def dir_recurse(path: Path):
 # --8<-- [end:write_parquet_partitioned_show_data]
 
 # --8<-- [start:write_parquet_partitioned]
-df.write_parquet(pl.PartitionedWriteOptions("docs/data/hive_write/", ["a", "b"]))
+df.write_parquet("docs/data/hive_write/", partition_by=["a", "b"])
 # --8<-- [end:write_parquet_partitioned]
 
 # --8<-- [start:write_parquet_partitioned_show_paths]
diff --git a/docs/user-guide/io/hive.md b/docs/user-guide/io/hive.md
index 128810bdd0e8..d0de9ada2c59 100644
--- a/docs/user-guide/io/hive.md
+++ b/docs/user-guide/io/hive.md
@@ -88,7 +88,7 @@ For this example the following DataFrame is used:
 
 We will write it to a hive-partitioned parquet dataset, partitioned by the columns `a` and `b`:
 
-{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet', 'PartitionedWriteOptions'])}}
+{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet'])}}
 
 ```python exec="on" result="text" session="user-guide/io/hive"
 --8<-- "python/user-guide/io/hive.py:write_parquet_partitioned"
diff --git a/py-polars/docs/source/reference/io.rst b/py-polars/docs/source/reference/io.rst
index babdcef0e1c8..1f088958a3c0 100644
--- a/py-polars/docs/source/reference/io.rst
+++ b/py-polars/docs/source/reference/io.rst
@@ -109,13 +109,6 @@ Parquet
     DataFrame.write_parquet
     LazyFrame.sink_parquet
 
-Partitioned Writing
-~~~~~~~~~~~~~~~~~~~
-.. autosummary::
-   :toctree: api/
-
-   PartitionedWriteOptions
-
 PyArrow Datasets
 ~~~~~~~~~~~~~~~~
 Connect to pyarrow datasets.
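In user-facing terms, the documentation changes above reduce to the following usage. This is a minimal sketch and not part of the patch itself: the DataFrame contents are illustrative, and the `docs/data/hive_write/` output directory is taken from the doc snippet being edited.

```python
import polars as pl

df = pl.DataFrame(
    {
        "a": [1, 1, 2, 2],
        "b": ["x", "y", "x", "y"],
        "value": [1.0, 2.0, 3.0, 4.0],
    }
)

# New API introduced by this patch: pass the output directory and the
# partitioning columns directly to `write_parquet`.
df.write_parquet("docs/data/hive_write/", partition_by=["a", "b"])

# The hive-partitioned dataset can be scanned back from the same directory.
print(pl.scan_parquet("docs/data/hive_write/").collect())
```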
diff --git a/py-polars/polars/__init__.py b/py-polars/polars/__init__.py
index 682a575cadde..5931eb8c6c7d 100644
--- a/py-polars/polars/__init__.py
+++ b/py-polars/polars/__init__.py
@@ -149,7 +149,6 @@
     zeros,
 )
 from polars.io import (
-    PartitionedWriteOptions,
     read_avro,
     read_clipboard,
     read_csv,
@@ -239,7 +238,6 @@
     "Unknown",
     "Utf8",
     # polars.io
-    "PartitionedWriteOptions",
     "read_avro",
     "read_clipboard",
     "read_csv",
diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py
index d2ee9f87d982..355a8b937c4e 100644
--- a/py-polars/polars/dataframe/frame.py
+++ b/py-polars/polars/dataframe/frame.py
@@ -171,7 +171,6 @@
     )
     from polars._utils.various import NoDefault
     from polars.interchange.dataframe import PolarsDataFrame
-    from polars.io import PartitionedWriteOptions
     from polars.ml.torch import PolarsDataset
 
     if sys.version_info >= (3, 10):
@@ -3438,7 +3437,7 @@ def write_ipc_stream(
 
     def write_parquet(
         self,
-        file: str | Path | BytesIO | PartitionedWriteOptions,
+        file: str | Path | BytesIO,
         *,
         compression: ParquetCompression = "zstd",
         compression_level: int | None = None,
@@ -3447,6 +3446,8 @@ def write_parquet(
         data_page_size: int | None = None,
         use_pyarrow: bool = False,
         pyarrow_options: dict[str, Any] | None = None,
+        partition_by: str | Sequence[str] | None = None,
+        partition_chunk_size_bytes: int = 4_294_967_296,
     ) -> None:
         """
         Write to Apache Parquet file.
@@ -3455,8 +3456,7 @@ def write_parquet(
         ----------
         file
             File path or writable file-like object to which the result will be written.
-            This can also accept `PartitionedWriteOptions` to write a partitioned
-            dataset (however note this functionality is unstable).
+            This should be a path to a directory if writing a partitioned dataset.
         compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'}
             Choose "zstd" for good compression performance.
             Choose "lz4" for fast compression/decompression.
@@ -3500,6 +3500,14 @@ def write_parquet(
             using `pyarrow.parquet.write_to_dataset`.
             The `partition_cols` parameter leads to write the dataset
             to a directory. Similar to Spark's partitioned datasets.
+        partition_by
+            Column(s) to partition by. A partitioned dataset will be written if this is
+            specified. This parameter is considered unstable and is subject to change.
+        partition_chunk_size_bytes
+            Approximate size to split DataFrames within a single partition when
+            writing. Note this is calculated using the size of the DataFrame in
+            memory - the size of the output file may differ depending on the
+            file format / compression.
 
         Examples
         --------
@@ -3531,16 +3539,15 @@ def write_parquet(
         if compression is None:
             compression = "uncompressed"
         if isinstance(file, (str, Path)):
-            if pyarrow_options is not None and pyarrow_options.get("partition_cols"):
+            if (
+                partition_by is not None
+                or pyarrow_options is not None
+                and pyarrow_options.get("partition_cols")
+            ):
                 file = normalize_filepath(file, check_not_directory=False)
             else:
                 file = normalize_filepath(file)
 
-        from polars.io import PartitionedWriteOptions
-
-        if isinstance(file, PartitionedWriteOptions):
-            file = file._inner
-
         if use_pyarrow:
             if statistics == "full" or isinstance(statistics, dict):
                 msg = "write_parquet with `use_pyarrow=True` allows only boolean values for `statistics`"
@@ -3602,6 +3609,13 @@ def write_parquet(
                 "null_count": True,
             }
 
+        if partition_by is not None:
+            msg = "The `partition_by` parameter of `write_parquet` is considered unstable."
+            issue_unstable_warning(msg)
+
+            if isinstance(partition_by, str):
+                partition_by = [partition_by]
+
         self._df.write_parquet(
             file,
             compression,
             compression_level,
             statistics,
             row_group_size,
             data_page_size,
+            partition_by=partition_by,
+            partition_chunk_size_bytes=partition_chunk_size_bytes,
         )
 
     def write_database(
diff --git a/py-polars/polars/io/__init__.py b/py-polars/polars/io/__init__.py
index de32a7763f95..9d63cc5a3780 100644
--- a/py-polars/polars/io/__init__.py
+++ b/py-polars/polars/io/__init__.py
@@ -5,7 +5,6 @@
 from polars.io.csv import read_csv, read_csv_batched, scan_csv
 from polars.io.database import read_database, read_database_uri
 from polars.io.delta import read_delta, scan_delta
-from polars.io.hive import PartitionedWriteOptions
 from polars.io.iceberg import scan_iceberg
 from polars.io.ipc import read_ipc, read_ipc_schema, read_ipc_stream, scan_ipc
 from polars.io.json import read_json
@@ -33,7 +32,6 @@
     "read_parquet_schema",
     "scan_csv",
     "scan_delta",
-    "PartitionedWriteOptions",
     "scan_iceberg",
     "scan_ipc",
     "scan_ndjson",
diff --git a/py-polars/polars/io/hive.py b/py-polars/polars/io/hive.py
deleted file mode 100644
index b8d7231bf7b9..000000000000
--- a/py-polars/polars/io/hive.py
+++ /dev/null
@@ -1,59 +0,0 @@
-from __future__ import annotations
-
-from pathlib import Path
-from typing import Sequence
-
-from polars._utils.unstable import unstable
-from polars._utils.various import (
-    is_str_sequence,
-    normalize_filepath,
-)
-
-
-class PartitionedWriteOptions:
-    """
-    Configuration for writing a partitioned dataset.
-
-    This is passed to `write_*` functions that support writing partitioned datasets.
-
-    .. warning::
-        This functionality is considered **unstable**. It may be changed
-        at any point without it being considered a breaking change.
-
-    Parameters
-    ----------
-    path
-        Path to the base directory for the partitioned dataset.
-    partition_by
-        Column(s) to partition by.
-    chunk_size_bytes
-        Approximate size to split DataFrames within a single partition when
-        writing. Note this is calculated using the size of the DataFrame in
-        memory - the size of the output file may differ depending on the
-        file format / compression.
- """ - - @unstable() - def __init__( - self, - path: str | Path, - partition_by: str | Sequence[str], - *, - chunk_size_bytes: int = 4_294_967_296, - ): - if not isinstance(path, (str, Path)): - msg = f"`path` should be of type str or Path, got {type(path).__name__!r}" - raise TypeError(msg) - - path = normalize_filepath(path, check_not_directory=False) - - if isinstance(partition_by, str): - partition_by = [partition_by] - - if not is_str_sequence(partition_by): - msg = f"`partition_by` should be of type str or Collection[str], got {type(path).__name__!r}" - raise TypeError(msg) - - from polars.polars import PartitionedWriteOptions - - self._inner = PartitionedWriteOptions(path, partition_by, chunk_size_bytes) diff --git a/py-polars/src/dataframe/io.rs b/py-polars/src/dataframe/io.rs index ccaa82282a77..e28a07a7d819 100644 --- a/py-polars/src/dataframe/io.rs +++ b/py-polars/src/dataframe/io.rs @@ -378,7 +378,7 @@ impl PyDataFrame { } #[cfg(feature = "parquet")] - #[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size))] + #[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size, partition_by, partition_chunk_size_bytes))] pub fn write_parquet( &mut self, py: Python, @@ -388,14 +388,16 @@ impl PyDataFrame { statistics: Wrap, row_group_size: Option, data_page_size: Option, + partition_by: Option>, + partition_chunk_size_bytes: usize, ) -> PyResult<()> { use polars_io::partition::write_partitioned_dataset; - use crate::io::hive::PartitionedWriteOptions; let compression = parse_parquet_compression(compression, compression_level)?; - if let Ok(part_opts) = py_f.downcast_bound::(py) { - let part_opts = part_opts.get(); + if let Some(partition_by) = partition_by { + let path = py_f.extract::(py)?; + py.allow_threads(|| { let write_options = ParquetWriteOptions { compression, @@ -406,10 +408,10 @@ impl PyDataFrame { }; write_partitioned_dataset( &self.df, - std::path::Path::new(AsRef::::as_ref(&part_opts.path)), - part_opts.partition_by.as_slice(), + std::path::Path::new(path.as_str()), + partition_by.as_slice(), &write_options, - part_opts.chunk_size_bytes, + partition_chunk_size_bytes, ) .map_err(PyPolarsErr::from) })?; diff --git a/py-polars/src/io/hive.rs b/py-polars/src/io/hive.rs deleted file mode 100644 index a835112d2d12..000000000000 --- a/py-polars/src/io/hive.rs +++ /dev/null @@ -1,25 +0,0 @@ -use pyo3::prelude::*; -use pyo3::pybacked::PyBackedStr; - -#[pyclass(frozen)] -pub struct PartitionedWriteOptions { - pub path: PyBackedStr, - pub partition_by: Vec, - pub chunk_size_bytes: usize, -} - -#[pymethods] -impl PartitionedWriteOptions { - #[new] - pub fn __init__( - path: PyBackedStr, - partition_by: Vec, - chunk_size_bytes: usize, - ) -> PyResult { - Ok(Self { - path, - partition_by, - chunk_size_bytes, - }) - } -} diff --git a/py-polars/src/io/mod.rs b/py-polars/src/io/mod.rs deleted file mode 100644 index 5ba92ac1f6c1..000000000000 --- a/py-polars/src/io/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod hive; diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index 99c4e489b90b..4e91dbb63e1f 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -23,7 +23,6 @@ mod file; mod functions; mod gil_once_cell; mod interop; -mod io; mod lazyframe; mod lazygroupby; mod map; @@ -126,7 +125,6 @@ fn polars(py: Python, m: &Bound) -> PyResult<()> { m.add_class::().unwrap(); #[cfg(feature = "sql")] m.add_class::().unwrap(); - m.add_class::().unwrap(); // Submodules // LogicalPlan objects diff --git 
index 77d1b1506c40..7a7217e9cfba 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -672,7 +672,7 @@ def test_projection_only_hive_parts_gives_correct_number_of_rows(
 @pytest.mark.write_disk()
 def test_hive_write(tmp_path: Path, df: pl.DataFrame) -> None:
     root = tmp_path
-    df.write_parquet(pl.PartitionedWriteOptions(root, ["a", "b"]))
+    df.write_parquet(root, partition_by=["a", "b"])
 
     lf = pl.scan_parquet(root)
     assert_frame_equal(lf.collect(), df)
@@ -693,9 +693,7 @@ def test_hive_write_multiple_files(tmp_path: Path) -> None:
     assert n_files > 1, "increase df size or decrease file size"
 
     root = tmp_path
-    df.write_parquet(
-        pl.PartitionedWriteOptions(root, ["a"], chunk_size_bytes=chunk_size)
-    )
+    df.write_parquet(root, partition_by="a", partition_chunk_size_bytes=chunk_size)
 
     assert sum(1 for _ in (root / "a=0").iterdir()) == n_files
     assert_frame_equal(pl.scan_parquet(root).collect(), df)
@@ -723,7 +721,7 @@ def test_hive_write_dates(tmp_path: Path) -> None:
     )
 
     root = tmp_path
-    df.write_parquet(pl.PartitionedWriteOptions(root, ["date1", "date2"]))
+    df.write_parquet(root, partition_by=["date1", "date2"])
 
     lf = pl.scan_parquet(root)
     assert_frame_equal(lf.collect(), df)
@@ -741,7 +739,7 @@ def test_hive_predicate_dates_14712(
 ) -> None:
     monkeypatch.setenv("POLARS_VERBOSE", "1")
     pl.DataFrame({"a": [datetime(2024, 1, 1)]}).write_parquet(
-        pl.PartitionedWriteOptions(tmp_path, ["a"])
+        tmp_path, partition_by="a"
    )
     pl.scan_parquet(tmp_path).filter(pl.col("a") != datetime(2024, 1, 1)).collect()
     assert "hive partitioning: skipped 1 files" in capfd.readouterr().err
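Taken together, the migration for existing callers looks roughly like the following before/after sketch. It is illustrative rather than part of the patch: the output path, column names, and chunk size are made up, and (as the diff above shows) `partition_by` still triggers an unstable-feature warning.

```python
import polars as pl

df = pl.DataFrame({"a": [1, 1, 2], "b": ["x", "y", "x"], "v": [1.0, 2.0, 3.0]})

# Before this patch (API removed by this change):
# df.write_parquet(
#     pl.PartitionedWriteOptions("out/", ["a", "b"], chunk_size_bytes=512_000_000)
# )

# After this patch: plain keyword arguments on `write_parquet`.
df.write_parquet(
    "out/",
    partition_by=["a", "b"],
    partition_chunk_size_bytes=512_000_000,
)

# Reading the partitioned dataset back is unchanged.
print(pl.scan_parquet("out/").collect())
```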