diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index ac328348..8fb767b7 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -2281,7 +2281,7 @@ def sort_values( "You passed %s" % str(by) ) - if not isinstance(ascending, bool) and self.npartitions > 1: + if not isinstance(ascending, bool): # support [True] as input if ( isinstance(ascending, list) @@ -2289,7 +2289,7 @@ def sort_values( and isinstance(ascending[0], bool) ): ascending = ascending[0] - else: + elif self.npartitions > 1: raise NotImplementedError( f"Dask currently only supports a single boolean for ascending. You passed {str(ascending)}" ) diff --git a/dask_expr/_shuffle.py b/dask_expr/_shuffle.py index e0d1acaa..61b2437d 100644 --- a/dask_expr/_shuffle.py +++ b/dask_expr/_shuffle.py @@ -1010,7 +1010,7 @@ def _lower(self): def _simplify_up(self, parent, dependents): from dask_expr._expr import Filter, Head, Tail - if isinstance(parent, Head): + if self.frame.npartitions > 1 and isinstance(parent, Head): if self.ascending: if is_valid_nth_dtype(self._meta_by_dtype): return NSmallest(self.frame, n=parent.n, _columns=self.by) @@ -1022,7 +1022,7 @@ def _simplify_up(self, parent, dependents): else: return NLargestSlow(self.frame, n=parent.n, _columns=self.by) - if isinstance(parent, Tail): + if self.frame.npartitions > 1 and isinstance(parent, Tail): if self.ascending: if is_valid_nth_dtype(self._meta_by_dtype): return NLargest(self.frame, n=parent.n, _columns=self.by) diff --git a/dask_expr/tests/test_shuffle.py b/dask_expr/tests/test_shuffle.py index 4a52bc8f..d581cb77 100644 --- a/dask_expr/tests/test_shuffle.py +++ b/dask_expr/tests/test_shuffle.py @@ -363,7 +363,7 @@ def test_sort_values_descending(df, pdf): ) -def test_sort_head_nlargest(df): +def test_sort_head_nlargest(df, pdf): a = df.sort_values("x", ascending=False).head(10, compute=False).expr b = df.nlargest(10, columns=["x"]).expr assert a.optimize()._name == b.optimize()._name @@ -372,6 +372,83 @@ def test_sort_head_nlargest(df): b = df.nsmallest(10, columns=["x"]).expr assert a.optimize()._name == b.optimize()._name + a = df.sort_values("x", ascending=[False]).head(10, compute=False).expr + b = df.nlargest(10, columns=["x"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values("x", ascending=[True]).head(10, compute=False).expr + b = df.nsmallest(10, columns=["x"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values(["x"], ascending=[False]).head(10, compute=False).expr + b = df.nlargest(10, columns=["x"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values(["x"], ascending=[True]).head(10, compute=False).expr + b = df.nsmallest(10, columns=["x"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values(["x", "y"], ascending=[False]).head(10, compute=False).expr + b = df.nlargest(10, columns=["x", "y"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values(["x", "y"], ascending=[True]).head(10, compute=False).expr + b = df.nsmallest(10, columns=["x", "y"]).expr + assert a.optimize()._name == b.optimize()._name + + with pytest.raises( + NotImplementedError, + match="Dask currently only supports a single boolean for ascending", + ): + a = ( + df.sort_values(["x", "y"], ascending=[False, True]) + .head(10, compute=False) + .expr + ) + + +def test_sort_single_partition_head_nlargest(): + pdf = pd.DataFrame({"x": np.random.random(100), "y": np.random.random(100)}) + df = from_pandas(pdf, npartitions=1) + + result = df.sort_values("x", ascending=False).head(10) + expected = pdf.sort_values("x", ascending=False).head(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values("x", ascending=True).head(10) + expected = pdf.sort_values("x", ascending=True).head(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values("x", ascending=[False]).head(10) + expected = pdf.sort_values("x", ascending=[False]).head(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values("x", ascending=[True]).head(10) + expected = pdf.sort_values("x", ascending=[True]).head(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x"], ascending=[True]).head(10) + expected = pdf.sort_values(["x"], ascending=[True]).head(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x"], ascending=[False]).head(10) + expected = pdf.sort_values(["x"], ascending=[False]).head(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x", "y"], ascending=[False]).head(10) + expected = pdf.sort_values(["x", "y"], ascending=False).head(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x", "y"], ascending=[True]).head(10) + expected = pdf.sort_values(["x", "y"], ascending=True).head(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x", "y"], ascending=[False, True]).head(10) + expected = pdf.sort_values(["x", "y"], ascending=[False, True]).head(10) + assert_eq(result, expected, sort_results=False) + + +def test_sort_tail_nsmallest(df, pdf): a = df.sort_values("x", ascending=False).tail(10, compute=False).expr b = df.nsmallest(10, columns=["x"]).expr assert a.optimize()._name == b.optimize()._name @@ -380,6 +457,81 @@ def test_sort_head_nlargest(df): b = df.nlargest(10, columns=["x"]).expr assert a.optimize()._name == b.optimize()._name + a = df.sort_values("x", ascending=[False]).tail(10, compute=False).expr + b = df.nsmallest(10, columns=["x"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values("x", ascending=[True]).tail(10, compute=False).expr + b = df.nlargest(10, columns=["x"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values(["x"], ascending=[False]).tail(10, compute=False).expr + b = df.nsmallest(10, columns=["x"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values(["x"], ascending=[True]).tail(10, compute=False).expr + b = df.nlargest(10, columns=["x"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values(["x", "y"], ascending=[False]).tail(10, compute=False).expr + b = df.nsmallest(10, columns=["x", "y"]).expr + assert a.optimize()._name == b.optimize()._name + + a = df.sort_values(["x", "y"], ascending=[True]).tail(10, compute=False).expr + b = df.nlargest(10, columns=["x", "y"]).expr + assert a.optimize()._name == b.optimize()._name + + with pytest.raises( + NotImplementedError, + match="Dask currently only supports a single boolean for ascending", + ): + a = ( + df.sort_values(["x", "y"], ascending=[False, True]) + .head(10, compute=False) + .expr + ) + + +def test_sort_single_partition_tail(): + pdf = pd.DataFrame({"x": np.random.random(100), "y": np.random.random(100)}) + df = from_pandas(pdf, npartitions=1) + + result = df.sort_values("x", ascending=False).tail(10) + expected = pdf.sort_values("x", ascending=False).tail(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values("x", ascending=True).tail(10) + expected = pdf.sort_values("x", ascending=True).tail(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values("x", ascending=[False]).tail(10) + expected = pdf.sort_values("x", ascending=[False]).tail(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values("x", ascending=[True]).tail(10) + expected = pdf.sort_values("x", ascending=[True]).tail(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x"], ascending=[False]).tail(10) + expected = pdf.sort_values(["x"], ascending=[False]).tail(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x"], ascending=[True]).tail(10) + expected = pdf.sort_values(["x"], ascending=[True]).tail(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x", "y"], ascending=[False]).tail(10) + expected = pdf.sort_values(["x", "y"], ascending=False).tail(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x", "y"], ascending=[True]).tail(10) + expected = pdf.sort_values(["x", "y"], ascending=True).tail(10) + assert_eq(result, expected, sort_results=False) + + result = df.sort_values(["x", "y"], ascending=[False, True]).tail(10) + expected = pdf.sort_values(["x", "y"], ascending=[False, True]).tail(10) + assert_eq(result, expected, sort_results=False) + @xfail_gpu("cudf udf support") def test_sort_head_nlargest_string(pdf):