diff --git a/doc/source/data/loading-data.rst b/doc/source/data/loading-data.rst
index b390e6e6be35..aaada9f8e0c3 100644
--- a/doc/source/data/loading-data.rst
+++ b/doc/source/data/loading-data.rst
@@ -45,11 +45,10 @@ To view the full list of supported file formats, see the
 
         .. tip::
 
-            When reading parquet files, you can take advantage of column and row pruning
-            to efficiently filter columns and rows at the file scan level. See
-            :ref:`Parquet column pruning <parquet_column_pruning>` and
-            :ref:`Parquet row pruning <parquet_row_pruning>` for more details
-            on the projection and filter pushdown features.
+            When reading parquet files, you can take advantage of column pruning to
+            efficiently filter columns at the file scan level. See
+            :ref:`Parquet column pruning <parquet_column_pruning>` for more details
+            on the projection pushdown feature.
 
     .. tab-item:: Images
 
diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst
index 03b5f4100894..b496e4fbdc77 100644
--- a/doc/source/data/performance-tips.rst
+++ b/doc/source/data/performance-tips.rst
@@ -215,29 +215,6 @@ calling :func:`~ray.data.Dataset.select_columns`, since column selection is push
 
     Dataset(num_rows=150, schema={sepal.length: double, variety: string})
 
-.. _parquet_row_pruning:
-
-Parquet row pruning (filter pushdown)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Similar to Parquet column pruning, you can pass in a filter to :func:`ray.data.read_parquet` (filter pushdown)
-which is applied at the file scan so only rows that match the filter predicate
-are returned. This can be used in conjunction with column pruning when appropriate to get the benefits of both.
-
-.. testcode::
-
-    import ray
-    # Only read rows with `sepal.length` greater than 5.0.
-    # The row count will be less than the total number of rows (150) in the full dataset.
-    ray.data.read_parquet(
-        "s3://anonymous@ray-example-data/iris.parquet",
-        filter=pyarrow.dataset.field("sepal.length") > 5.0,
-    ).count()
-
-.. testoutput::
-
-    118
-
 .. _data_memory:
 
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index c2aabb51b785..331119fff4f0 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -1185,30 +1185,23 @@ def filter(
            :ref:`Stateful Transforms <stateful_transforms>`.
 
         .. tip::
-            If you can represent your predicate with NumPy or pandas operations,
-            :meth:`Dataset.map_batches` might be faster. You can implement filter by
-            dropping rows.
-
-        .. tip::
-            If you're reading parquet files with :meth:`ray.data.read_parquet`,
-            and the filter is a simple predicate, you might
-            be able to speed it up by using filter pushdown; see
-            :ref:`Parquet row pruning <parquet_row_pruning>` for details.
+            If you use the ``expr`` parameter with a Python expression string, Ray Data
+            optimizes your filter with native Arrow interfaces.
 
         Examples:
 
            >>> import ray
            >>> ds = ray.data.range(100)
-            >>> ds.filter(lambda row: row["id"] % 2 == 0).take_all()
-            [{'id': 0}, {'id': 2}, {'id': 4}, ...]
+            >>> ds.filter(expr="id <= 4").take_all()
+            [{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}]
 
         Time complexity: O(dataset size / parallelism)
 
         Args:
            fn: The predicate to apply to each row, or a class type
                that can be instantiated to create such a callable.
-            expr: An expression string that will be
-                converted to pyarrow.dataset.Expression type.
+            expr: An expression string, which must be a valid Python expression;
+                it is converted to a ``pyarrow.dataset.Expression``.
            compute: This argument is deprecated. Use ``concurrency`` argument.
            concurrency: The number of Ray workers to use concurrently.
                For a fixed-sized worker pool of size ``n``, specify ``concurrency=n``.
@@ -1242,6 +1235,10 @@ def filter(
            compute = TaskPoolStrategy(size=concurrency)
        else:
+            warnings.warn(
+                "Use 'expr' instead of 'fn' when possible for performant filters."
+            )
+
            if callable(fn):
                compute = get_compute_strategy(
                    fn=fn,
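
Not part of the patch: a minimal sketch of the two filter paths this diff documents, assuming a local Ray installation and the ``expr``/``fn`` behavior exactly as described in the docstring above:

    import ray

    ds = ray.data.range(100)

    # Expression string: converted to a pyarrow.dataset.Expression, so the
    # predicate can be evaluated through native Arrow interfaces instead of
    # calling a Python UDF on every row.
    by_expr = ds.filter(expr="id <= 4").take_all()

    # Callable predicate: handles arbitrary Python logic, but runs per row
    # and can't be pushed into Arrow; this is the path the new warning
    # nudges users away from when a simple expression would do.
    by_fn = ds.filter(lambda row: row["id"] <= 4).take_all()

    print(by_expr)  # [{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}]
    print(by_fn)    # same rows as above

Both calls return the same rows; the ``expr`` form is the one the added ``warnings.warn`` steers users toward, since it can be optimized at the Arrow level rather than executed as a per-row Python callable.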