Skip to content

Commit

Permalink
Fix sort_values simplification for {head|tail} (#811)
Browse files Browse the repository at this point in the history
Co-authored-by: Patrick Hoefler <[email protected]>
  • Loading branch information
hendrikmakait and phofl authored Jan 29, 2024
1 parent 24e4e8b commit 16b7dcf
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 5 deletions.
4 changes: 2 additions & 2 deletions dask_expr/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2281,15 +2281,15 @@ def sort_values(
"You passed %s" % str(by)
)

if not isinstance(ascending, bool) and self.npartitions > 1:
if not isinstance(ascending, bool):
# support [True] as input
if (
isinstance(ascending, list)
and len(ascending) == 1
and isinstance(ascending[0], bool)
):
ascending = ascending[0]
else:
elif self.npartitions > 1:
raise NotImplementedError(
f"Dask currently only supports a single boolean for ascending. You passed {str(ascending)}"
)
Expand Down
4 changes: 2 additions & 2 deletions dask_expr/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ def _lower(self):
def _simplify_up(self, parent, dependents):
from dask_expr._expr import Filter, Head, Tail

if isinstance(parent, Head):
if self.frame.npartitions > 1 and isinstance(parent, Head):
if self.ascending:
if is_valid_nth_dtype(self._meta_by_dtype):
return NSmallest(self.frame, n=parent.n, _columns=self.by)
Expand All @@ -1022,7 +1022,7 @@ def _simplify_up(self, parent, dependents):
else:
return NLargestSlow(self.frame, n=parent.n, _columns=self.by)

if isinstance(parent, Tail):
if self.frame.npartitions > 1 and isinstance(parent, Tail):
if self.ascending:
if is_valid_nth_dtype(self._meta_by_dtype):
return NLargest(self.frame, n=parent.n, _columns=self.by)
Expand Down
154 changes: 153 additions & 1 deletion dask_expr/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def test_sort_values_descending(df, pdf):
)


def test_sort_head_nlargest(df):
def test_sort_head_nlargest(df, pdf):
a = df.sort_values("x", ascending=False).head(10, compute=False).expr
b = df.nlargest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name
Expand All @@ -372,6 +372,83 @@ def test_sort_head_nlargest(df):
b = df.nsmallest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values("x", ascending=[False]).head(10, compute=False).expr
b = df.nlargest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values("x", ascending=[True]).head(10, compute=False).expr
b = df.nsmallest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values(["x"], ascending=[False]).head(10, compute=False).expr
b = df.nlargest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values(["x"], ascending=[True]).head(10, compute=False).expr
b = df.nsmallest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values(["x", "y"], ascending=[False]).head(10, compute=False).expr
b = df.nlargest(10, columns=["x", "y"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values(["x", "y"], ascending=[True]).head(10, compute=False).expr
b = df.nsmallest(10, columns=["x", "y"]).expr
assert a.optimize()._name == b.optimize()._name

with pytest.raises(
NotImplementedError,
match="Dask currently only supports a single boolean for ascending",
):
a = (
df.sort_values(["x", "y"], ascending=[False, True])
.head(10, compute=False)
.expr
)


def test_sort_single_partition_head_nlargest():
pdf = pd.DataFrame({"x": np.random.random(100), "y": np.random.random(100)})
df = from_pandas(pdf, npartitions=1)

result = df.sort_values("x", ascending=False).head(10)
expected = pdf.sort_values("x", ascending=False).head(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values("x", ascending=True).head(10)
expected = pdf.sort_values("x", ascending=True).head(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values("x", ascending=[False]).head(10)
expected = pdf.sort_values("x", ascending=[False]).head(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values("x", ascending=[True]).head(10)
expected = pdf.sort_values("x", ascending=[True]).head(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x"], ascending=[True]).head(10)
expected = pdf.sort_values(["x"], ascending=[True]).head(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x"], ascending=[False]).head(10)
expected = pdf.sort_values(["x"], ascending=[False]).head(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x", "y"], ascending=[False]).head(10)
expected = pdf.sort_values(["x", "y"], ascending=False).head(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x", "y"], ascending=[True]).head(10)
expected = pdf.sort_values(["x", "y"], ascending=True).head(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x", "y"], ascending=[False, True]).head(10)
expected = pdf.sort_values(["x", "y"], ascending=[False, True]).head(10)
assert_eq(result, expected, sort_results=False)


def test_sort_tail_nsmallest(df, pdf):
a = df.sort_values("x", ascending=False).tail(10, compute=False).expr
b = df.nsmallest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name
Expand All @@ -380,6 +457,81 @@ def test_sort_head_nlargest(df):
b = df.nlargest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values("x", ascending=[False]).tail(10, compute=False).expr
b = df.nsmallest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values("x", ascending=[True]).tail(10, compute=False).expr
b = df.nlargest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values(["x"], ascending=[False]).tail(10, compute=False).expr
b = df.nsmallest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values(["x"], ascending=[True]).tail(10, compute=False).expr
b = df.nlargest(10, columns=["x"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values(["x", "y"], ascending=[False]).tail(10, compute=False).expr
b = df.nsmallest(10, columns=["x", "y"]).expr
assert a.optimize()._name == b.optimize()._name

a = df.sort_values(["x", "y"], ascending=[True]).tail(10, compute=False).expr
b = df.nlargest(10, columns=["x", "y"]).expr
assert a.optimize()._name == b.optimize()._name

with pytest.raises(
NotImplementedError,
match="Dask currently only supports a single boolean for ascending",
):
a = (
df.sort_values(["x", "y"], ascending=[False, True])
.head(10, compute=False)
.expr
)


def test_sort_single_partition_tail():
pdf = pd.DataFrame({"x": np.random.random(100), "y": np.random.random(100)})
df = from_pandas(pdf, npartitions=1)

result = df.sort_values("x", ascending=False).tail(10)
expected = pdf.sort_values("x", ascending=False).tail(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values("x", ascending=True).tail(10)
expected = pdf.sort_values("x", ascending=True).tail(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values("x", ascending=[False]).tail(10)
expected = pdf.sort_values("x", ascending=[False]).tail(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values("x", ascending=[True]).tail(10)
expected = pdf.sort_values("x", ascending=[True]).tail(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x"], ascending=[False]).tail(10)
expected = pdf.sort_values(["x"], ascending=[False]).tail(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x"], ascending=[True]).tail(10)
expected = pdf.sort_values(["x"], ascending=[True]).tail(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x", "y"], ascending=[False]).tail(10)
expected = pdf.sort_values(["x", "y"], ascending=False).tail(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x", "y"], ascending=[True]).tail(10)
expected = pdf.sort_values(["x", "y"], ascending=True).tail(10)
assert_eq(result, expected, sort_results=False)

result = df.sort_values(["x", "y"], ascending=[False, True]).tail(10)
expected = pdf.sort_values(["x", "y"], ascending=[False, True]).tail(10)
assert_eq(result, expected, sort_results=False)


@xfail_gpu("cudf udf support")
def test_sort_head_nlargest_string(pdf):
Expand Down

0 comments on commit 16b7dcf

Please sign in to comment.