Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update filter documentation for expressions #49309

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
9 changes: 4 additions & 5 deletions doc/source/data/loading-data.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ To view the full list of supported file formats, see the

.. tip::

When reading parquet files, you can take advantage of column and row pruning
to efficiently filter columns and rows at the file scan level. See
:ref:`Parquet column pruning <parquet_column_pruning>` and
:ref:`Parquet row pruning <parquet_row_pruning>` for more details
on the projection and filter pushdown features.
When reading parquet files, you can take advantage of column pruning to
efficiently filter columns at the file scan level. See
:ref:`Parquet column pruning <parquet_column_pruning>` for more details
on the projection pushdown feature.

.. tab-item:: Images

Expand Down
23 changes: 0 additions & 23 deletions doc/source/data/performance-tips.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,29 +215,6 @@ calling :func:`~ray.data.Dataset.select_columns`, since column selection is push

Dataset(num_rows=150, schema={sepal.length: double, variety: string})

.. _parquet_row_pruning:

Parquet row pruning (filter pushdown)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Similar to Parquet column pruning, you can pass in a filter to :func:`ray.data.read_parquet` (filter pushdown)
which is applied at the file scan so only rows that match the filter predicate
are returned. This can be used in conjunction with column pruning when appropriate to get the benefits of both.

.. testcode::

import ray
# Only read rows with `sepal.length` greater than 5.0.
# The row count will be less than the total number of rows (150) in the full dataset.
ray.data.read_parquet(
"s3://anonymous@ray-example-data/iris.parquet",
filter=pyarrow.dataset.field("sepal.length") > 5.0,
).count()

.. testoutput::

118


.. _data_memory:

Expand Down
23 changes: 10 additions & 13 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1185,30 +1185,23 @@ def filter(
:ref:`Stateful Transforms <stateful_transforms>`.

.. tip::
If you can represent your predicate with NumPy or pandas operations,
:meth:`Dataset.map_batches` might be faster. You can implement filter by
dropping rows.

.. tip::
If you're reading parquet files with :meth:`ray.data.read_parquet`,
and the filter is a simple predicate, you might
be able to speed it up by using filter pushdown; see
:ref:`Parquet row pruning <parquet_row_pruning>` for details.
If you use the `expr` parameter with a Python expression string, Ray Data
optimizes your filter with native Arrow interfaces.

Examples:

>>> import ray
>>> ds = ray.data.range(100)
>>> ds.filter(lambda row: row["id"] % 2 == 0).take_all()
[{'id': 0}, {'id': 2}, {'id': 4}, ...]
>>> ds.filter(expr="id <= 4").take_all()
[{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}]

Time complexity: O(dataset size / parallelism)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's worth showing both UDF and expr based one and clearly call out that expr based one has very clear performance advantages (of skipping deserialization, etc)


Args:
fn: The predicate to apply to each row, or a class type
that can be instantiated to create such a callable.
expr: An expression string that will be
converted to pyarrow.dataset.Expression type.
expr: An expression string needs to be a valid Python expression that
will be converted to ``pyarrow.dataset.Expression`` type.
compute: This argument is deprecated. Use ``concurrency`` argument.
concurrency: The number of Ray workers to use concurrently. For a
fixed-sized worker pool of size ``n``, specify ``concurrency=n``.
Expand Down Expand Up @@ -1242,6 +1235,10 @@ def filter(

compute = TaskPoolStrategy(size=concurrency)
else:
warnings.warn(
"Use 'expr' instead of 'fn' when possible for performant filters."
)

if callable(fn):
compute = get_compute_strategy(
fn=fn,
Expand Down
Loading