Skip to content

Commit

Permalink
add to_backend in a few places
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora committed Jul 19, 2024
1 parent 9c8f216 commit b85336d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
6 changes: 6 additions & 0 deletions merlin/core/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
if isinstance(x, dd.DataFrame):
# If input is a dask_cudf collection, convert
# to a pandas-backed Dask collection
if hasattr(x, "to_backend"):
# Requires dask>=2023.1.1
return x.to_backend("pandas")
if cudf is None or not isinstance(x, dask_cudf.DataFrame):
# Already a Pandas-backed collection
return x
Expand All @@ -676,6 +679,9 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
return dd.from_pandas(_x, sort=False, npartitions=npartitions) if to_collection else _x
elif cudf and dask_cudf:
if isinstance(x, dd.DataFrame):
if hasattr(x, "to_backend"):
# Requires dask>=2023.1.1
return x.to_backend("cudf")
# If input is a Dask collection, convert to dask_cudf
if isinstance(x, dask_cudf.DataFrame):
# Already a cudf-backed Dask collection
Expand Down
24 changes: 21 additions & 3 deletions merlin/io/dataframe_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ def to_ddf(self, columns=None, cpu=None):
cpu = self.cpu if cpu is None else cpu

# Move data from gpu to cpu if necessary
_ddf = self._move_ddf("cpu") if (cpu and not self.cpu) else self._ddf
if hasattr(self._ddf, "to_backend"):
# Requires dask>=2023.1.1
if cpu:
_ddf = self._ddf.to_backend("pandas")
elif cpu is False:
_ddf = self._ddf.to_backend("cudf")
else:
_ddf = self._ddf
else:
_ddf = self._move_ddf("cpu") if (cpu and not self.cpu) else self._ddf

if isinstance(columns, list):
return _ddf[columns]
Expand All @@ -49,14 +58,22 @@ def to_ddf(self, columns=None, cpu=None):
def to_cpu(self):
if self.cpu:
return
self._ddf = self._move_ddf("cpu")
if hasattr(self._ddf, "to_backend"):
# Requires dask>=2023.1.1
self._ddf = self._ddf.to_backend("pandas")
else:
self._ddf = self._move_ddf("cpu")
self.cpu = True
self.moved_collection = not self.moved_collection

def to_gpu(self):
if not self.cpu:
return
self._ddf = self._move_ddf("gpu")
if hasattr(self._ddf, "to_backend"):
# Requires dask>=2023.1.1
self._ddf = self._ddf.to_backend("cudf")
else:
self._ddf = self._move_ddf("gpu")
self.cpu = False
self.moved_collection = not self.moved_collection

Expand All @@ -66,6 +83,7 @@ def num_rows(self):

def _move_ddf(self, destination):
"""Move the collection between cpu and gpu memory."""
# TODO: Remove this method when we pin to dask>=2013.1.1
_ddf = self._ddf
if (
self.moved_collection
Expand Down

0 comments on commit b85336d

Please sign in to comment.