Skip to content

Commit

Permalink
Disable spilling for aggregations over distinct or sorted inputs (#7460)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #7460

Disabling spilling for aggregations with distinct and sorting,
because these aren't supported yet.

Reviewed By: xiaoxmeng

Differential Revision: D51091567

fbshipit-source-id: 4f2284f8e07ba47dcd302657cc580ca85335fb0e
  • Loading branch information
Sergey Pershin authored and facebook-github-bot committed Nov 8, 2023
1 parent 703f2f2 commit aec77a5
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 6 deletions.
15 changes: 15 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,21 @@ void addSortingKeys(
}
} // namespace

bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
// TODO: add spilling for aggregations with sorting or with distinct later:
// https://github.com/facebookincubator/velox/issues/7454
// https://github.com/facebookincubator/velox/issues/7455
for (const auto& aggregate : aggregates_) {
if (aggregate.distinct || !aggregate.sortingKeys.empty()) {
return false;
}
}
// TODO: add spilling for pre-grouped aggregation later:
// https://github.com/facebookincubator/velox/issues/3264
return (isFinal() || isSingle()) && preGroupedKeys().empty() &&
queryConfig.aggregationSpillEnabled();
}

void AggregationNode::addDetails(std::stringstream& stream) const {
stream << stepName(step_) << " ";

Expand Down
7 changes: 1 addition & 6 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -597,12 +597,7 @@ class AggregationNode : public PlanNode {
return "Aggregation";
}

bool canSpill(const QueryConfig& queryConfig) const override {
// TODO: add spilling for pre-grouped aggregation later:
// https://github.com/facebookincubator/velox/issues/3264
return (isFinal() || isSingle()) && preGroupedKeys().empty() &&
queryConfig.aggregationSpillEnabled();
}
bool canSpill(const QueryConfig& queryConfig) const override;

bool isFinal() const {
return step_ == Step::kFinal;
Expand Down
53 changes: 53 additions & 0 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,59 @@ TEST_F(AggregationTest, distinctWithSpilling) {
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}

TEST_F(AggregationTest, spillingForAggrsWithDistinct) {
auto vectors = makeVectors(rowType_, 100, 10);
createDuckDbTable(vectors);
auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId aggrNodeId;
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(QueryConfig::kTestingSpillPct, "100")
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c1"}, {"count(DISTINCT c0)"}, {})
.capturePlanNodeId(aggrNodeId)
.planNode())
.assertResults("SELECT c1, count(DISTINCT c0) FROM tmp GROUP BY c1");
// Verify that spilling is not triggered.
const auto& queryConfig = task->queryCtx()->queryConfig();
ASSERT_TRUE(queryConfig.spillEnabled());
ASSERT_TRUE(queryConfig.aggregationSpillEnabled());
ASSERT_EQ(100, queryConfig.testingSpillPct());
ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}

TEST_F(AggregationTest, spillingForAggrsWithSorting) {
auto vectors = makeVectors(rowType_, 100, 10);
createDuckDbTable(vectors);
auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId aggrNodeId;
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(QueryConfig::kTestingSpillPct, "100")
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c1"}, {"count(c0 ORDER BY c2)"}, {})
.capturePlanNodeId(aggrNodeId)
.planNode())
.assertResults(
"SELECT c1, count(c0 ORDER BY c2) FROM tmp GROUP BY c1");
// Verify that spilling is not triggered.
const auto& queryConfig = task->queryCtx()->queryConfig();
ASSERT_TRUE(queryConfig.spillEnabled());
ASSERT_TRUE(queryConfig.aggregationSpillEnabled());
ASSERT_EQ(100, queryConfig.testingSpillPct());
ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}

TEST_F(AggregationTest, distinctSpillWithMemoryLimit) {
rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
VectorFuzzer fuzzer({}, pool());
Expand Down

0 comments on commit aec77a5

Please sign in to comment.