Enable basic p2p shuffle for dask-cudf #7743
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
21 files ±0, 21 suites ±0, 11h 25m 49s ⏱️ -52s
For more details on these failures, see this check.
Results for commit 50beefc. ± Comparison against base commit 3082d27.
♻️ This comment has been updated with latest results.
I think most of the CI failures are not related and should've been fixed on main already.
Do you want us to have a closer look? Is this intended to be merged at some point or is this just for show and tell?
I don't think this is ready for a detailed review. However, I would welcome thoughts at any level. The immediate goal of this PR is to figure out a simple way to get the "p2p" shuffle working for cudf-backed data. That is, performance can be a secondary concern for now. Therefore, it would be good to know your thoughts on ways to relax the pandas-specific logic currently used in "p2p".
This started off as an experiment, but it would be extremely useful on the RAPIDS side to not have to worry about adding workarounds to avoid the "p2p" shuffle default.
It would probably be good to add a test for this.
distributed/shuffle/_arrow.py
@@ -45,6 +45,14 @@ def check_minimal_arrow_version() -> None:
    )


def _arrow_to_df(table: pa.Table, like: Any) -> pd.DataFrame:
The p2p code is good about using type hints. Unfortunately, it uses pd.DataFrame in many places where we want something like pd.DataFrame | cudf.DataFrame. The obvious problem is that I cannot just add the | cudf.DataFrame, because there is no guarantee that cudf is installed (and we don't want to eagerly import it, even if it is installed).
What is the established type-hinting convention for cases like this? Do we need something like a run-time checkable BackendDataFrame protocol? cc @jrbourbeau
You could do this:
from typing import TYPE_CHECKING, Union
from typing_extensions import TypeAlias
import pandas as pd
DataFrameT: TypeAlias = Union[pd.DataFrame, "cudf.DataFrame"]
if TYPE_CHECKING:
    import cudf
I think?
Ah, but I guess mypy still complains because it can't find the import at type-checking time.
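A minimal sketch of the guarded-import pattern discussed above, assuming cudf may be absent at runtime. The helper name `is_cudf_frame` is hypothetical, and the bare `type: ignore` comment is just one way to quiet mypy's missing-import error when cudf is not installed in the type-checking environment (setting `ignore_missing_imports` for that module in the mypy config is another).

```python
import sys
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Seen only by static type checkers; this import never runs at runtime.
    import cudf  # type: ignore  # noqa: F401


def is_cudf_frame(obj: Any) -> bool:
    """Hypothetical helper: True only if cudf is already imported
    and obj is a cudf.DataFrame."""
    mod = sys.modules.get("cudf")  # look up without triggering an import
    return mod is not None and isinstance(obj, mod.DataFrame)


def describe(df: "cudf.DataFrame") -> str:
    # The string annotation is never evaluated at runtime,
    # so this definition works even when cudf is missing.
    return type(df).__name__
```

Because the runtime check goes through `sys.modules`, cudf is never imported on behalf of users who only work with pandas.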
I suppose you could also make this generic and say:
from typing import TypeVar
T = TypeVar("T")
def _arrow_to_df(table: pa.Table, like: T) -> T:
    ...
> Do we need something like a run-time checkable BackendDataFrame protocol?
The static-typing approach to this in Python is PEP 544 (structural subtyping via protocols).
> The static-typing approach to this in python is PEP 544
Right, exactly. This is why I asked if we should add a BackendDataFrame protocol. Perhaps it makes sense to (eventually) replace is_dataframe_like with a DataFrameLike protocol in dask/dask?
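A rough sketch of what such a protocol might look like under PEP 544. The member list is an assumption chosen for illustration (it is not taken from dask's `is_dataframe_like`), and `FakeFrame` and `n_columns` are hypothetical names so that the example runs without pandas or cudf.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DataFrameLike(Protocol):
    # Assumed minimal surface; a real protocol would need a carefully chosen set.
    @property
    def columns(self) -> Any: ...

    @property
    def dtypes(self) -> Any: ...

    def head(self, n: int = 5) -> Any: ...


class FakeFrame:
    """Hypothetical stand-in that satisfies the protocol structurally."""

    def __init__(self, data: dict) -> None:
        self._data = data

    @property
    def columns(self) -> list:
        return list(self._data)

    @property
    def dtypes(self) -> dict:
        return {k: type(v[0]).__name__ for k, v in self._data.items() if v}

    def head(self, n: int = 5) -> "FakeFrame":
        return FakeFrame({k: v[:n] for k, v in self._data.items()})


def n_columns(df: DataFrameLike) -> int:
    # Accepts any object with the protocol's members; no inheritance needed.
    return len(df.columns)
```

Note that `isinstance(obj, DataFrameLike)` only checks that the members exist, not their signatures, so a run-time check of this kind is no stricter than attribute-based duck typing; the gain is on the static-typing side.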
FWIW, this would also be useful for dask-geopandas, since it would let us control how a geopandas.GeoDataFrame gets converted to a pyarrow table and back (xref geopandas/dask-geopandas#256)
Thanks, @rjzamora!
Thanks for the review, @hendrikmakait! Let me know if there is anything left to do to get this merged (I don't think any of the CI failures are related to the changes in any way).
Ready to merge from my perspective. When I reviewed it, this still had a [WIP] flag.
Sorry, forgot about the |
I was not aware of that, thanks for pointing it out!
This PR allows explicit `shuffle="p2p"` usage within the dask-cudf API now that dask/distributed#7743 is in.
Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)
- Ray Douglass (https://github.com/raydouglass)
- gpuCI (https://github.com/GPUtester)
- Mike Wendt (https://github.com/mike-wendt)
- AJ Schmidt (https://github.com/ajschmidt8)
- GALI PREM SAGAR (https://github.com/galipremsagar)
- Lawrence Mitchell (https://github.com/wence-)
Approvers:
- Lawrence Mitchell (https://github.com/wence-)
URL: #13893
These changes are meant to enable basic "p2p" functionality for dask_cudf.DataFrame collections by addressing some minor inconsistencies between pandas and cudf for DataFrame <-> pa.Table conversion. This PR does not attempt to optimize the p2p shuffle algorithm for GPU-backed data.
TODO (in follow-up work):