Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add how="leftanti" support for cudf-backed merge #1073

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dask_expr/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2818,7 +2818,7 @@ def merge(
Parameters
----------
right: dask.dataframe.DataFrame
how : {'left', 'right', 'outer', 'inner', 'leftsemi'}, default: 'inner'
how : {'left', 'right', 'outer', 'inner', 'leftsemi', 'leftanti'}, default: 'inner'
How to handle the operation of the two objects:

- left: use calling frame's index (or column if on is specified)
Expand All @@ -2832,6 +2832,9 @@ def merge(
- leftsemi: Choose all rows in left where the join keys can be found
in right. Won't duplicate rows if the keys are duplicated in right.
Drops all columns from right.
- leftanti: Choose all rows in left where the join keys cannot be found
in right. Won't duplicate rows if the keys are duplicated in right.
Drops all columns from right. Only supported with 'cudf' backend.

on : label or list
Column or index level names to join on. These must be found in both
Expand Down Expand Up @@ -5628,11 +5631,12 @@ def merge(
if on and not left_on and not right_on:
left_on = right_on = on

supported_how = ("left", "right", "outer", "inner", "leftsemi")
supported_how = ("left", "right", "outer", "inner", "leftsemi", "leftanti")
if how not in supported_how:
raise ValueError(
f"dask.dataframe.merge does not support how='{how}'."
f"Options are: {supported_how}."
"Note that 'leftanti' is only an dask_cudf option."
)

if how == "leftsemi":
Expand Down
12 changes: 7 additions & 5 deletions dask_expr/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def _filter_passthrough_available(self, parent, dependents):
if predicate_columns is None:
return False
if predicate_columns.issubset(self.left.columns):
return self.how in ("left", "inner", "leftsemi")
return self.how in ("left", "inner", "leftsemi", "leftanti")
elif predicate_columns.issubset(self.right.columns):
return self.how in ("right", "inner")
elif len(predicate_columns) > 0:
Expand Down Expand Up @@ -193,6 +193,8 @@ def _meta(self):
kwargs = self.kwargs.copy()
if kwargs["how"] == "leftsemi":
kwargs["how"] = "left"
if kwargs["how"] == "leftanti":
return make_meta(left)
return make_meta(left.merge(right, **kwargs))

@functools.cached_property
Expand Down Expand Up @@ -242,7 +244,7 @@ def _divisions(self):
elif (
use_left
and self.right.npartitions == 1
and self.how in ("inner", "left", "leftsemi")
and self.how in ("inner", "left", "leftsemi", "leftanti")
):
return self.left.divisions
else:
Expand Down Expand Up @@ -283,7 +285,7 @@ def is_broadcast_join(self):
s_method = self.shuffle_method or get_default_shuffle_method()
if (
s_method in ("tasks", "p2p")
and self.how in ("inner", "left", "right", "leftsemi")
and self.how in ("inner", "left", "right", "leftsemi", "leftanti")
and self.how != broadcast_side
and broadcast is not False
):
Expand All @@ -301,7 +303,7 @@ def _is_single_partition_broadcast(self):
or self.left.npartitions == 1
and self.how in ("right", "inner")
or self.right.npartitions == 1
and self.how in ("left", "inner", "leftsemi")
and self.how in ("left", "inner", "leftsemi", "leftanti")
)

@functools.cached_property
Expand Down Expand Up @@ -776,7 +778,7 @@ def _layer(self) -> dict:
),
(bcast_name, j),
]
if self.broadcast_side in ("left", "leftsemi"):
if self.broadcast_side in ("left", "leftsemi", "leftanti"):
_merge_args.reverse()

inter_key = (inter_name, part_out, j)
Expand Down
29 changes: 28 additions & 1 deletion dask_expr/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dask_expr._merge import BroadcastJoin
from dask_expr._shuffle import Shuffle
from dask_expr.io import FromPandas
from dask_expr.tests._util import _backend_library, assert_eq
from dask_expr.tests._util import _backend_library, _backend_name, assert_eq

# Set DataFrame backend for this module
pd = _backend_library()
Expand Down Expand Up @@ -996,6 +996,33 @@ def test_merge_leftsemi():
df1.merge(df2, how="leftsemi", on="aa")


@pytest.mark.xfail(
_backend_name() != "cudf", reason="leftanti joins not supported with pandas backend"
)
def test_merge_leftanti_cudf():
pdf1 = pd.DataFrame({"aa": [1, 2, 3, 4, 5, 6, 1, 2, 3], "bb": 1})
pdf2 = pd.DataFrame({"aa": [1, 2, 2, 4, 4, 10], "cc": 1})

df1 = from_pandas(pdf1, npartitions=2)
df2 = from_pandas(pdf2, npartitions=2)
assert_eq(
df1.merge(df2, how="leftanti"),
pdf1[~pdf1.aa.isin(pdf2.aa)],
check_index=False,
)
df2 = df2.rename(columns={"aa": "dd"})
assert_eq(
df1.merge(df2, how="leftanti", left_on="aa", right_on="dd"),
pdf1[~pdf1.aa.isin(pdf2.aa)],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we could just do this in merge_chunk for pandas data to support how="leftanti" for cpu as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good point, can look into this a bit more

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed some commits to dask/dask#11150 that, in conjunction with this PR, should unblock left anti/semi joins on CPU

check_index=False,
)
assert_eq(df1.merge(df2, how="leftanti"), pdf1[~pdf1.index.isin(pdf2.index)])

pdf2 = pdf2.set_index("aa")
df2 = from_pandas(pdf2, npartitions=2)
assert_eq(df1.merge(df2, how="leftanti", on="aa"), pdf1[~pdf1.aa.isin(pdf2.index)])


def test_merge_suffix_projections():
df = pd.DataFrame(
{
Expand Down
Loading