Commit e0123c3

nameexhaustion committed Jul 12, 2024
1 parent 21114f8 commit e0123c3
Showing 13 changed files with 41 additions and 124 deletions.
1 change: 0 additions & 1 deletion docs/_build/API_REFERENCE_LINKS.yml
@@ -83,7 +83,6 @@ python:
lazy: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.lazy.html
explain: https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.explain.html
fetch: https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.fetch.html
PartitionedWriteOptions: https://docs.pola.rs/api/python/stable/reference/api/polars.PartitionedWriteOptions.html
SQLContext: https://docs.pola.rs/api/python/stable/reference/sql
SQLregister:
name: register
2 changes: 1 addition & 1 deletion docs/src/python/user-guide/io/hive.py
@@ -124,7 +124,7 @@ def dir_recurse(path: Path):
# --8<-- [end:write_parquet_partitioned_show_data]

# --8<-- [start:write_parquet_partitioned]
df.write_parquet(pl.PartitionedWriteOptions("docs/data/hive_write/", ["a", "b"]))
df.write_parquet("docs/data/hive_write/", partition_by=["a", "b"])
# --8<-- [end:write_parquet_partitioned]

# --8<-- [start:write_parquet_partitioned_show_paths]
2 changes: 1 addition & 1 deletion docs/user-guide/io/hive.md
@@ -88,7 +88,7 @@ For this example the following DataFrame is used:

We will write it to a hive-partitioned parquet dataset, partitioned by the columns `a` and `b`:

{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet', 'PartitionedWriteOptions'])}}
{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet'])}}

```python exec="on" result="text" session="user-guide/io/hive"
--8<-- "python/user-guide/io/hive.py:write_parquet_partitioned"
```
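
For reference, a minimal end-to-end sketch of the API introduced by this commit, mirroring the updated `write_parquet_partitioned` snippet above (the example data is illustrative):

```python
import polars as pl

# Illustrative data; any DataFrame containing the partition columns works.
df = pl.DataFrame(
    {
        "a": [1, 1, 2, 2],
        "b": ["x", "y", "x", "y"],
        "value": [1.0, 2.0, 3.0, 4.0],
    }
)

# Replaces the removed `pl.PartitionedWriteOptions("docs/data/hive_write/", ["a", "b"])`
# wrapper: the target is now a plain directory path plus a `partition_by` keyword.
# The keyword is flagged as unstable in this commit, so the call emits a warning.
df.write_parquet("docs/data/hive_write/", partition_by=["a", "b"])

# Reading the dataset back recovers the partition columns from the hive paths
# (directories of the form a=1/b=x/...).
print(pl.scan_parquet("docs/data/hive_write/").collect())
```

The same round trip is asserted by `test_hive_write` in the test changes further down.
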
7 changes: 0 additions & 7 deletions py-polars/docs/source/reference/io.rst
@@ -109,13 +109,6 @@ Parquet
DataFrame.write_parquet
LazyFrame.sink_parquet

Partitioned Writing
~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/

PartitionedWriteOptions

PyArrow Datasets
~~~~~~~~~~~~~~~~
Connect to pyarrow datasets.
2 changes: 0 additions & 2 deletions py-polars/polars/__init__.py
@@ -149,7 +149,6 @@
zeros,
)
from polars.io import (
PartitionedWriteOptions,
read_avro,
read_clipboard,
read_csv,
@@ -239,7 +238,6 @@
"Unknown",
"Utf8",
# polars.io
"PartitionedWriteOptions",
"read_avro",
"read_clipboard",
"read_csv",
36 changes: 26 additions & 10 deletions py-polars/polars/dataframe/frame.py
@@ -171,7 +171,6 @@
)
from polars._utils.various import NoDefault
from polars.interchange.dataframe import PolarsDataFrame
from polars.io import PartitionedWriteOptions
from polars.ml.torch import PolarsDataset

if sys.version_info >= (3, 10):
@@ -3438,7 +3437,7 @@ def write_ipc_stream(

def write_parquet(
self,
file: str | Path | BytesIO | PartitionedWriteOptions,
file: str | Path | BytesIO,
*,
compression: ParquetCompression = "zstd",
compression_level: int | None = None,
@@ -3447,6 +3446,8 @@ def write_parquet(
data_page_size: int | None = None,
use_pyarrow: bool = False,
pyarrow_options: dict[str, Any] | None = None,
partition_by: str | Sequence[str] | None = None,
partition_chunk_size_bytes: int = 4_294_967_296,
) -> None:
"""
Write to Apache Parquet file.
@@ -3455,8 +3456,7 @@
----------
file
File path or writable file-like object to which the result will be written.
This can also accept `PartitionedWriteOptions` to write a partitioned
dataset (however note this functionality is unstable).
This should be a path to a directory if writing a partitioned dataset.
compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'}
Choose "zstd" for good compression performance.
Choose "lz4" for fast compression/decompression.
@@ -3500,6 +3500,14 @@
using `pyarrow.parquet.write_to_dataset`.
The `partition_cols` parameter leads to write the dataset to a directory.
Similar to Spark's partitioned datasets.
partition_by
Column(s) to partition by. A partitioned dataset will be written if this is
specified. This parameter is considered unstable and is subject to change.
partition_chunk_size_bytes
Approximate size to split DataFrames within a single partition when
writing. Note this is calculated using the size of the DataFrame in
memory - the size of the output file may differ depending on the
file format / compression.
Examples
--------
@@ -3531,16 +3539,15 @@
if compression is None:
compression = "uncompressed"
if isinstance(file, (str, Path)):
if pyarrow_options is not None and pyarrow_options.get("partition_cols"):
if (
partition_by is not None
or pyarrow_options is not None
and pyarrow_options.get("partition_cols")
):
file = normalize_filepath(file, check_not_directory=False)
else:
file = normalize_filepath(file)

from polars.io import PartitionedWriteOptions

if isinstance(file, PartitionedWriteOptions):
file = file._inner

if use_pyarrow:
if statistics == "full" or isinstance(statistics, dict):
msg = "write_parquet with `use_pyarrow=True` allows only boolean values for `statistics`"
@@ -3602,13 +3609,22 @@
"null_count": True,
}

if partition_by is not None:
msg = "The `partition_by` parameter of `write_parquet` is considered unstable."
issue_unstable_warning(msg)

if isinstance(partition_by, str):
partition_by = [partition_by]

self._df.write_parquet(
file,
compression,
compression_level,
statistics,
row_group_size,
data_page_size,
partition_by=partition_by,
partition_chunk_size_bytes=partition_chunk_size_bytes,
)

def write_database(
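
A short usage sketch of the two new keywords documented above (not part of the commit; the data, temporary path, and chunk-size value are illustrative):

```python
import tempfile
from pathlib import Path

import polars as pl

# Illustrative data; any DataFrame containing the partition column works.
df = pl.DataFrame({"a": [0, 0, 1, 1], "b": [1, 2, 3, 4]})

root = Path(tempfile.mkdtemp())

# `partition_by` accepts a single column name (normalized to a list internally)
# or a sequence of names; it is keyword-only and, being unstable, emits a warning.
# `partition_chunk_size_bytes` (default 4_294_967_296, ~4 GiB of in-memory data)
# caps the approximate chunk size within a partition; the small value here is
# illustrative, and this tiny frame still yields a single file per partition.
df.write_parquet(root, partition_by="a", partition_chunk_size_bytes=1_048_576)

# One subdirectory per key value is created, e.g. root/a=0/ and root/a=1/.
print(sorted(str(p.relative_to(root)) for p in root.rglob("*") if p.is_file()))

# The partition column is recovered from the paths when scanning back.
print(pl.scan_parquet(root).collect())
```

Splitting a large partition across multiple files according to `partition_chunk_size_bytes` is exercised by the updated `test_hive_write_multiple_files` below.
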
2 changes: 0 additions & 2 deletions py-polars/polars/io/__init__.py
@@ -5,7 +5,6 @@
from polars.io.csv import read_csv, read_csv_batched, scan_csv
from polars.io.database import read_database, read_database_uri
from polars.io.delta import read_delta, scan_delta
from polars.io.hive import PartitionedWriteOptions
from polars.io.iceberg import scan_iceberg
from polars.io.ipc import read_ipc, read_ipc_schema, read_ipc_stream, scan_ipc
from polars.io.json import read_json
@@ -33,7 +32,6 @@
"read_parquet_schema",
"scan_csv",
"scan_delta",
"PartitionedWriteOptions",
"scan_iceberg",
"scan_ipc",
"scan_ndjson",
59 changes: 0 additions & 59 deletions py-polars/polars/io/hive.py

This file was deleted.

16 changes: 9 additions & 7 deletions py-polars/src/dataframe/io.rs
@@ -378,7 +378,7 @@ impl PyDataFrame {
}

#[cfg(feature = "parquet")]
#[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size))]
#[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size, partition_by, partition_chunk_size_bytes))]
pub fn write_parquet(
&mut self,
py: Python,
@@ -388,14 +388,16 @@
statistics: Wrap<StatisticsOptions>,
row_group_size: Option<usize>,
data_page_size: Option<usize>,
partition_by: Option<Vec<String>>,
partition_chunk_size_bytes: usize,
) -> PyResult<()> {
use polars_io::partition::write_partitioned_dataset;

use crate::io::hive::PartitionedWriteOptions;
let compression = parse_parquet_compression(compression, compression_level)?;

if let Ok(part_opts) = py_f.downcast_bound::<PartitionedWriteOptions>(py) {
let part_opts = part_opts.get();
if let Some(partition_by) = partition_by {
let path = py_f.extract::<String>(py)?;

py.allow_threads(|| {
let write_options = ParquetWriteOptions {
compression,
@@ -406,10 +408,10 @@
};
write_partitioned_dataset(
&self.df,
std::path::Path::new(AsRef::<str>::as_ref(&part_opts.path)),
part_opts.partition_by.as_slice(),
std::path::Path::new(path.as_str()),
partition_by.as_slice(),
&write_options,
part_opts.chunk_size_bytes,
partition_chunk_size_bytes,
)
.map_err(PyPolarsErr::from)
})?;
25 changes: 0 additions & 25 deletions py-polars/src/io/hive.rs

This file was deleted.

1 change: 0 additions & 1 deletion py-polars/src/io/mod.rs

This file was deleted.

2 changes: 0 additions & 2 deletions py-polars/src/lib.rs
@@ -23,7 +23,6 @@ mod file;
mod functions;
mod gil_once_cell;
mod interop;
mod io;
mod lazyframe;
mod lazygroupby;
mod map;
@@ -126,7 +125,6 @@ fn polars(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<PyBatchedCsv>().unwrap();
#[cfg(feature = "sql")]
m.add_class::<PySQLContext>().unwrap();
m.add_class::<io::hive::PartitionedWriteOptions>().unwrap();

// Submodules
// LogicalPlan objects
10 changes: 4 additions & 6 deletions py-polars/tests/unit/io/test_hive.py
@@ -672,7 +672,7 @@ def test_projection_only_hive_parts_gives_correct_number_of_rows(
@pytest.mark.write_disk()
def test_hive_write(tmp_path: Path, df: pl.DataFrame) -> None:
root = tmp_path
df.write_parquet(pl.PartitionedWriteOptions(root, ["a", "b"]))
df.write_parquet(root, partition_by=["a", "b"])

lf = pl.scan_parquet(root)
assert_frame_equal(lf.collect(), df)
@@ -693,9 +693,7 @@ def test_hive_write_multiple_files(tmp_path: Path) -> None:
assert n_files > 1, "increase df size or decrease file size"

root = tmp_path
df.write_parquet(
pl.PartitionedWriteOptions(root, ["a"], chunk_size_bytes=chunk_size)
)
df.write_parquet(root, partition_by="a", partition_chunk_size_bytes=chunk_size)

assert sum(1 for _ in (root / "a=0").iterdir()) == n_files
assert_frame_equal(pl.scan_parquet(root).collect(), df)
@@ -723,7 +721,7 @@ def test_hive_write_dates(tmp_path: Path) -> None:
)

root = tmp_path
df.write_parquet(pl.PartitionedWriteOptions(root, ["date1", "date2"]))
df.write_parquet(root, partition_by=["date1", "date2"])

lf = pl.scan_parquet(root)
assert_frame_equal(lf.collect(), df)
@@ -741,7 +739,7 @@ def test_hive_predicate_dates_14712(
) -> None:
monkeypatch.setenv("POLARS_VERBOSE", "1")
pl.DataFrame({"a": [datetime(2024, 1, 1)]}).write_parquet(
pl.PartitionedWriteOptions(tmp_path, ["a"])
tmp_path, partition_by="a"
)
pl.scan_parquet(tmp_path).filter(pl.col("a") != datetime(2024, 1, 1)).collect()
assert "hive partitioning: skipped 1 files" in capfd.readouterr().err
