Register read_parquet and read_csv as "dispatchable" #1114

Merged (3 commits) on Aug 12, 2024

Changes from all commits
7 changes: 4 additions & 3 deletions dask_expr/_collection.py
@@ -14,7 +14,7 @@
 import numpy as np
 import pandas as pd
 import pyarrow as pa
-from dask import compute, config, get_annotations
+from dask import compute, get_annotations
 from dask.array import Array
 from dask.base import DaskMethodsMixin, is_dask_collection, named_schedulers
 from dask.core import flatten
@@ -5099,6 +5099,7 @@ def from_dask_array(x, columns=None, index=None, meta=None):
     return from_legacy_dataframe(df, optimize=True)


+@dataframe_creation_dispatch.register_inplace("pandas")
 def read_csv(
     path,
     *args,
@@ -5109,7 +5110,6 @@
 ):
     from dask_expr.io.csv import ReadCSV

-    dataframe_backend = config.get("dataframe.backend", "pandas")
     if not isinstance(path, str):
         path = stringify_path(path)
     return new_collection(
@@ -5119,7 +5119,7 @@
             storage_options=storage_options,
             kwargs=kwargs,
             header=header,
-            dataframe_backend=dataframe_backend,
+            dataframe_backend="pandas",
         )
     )

@@ -5174,6 +5174,7 @@ def read_fwf(
     )


+@dataframe_creation_dispatch.register_inplace("pandas")
Member Author commented on the line above:

For the cudf backend, dask-cudf can register a simple function that calls back into this read_parquet with engine=CudfEngine (already confirmed that this works). The dask-cudf logic can also check the filesystem argument and do something like read_parquet(..., filesystem="arrow").to_backend("cudf"), since filesystem="arrow" is currently broken in dask-expr when the backend is "cudf".
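A rough sketch of what that dask-cudf registration could look like, mirroring the register_inplace decorator added in this diff. The dispatch hook usage, the CudfEngine import path, and the config override are assumptions for illustration, not confirmed dask-cudf code:

import dask

from dask_expr._collection import dataframe_creation_dispatch, read_parquet


# Hypothetical: register a "cudf" implementation under the same dispatch
# name used by the pandas-backed read_parquet above.
@dataframe_creation_dispatch.register_inplace("cudf", name="read_parquet")
def _cudf_read_parquet(path=None, **kwargs):
    from dask_cudf.io.parquet import CudfEngine  # assumed import path

    if kwargs.get("filesystem") == "arrow":
        # filesystem="arrow" is currently broken in dask-expr when the
        # backend is "cudf", so read via the pandas backend and convert.
        with dask.config.set({"dataframe.backend": "pandas"}):
            df = read_parquet(path, **kwargs)
        return df.to_backend("cudf")

    # Otherwise call back into read_parquet with the cudf parquet engine.
    # The config override keeps dispatch from routing back into this
    # function and recursing.
    with dask.config.set({"dataframe.backend": "pandas"}):
        return read_parquet(path, engine=CudfEngine, **kwargs)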

 def read_parquet(
     path=None,
     columns=None,
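For reference, once read_csv and read_parquet are dispatchable, the backend is chosen from the dataframe.backend config at call time rather than being looked up inside the function body. A minimal usage sketch; the file paths are hypothetical:

import dask
import dask_expr as dx

# Default ("pandas") backend: dispatch resolves to the implementations
# registered in this PR.
with dask.config.set({"dataframe.backend": "pandas"}):
    df = dx.read_csv("data/*.csv")  # hypothetical path

# "cudf" backend: dispatch resolves to whatever function dask-cudf
# registers for read_parquet (see the review comment above).
with dask.config.set({"dataframe.backend": "cudf"}):
    gdf = dx.read_parquet("data/")  # hypothetical path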