Skip to content

Commit

Permalink
Allow explicit shuffle="p2p" within dask-cudf API (#13893)
Browse files Browse the repository at this point in the history
This PR allows explicit `shuffle="p2p"` usage within the dask-cudf API now that dask/distributed#7743 is in.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Ray Douglass (https://github.com/raydouglass)
  - gpuCI (https://github.com/GPUtester)
  - Mike Wendt (https://github.com/mike-wendt)
  - AJ Schmidt (https://github.com/ajschmidt8)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #13893
  • Loading branch information
rjzamora authored Sep 25, 2023
1 parent 036c07d commit ddd2b0d
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 14 deletions.
31 changes: 27 additions & 4 deletions python/dask_cudf/dask_cudf/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,22 +373,37 @@ def percentile_cudf(a, q, interpolation="linear"):


@pyarrow_schema_dispatch.register((cudf.DataFrame,))
def _get_pyarrow_schema_cudf(obj, preserve_index=True, **kwargs):
def _get_pyarrow_schema_cudf(obj, preserve_index=None, **kwargs):
if kwargs:
warnings.warn(
"Ignoring the following arguments to "
f"`pyarrow_schema_dispatch`: {list(kwargs)}"
)
return meta_nonempty(obj).to_arrow(preserve_index=preserve_index).schema

return _cudf_to_table(
meta_nonempty(obj), preserve_index=preserve_index
).schema


@to_pyarrow_table_dispatch.register(cudf.DataFrame)
def _cudf_to_table(obj, preserve_index=True, **kwargs):
def _cudf_to_table(obj, preserve_index=None, **kwargs):
if kwargs:
warnings.warn(
"Ignoring the following arguments to "
f"`to_pyarrow_table_dispatch`: {list(kwargs)}"
)

# TODO: Remove this logic when cudf#14159 is resolved
# (see: https://github.com/rapidsai/cudf/issues/14159)
if preserve_index and isinstance(obj.index, cudf.RangeIndex):
obj = obj.copy()
obj.index.name = (
obj.index.name
if obj.index.name is not None
else "__index_level_0__"
)
obj.index = obj.index._as_int_index()

return obj.to_arrow(preserve_index=preserve_index)


Expand All @@ -401,7 +416,15 @@ def _table_to_cudf(obj, table, self_destruct=None, **kwargs):
f"Ignoring the following arguments to "
f"`from_pyarrow_table_dispatch`: {list(kwargs)}"
)
return obj.from_arrow(table)
result = obj.from_arrow(table)

# TODO: Remove this logic when cudf#14159 is resolved
# (see: https://github.com/rapidsai/cudf/issues/14159)
if "__index_level_0__" in result.index.names:
assert len(result.index.names) == 1
result.index.name = None

return result


@union_categoricals_dispatch.register((cudf.Series, cudf.BaseIndex))
Expand Down
26 changes: 19 additions & 7 deletions python/dask_cudf/dask_cudf/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import numpy as np
import tlz as toolz

import dask
from dask import config
from dask.base import tokenize
from dask.dataframe import methods
from dask.dataframe.core import DataFrame, Index, Series
Expand All @@ -18,6 +18,8 @@
from cudf.api.types import is_categorical_dtype
from cudf.utils.utils import _dask_cudf_nvtx_annotate

_SHUFFLE_SUPPORT = ("tasks", "p2p") # "disk" not supported


@_dask_cudf_nvtx_annotate
def set_index_post(df, index_name, drop, column_dtype):
Expand Down Expand Up @@ -307,15 +309,25 @@ def sort_values(
return df4


def get_default_shuffle_method():
# Note that `dask.utils.get_default_shuffle_method`
# will return "p2p" by default when a distributed
# client is present. Dask-cudf supports "p2p", but
# will not use it by default (yet)
default = config.get("dataframe.shuffle.method", "tasks")
if default not in _SHUFFLE_SUPPORT:
default = "tasks"
return default


def _get_shuffle_type(shuffle):
# Utility to set the shuffle-kwarg default
# and to validate user-specified options.
# The only supported options is currently "tasks"
shuffle = shuffle or dask.config.get("shuffle", "tasks")
if shuffle != "tasks":
# and to validate user-specified options
shuffle = shuffle or get_default_shuffle_method()
if shuffle not in _SHUFFLE_SUPPORT:
raise ValueError(
f"Dask-cudf only supports in-memory shuffling with "
f"'tasks'. Got shuffle={shuffle}"
"Dask-cudf only supports the following shuffle "
f"methods: {_SHUFFLE_SUPPORT}. Got shuffle={shuffle}"
)

return shuffle
11 changes: 9 additions & 2 deletions python/dask_cudf/dask_cudf/tests/test_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,25 @@ def test_is_categorical_dispatch():
assert is_categorical_dtype(cudf.Index([1, 2, 3], dtype="category"))


def test_pyarrow_conversion_dispatch():
@pytest.mark.parametrize("preserve_index", [True, False])
def test_pyarrow_conversion_dispatch(preserve_index):
from dask.dataframe.dispatch import (
from_pyarrow_table_dispatch,
to_pyarrow_table_dispatch,
)

df1 = cudf.DataFrame(np.random.randn(10, 3), columns=list("abc"))
df2 = from_pyarrow_table_dispatch(df1, to_pyarrow_table_dispatch(df1))
df2 = from_pyarrow_table_dispatch(
df1, to_pyarrow_table_dispatch(df1, preserve_index=preserve_index)
)

assert type(df1) == type(df2)
assert_eq(df1, df2)

# Check that preserve_index does not produce a RangeIndex
if preserve_index:
assert not isinstance(df2.index, cudf.RangeIndex)


@pytest.mark.parametrize("index", [None, [1, 2] * 5])
def test_deterministic_tokenize(index):
Expand Down
22 changes: 21 additions & 1 deletion python/dask_cudf/dask_cudf/tests/test_distributed.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

import numba.cuda
import pytest
Expand Down Expand Up @@ -77,3 +77,23 @@ def test_str_series_roundtrip():

actual = dask_series.compute()
assert_eq(actual, expected)


def test_p2p_shuffle():
# Check that we can use `shuffle="p2p"`
with dask_cuda.LocalCUDACluster(n_workers=1) as cluster:
with Client(cluster):
ddf = (
dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-08",
dtypes={"x": int},
)
.reset_index(drop=True)
.to_backend("cudf")
)
dd.assert_eq(
ddf.sort_values("x", shuffle="p2p").compute(),
ddf.compute().sort_values("x"),
check_index=False,
)

0 comments on commit ddd2b0d

Please sign in to comment.