
Update filter documentation for expressions #49309

Open · wants to merge 12 commits into master
15 changes: 13 additions & 2 deletions python/ray/data/dataset.py
@@ -1180,6 +1180,11 @@ def filter(
stateful Ray actors. For more information, see
:ref:`Stateful Transforms <stateful_transforms>`.

.. tip::
If you can represent your filter as an expression that uses the Arrow
Dataset Expression API, Ray Data performs highly optimized filtering
through native Arrow interfaces.
Member:

Let's remove the two tips below?

If you can represent your predicate with NumPy or pandas operations,
:meth:`Dataset.map_batches` might be faster. You can implement filter by
dropping rows.

If you're reading parquet files with :meth:`ray.data.read_parquet`,
and the filter is a simple predicate, you might
be able to speed it up by using filter pushdown; see
:ref:`Parquet row pruning <parquet_row_pruning>` for details.
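
For context on the two tips under discussion, here is a minimal sketch of both patterns. The `"example.parquet"` path is a placeholder, and the `filter=` argument to `read_parquet` is assumed based on the linked row-pruning guide rather than taken from this diff.

```python
# Sketch of the two patterns from the quoted tips. "example.parquet" is a
# placeholder path; filter= follows the Parquet row pruning guide.
import pyarrow.dataset as pads
import ray

# 1. Implement a filter by dropping rows in a vectorized map_batches call:
ds = ray.data.range(100)
evens = ds.map_batches(
    lambda batch: batch[batch["id"] % 2 == 0],
    batch_format="pandas",
)

# 2. Push a simple predicate down into the Parquet read, so pruned rows are
#    never materialized:
pruned = ray.data.read_parquet("example.parquet", filter=pads.field("id") <= 4)
```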

Member:

And for "Parquet row pruning", let's remove the corresponding section from the performance tips user guide?

Contributor Author:

That seemed useful to me when I saw the tip and it's valid even now. Let me remove it and upload here. Will help with discussion if it's still valid.

.. tip::
If you can represent your predicate with NumPy or pandas operations,
:meth:`Dataset.map_batches` might be faster. You can implement filter by
@@ -1197,14 +1202,16 @@ def filter(
>>> ds = ray.data.range(100)
>>> ds.filter(lambda row: row["id"] % 2 == 0).take_all()
[{'id': 0}, {'id': 2}, {'id': 4}, ...]
Member:

Maybe remove this since we don't want people using the fn parameter?

Contributor Author:

Given that expr is limited, I thought we could retain it. Let me remove this one.

>>> ds.filter(expr="id <= 4").take_all()
[{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}]

Time complexity: O(dataset size / parallelism)
Contributor:

I think it's worth showing both the UDF-based and expr-based examples and clearly calling out that the expr-based one has clear performance advantages (skipping deserialization, etc.).
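
A hedged sketch of the side-by-side example being requested, reusing the values already shown in this diff:

```python
# Sketch of the requested comparison; both calls reuse the docstring's example data.
import ray

ds = ray.data.range(100)

# UDF-based filter: the predicate is a Python callable applied row by row.
ds.filter(lambda row: row["id"] <= 4).take_all()

# Expression-based filter: Arrow evaluates the predicate natively, skipping
# per-row deserialization into Python, which is where the speedup comes from.
ds.filter(expr="id <= 4").take_all()

# Both return [{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}].
```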


Args:
fn: The predicate to apply to each row, or a class type
that can be instantiated to create such a callable.
expr: An expression string that will be
converted to pyarrow.dataset.Expression type.
expr: An expression string that must be a valid Python expression; it
will be converted to the pyarrow.dataset.Expression type.
compute: This argument is deprecated. Use ``concurrency`` argument.
concurrency: The number of Ray workers to use concurrently. For a
fixed-sized worker pool of size ``n``, specify ``concurrency=n``.
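
A short usage sketch tying the documented arguments together. The callable class and the specific `concurrency` value are illustrative assumptions for this sketch, not part of the diff; the expr-plus-concurrency combination is inferred from the concurrency handling shown below.

```python
# Illustrative usage of the documented arguments; KeepEven and concurrency=2
# are assumptions made for the sketch.
import ray

ds = ray.data.range(100)

# Expression-based predicate with a fixed-size worker pool of 2:
ds.filter(expr="id <= 4", concurrency=2).take_all()


# fn given as a class type that is instantiated to create the predicate:
class KeepEven:
    def __call__(self, row):
        return row["id"] % 2 == 0


ds.filter(KeepEven, concurrency=2).take_all()
```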
@@ -1237,6 +1244,10 @@ def filter(

compute = TaskPoolStrategy(size=concurrency)
else:
warnings.warn(
"Use expr instead of fn when possible for performant filters."
)

if callable(fn):
compute = get_compute_strategy(
fn=fn,