From bd7057820ee9af9f269fa01778ed2e03db355691 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 15 Aug 2024 14:01:17 -0700 Subject: [PATCH 1/5] add ToCudfBackend --- python/dask_cudf/dask_cudf/backends.py | 14 +++++------ python/dask_cudf/dask_cudf/expr/_expr.py | 24 ++++++++++++++++++- python/dask_cudf/dask_cudf/tests/test_core.py | 16 ++++++++++++- python/dask_cudf/dask_cudf/tests/utils.py | 4 ++++ 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 01bab30190a..cde9f6ad260 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -631,20 +631,20 @@ class CudfDXBackendEntrypoint(DataFrameBackendEntrypoint): Examples -------- >>> import dask - >>> import dask_expr + >>> import dask_expr as dx >>> with dask.config.set({"dataframe.backend": "cudf"}): ... ddf = dx.from_dict({"a": range(10)}) >>> type(ddf._meta) """ - @classmethod - def to_backend_dispatch(cls): - return CudfBackendEntrypoint.to_backend_dispatch() + @staticmethod + def to_backend(data, **kwargs): + import dask_expr as dx - @classmethod - def to_backend(cls, *args, **kwargs): - return CudfBackendEntrypoint.to_backend(*args, **kwargs) + from dask_cudf.expr._expr import ToCudfBackend + + return dx.new_collection(ToCudfBackend(data, kwargs)) @staticmethod def from_dict( diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index 8fccaccb695..b1b99dd7d53 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -4,12 +4,34 @@ import dask_expr._shuffle as _shuffle_module from dask_expr import new_collection from dask_expr._cumulative import CumulativeBlockwise -from dask_expr._expr import Expr, VarColumns +from dask_expr._expr import Expr, ToBackend, VarColumns from dask_expr._reductions import Reduction, Var from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty from dask.dataframe.dispatch import is_categorical_dtype +import cudf + +## +## Custom expressions +## + + +class ToCudfBackend(ToBackend): + @staticmethod + def operation(df, options): + from dask_cudf.backends import to_cudf_dispatch + + return to_cudf_dispatch(df, **options) + + def _simplify_down(self): + if isinstance( + self.frame._meta, (cudf.DataFrame, cudf.Series, cudf.Index) + ): + # We already have cudf data + return self.frame + + ## ## Custom expression patching ## diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index 174923c2c7e..9b56d6b2ed2 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -15,7 +15,11 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import skip_dask_expr, xfail_dask_expr +from dask_cudf.tests.utils import ( + require_dask_expr, + skip_dask_expr, + xfail_dask_expr, +) def test_from_dict_backend_dispatch(): @@ -993,3 +997,13 @@ def test_series_isin_error(): ser.isin([1, 5, "a"]) with pytest.raises(TypeError): ddf.isin([1, 5, "a"]).compute() + + +@require_dask_expr() +def test_to_backend_simplify(): + # Check that column projection is not blocked by to_backend + with dask.config.set({"dataframe.backend": "pandas"}): + df = dd.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]}, npartitions=2) + df2 = df.to_backend("cudf")[["y"]].simplify() + df3 = df[["y"]].to_backend("cudf").simplify() + assert df2._name == df3._name diff --git a/python/dask_cudf/dask_cudf/tests/utils.py b/python/dask_cudf/dask_cudf/tests/utils.py index c7dedbb6b4a..cc0c6899804 100644 --- a/python/dask_cudf/dask_cudf/tests/utils.py +++ b/python/dask_cudf/dask_cudf/tests/utils.py @@ -48,3 +48,7 @@ def xfail_dask_expr(reason=_default_reason, lt_version=None): else: xfail = QUERY_PLANNING_ON return pytest.mark.xfail(xfail, reason=reason) + + +def require_dask_expr(reason="requires dask-expr"): + return pytest.mark.skipif(not QUERY_PLANNING_ON, reason=reason) From 2cc9fc2fd0a295b04d5110fdec5c724130c6f545 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 15 Aug 2024 14:03:08 -0700 Subject: [PATCH 2/5] test stacking --- python/dask_cudf/dask_cudf/tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index 9b56d6b2ed2..905d8c08135 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -1005,5 +1005,5 @@ def test_to_backend_simplify(): with dask.config.set({"dataframe.backend": "pandas"}): df = dd.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]}, npartitions=2) df2 = df.to_backend("cudf")[["y"]].simplify() - df3 = df[["y"]].to_backend("cudf").simplify() + df3 = df[["y"]].to_backend("cudf").to_backend("cudf").simplify() assert df2._name == df3._name From 0321d26529c1646098129ac55f92f96dfa04cb8a Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 15 Aug 2024 14:11:16 -0700 Subject: [PATCH 3/5] add missing to_cudf_dispatch_from_cudf --- python/dask_cudf/dask_cudf/backends.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index cde9f6ad260..a521c26db5e 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -526,6 +526,11 @@ def to_cudf_dispatch_from_pandas(data, nan_as_null=None, **kwargs): return cudf.from_pandas(data, nan_as_null=nan_as_null) +@to_cudf_dispatch.register((cudf.DataFrame, cudf.Series, cudf.Index)) +def to_cudf_dispatch_from_cudf(data, **kwargs): + return data + + # Define "cudf" backend engine to be registered with Dask class CudfBackendEntrypoint(DataFrameBackendEntrypoint): """Backend-entrypoint class for Dask-DataFrame From f31a8b00f6bf7a54194fc1600565d8d964870667 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 15 Aug 2024 14:23:35 -0700 Subject: [PATCH 4/5] backward compat --- python/dask_cudf/dask_cudf/expr/_expr.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index b1b99dd7d53..8a2c50d3fe7 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -4,7 +4,7 @@ import dask_expr._shuffle as _shuffle_module from dask_expr import new_collection from dask_expr._cumulative import CumulativeBlockwise -from dask_expr._expr import Expr, ToBackend, VarColumns +from dask_expr._expr import Elemwise, Expr, VarColumns from dask_expr._reductions import Reduction, Var from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty @@ -17,7 +17,14 @@ ## -class ToCudfBackend(ToBackend): +class ToCudfBackend(Elemwise): + # TODO: Inherit from ToBackend when rapids-dask-dependency + # is pinned to dask>=2024.8.1 + _parameters = ["frame", "options"] + _projection_passthrough = True + _filter_passthrough = True + _preserves_partitioning_information = True + @staticmethod def operation(df, options): from dask_cudf.backends import to_cudf_dispatch From 732927957f99feb66f1848d644306f19900d26ea Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 15 Aug 2024 15:09:25 -0700 Subject: [PATCH 5/5] fix unsupport kwarg failure --- python/dask_cudf/dask_cudf/backends.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 8d6b05a2cf0..79bef03761e 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -520,6 +520,7 @@ def to_cudf_dispatch_from_pandas(data, nan_as_null=None, **kwargs): @to_cudf_dispatch.register((cudf.DataFrame, cudf.Series, cudf.Index)) def to_cudf_dispatch_from_cudf(data, **kwargs): + _unsupported_kwargs("cudf", "cudf", kwargs) return data