diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py
index a87e2849..aef4c2fb 100644
--- a/dask_expr/_collection.py
+++ b/dask_expr/_collection.py
@@ -2818,7 +2818,7 @@ def merge(
         Parameters
         ----------
         right: dask.dataframe.DataFrame
-        how : {'left', 'right', 'outer', 'inner', 'leftsemi'}, default: 'inner'
+        how : {'left', 'right', 'outer', 'inner', 'leftsemi', 'leftanti'}, default: 'inner'
             How to handle the operation of the two objects:
 
             - left: use calling frame's index (or column if on is specified)
@@ -2832,6 +2832,9 @@ def merge(
             - leftsemi: Choose all rows in left where the join keys can be found
               in right. Won't duplicate rows if the keys are duplicated in right.
               Drops all columns from right.
+            - leftanti: Choose all rows in left where the join keys cannot be found
+              in right. Won't duplicate rows if the keys are duplicated in right.
+              Drops all columns from right. Only supported with 'cudf' backend.
 
         on : label or list
             Column or index level names to join on. These must be found in both
@@ -5628,11 +5631,12 @@ def merge(
     if on and not left_on and not right_on:
         left_on = right_on = on
 
-    supported_how = ("left", "right", "outer", "inner", "leftsemi")
+    supported_how = ("left", "right", "outer", "inner", "leftsemi", "leftanti")
     if how not in supported_how:
         raise ValueError(
-            f"dask.dataframe.merge does not support how='{how}'."
-            f"Options are: {supported_how}."
+            f"dask.dataframe.merge does not support how='{how}'. "
+            f"Options are: {supported_how}. "
+            "Note that 'leftanti' is only a dask_cudf option."
         )
 
     if how == "leftsemi":
diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py
index 4c4fb69d..698b849c 100644
--- a/dask_expr/_merge.py
+++ b/dask_expr/_merge.py
@@ -102,7 +102,7 @@ def _filter_passthrough_available(self, parent, dependents):
         if predicate_columns is None:
             return False
         if predicate_columns.issubset(self.left.columns):
-            return self.how in ("left", "inner", "leftsemi")
+            return self.how in ("left", "inner", "leftsemi", "leftanti")
         elif predicate_columns.issubset(self.right.columns):
             return self.how in ("right", "inner")
         elif len(predicate_columns) > 0:
@@ -193,6 +193,8 @@ def _meta(self):
         kwargs = self.kwargs.copy()
         if kwargs["how"] == "leftsemi":
             kwargs["how"] = "left"
+        if kwargs["how"] == "leftanti":
+            return make_meta(left)
         return make_meta(left.merge(right, **kwargs))
 
     @functools.cached_property
@@ -242,7 +244,7 @@ def _divisions(self):
         elif (
             use_left
             and self.right.npartitions == 1
-            and self.how in ("inner", "left", "leftsemi")
+            and self.how in ("inner", "left", "leftsemi", "leftanti")
         ):
             return self.left.divisions
         else:
@@ -283,7 +285,7 @@ def is_broadcast_join(self):
         s_method = self.shuffle_method or get_default_shuffle_method()
         if (
             s_method in ("tasks", "p2p")
-            and self.how in ("inner", "left", "right", "leftsemi")
+            and self.how in ("inner", "left", "right", "leftsemi", "leftanti")
             and self.how != broadcast_side
             and broadcast is not False
         ):
@@ -301,7 +303,7 @@ def _is_single_partition_broadcast(self):
             or self.left.npartitions == 1
             and self.how in ("right", "inner")
             or self.right.npartitions == 1
-            and self.how in ("left", "inner", "leftsemi")
+            and self.how in ("left", "inner", "leftsemi", "leftanti")
         )
 
     @functools.cached_property
@@ -776,7 +778,7 @@ def _layer(self) -> dict:
                     ),
                     (bcast_name, j),
                 ]
-                if self.broadcast_side in ("left", "leftsemi"):
+                if self.broadcast_side in ("left", "leftsemi", "leftanti"):
                     _merge_args.reverse()
 
                 inter_key = (inter_name, part_out, j)
diff --git a/dask_expr/tests/test_merge.py b/dask_expr/tests/test_merge.py
index d16035df..acbcfe3e 100644
--- a/dask_expr/tests/test_merge.py
+++ b/dask_expr/tests/test_merge.py
@@ -8,7 +8,7 @@
 from dask_expr._merge import BroadcastJoin
 from dask_expr._shuffle import Shuffle
 from dask_expr.io import FromPandas
-from dask_expr.tests._util import _backend_library, assert_eq
+from dask_expr.tests._util import _backend_library, _backend_name, assert_eq
 
 # Set DataFrame backend for this module
 pd = _backend_library()
@@ -996,6 +996,38 @@ def test_merge_leftsemi():
         df1.merge(df2, how="leftsemi", on="aa")
 
 
+@pytest.mark.xfail(
+    _backend_name() != "cudf", reason="leftanti joins not supported with pandas backend"
+)
+def test_merge_leftanti_cudf():
+    """Compare how="leftanti" merges with an ``~isin`` filter of the left frame."""
+    pdf1 = pd.DataFrame({"aa": [1, 2, 3, 4, 5, 6, 1, 2, 3], "bb": 1})
+    pdf2 = pd.DataFrame({"aa": [1, 2, 2, 4, 4, 10], "cc": 1})
+
+    df1 = from_pandas(pdf1, npartitions=2)
+    df2 = from_pandas(pdf2, npartitions=2)
+    # Both frames share the key column "aa"
+    assert_eq(
+        df1.merge(df2, how="leftanti"),
+        pdf1[~pdf1.aa.isin(pdf2.aa)],
+        check_index=False,
+    )
+    # Key columns carry different names on each side
+    df2 = df2.rename(columns={"aa": "dd"})
+    assert_eq(
+        df1.merge(df2, how="leftanti", left_on="aa", right_on="dd"),
+        pdf1[~pdf1.aa.isin(pdf2.aa)],
+        check_index=False,
+    )
+    # No key arguments given: the expected frame is filtered on the index
+    assert_eq(df1.merge(df2, how="leftanti"), pdf1[~pdf1.index.isin(pdf2.index)])
+
+    # Right-hand key "aa" is moved into the index
+    pdf2 = pdf2.set_index("aa")
+    df2 = from_pandas(pdf2, npartitions=2)
+    assert_eq(df1.merge(df2, how="leftanti", on="aa"), pdf1[~pdf1.aa.isin(pdf2.index)])
+
+
 def test_merge_suffix_projections():
     df = pd.DataFrame(
         {