From aec77a5f02159a0000dfca87d26f20caaa64c6fd Mon Sep 17 00:00:00 2001 From: Sergey Pershin Date: Tue, 7 Nov 2023 17:44:41 -0800 Subject: [PATCH] Disable spilling for aggregations over distinct or sorted inputs (#7460) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/7460 Disabling spilling for aggregations with distinct and sorting, because these aren't supported yet. Reviewed By: xiaoxmeng Differential Revision: D51091567 fbshipit-source-id: 4f2284f8e07ba47dcd302657cc580ca85335fb0e --- velox/core/PlanNode.cpp | 15 ++++++++ velox/core/PlanNode.h | 7 +--- velox/exec/tests/AggregationTest.cpp | 53 ++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 6 deletions(-) diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 3ea3a473f639..ebfb57281f60 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -227,6 +227,21 @@ void addSortingKeys( } } // namespace +bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { + // TODO: add spilling for aggregations with sorting or with distinct later: + // https://github.com/facebookincubator/velox/issues/7454 + // https://github.com/facebookincubator/velox/issues/7455 + for (const auto& aggregate : aggregates_) { + if (aggregate.distinct || !aggregate.sortingKeys.empty()) { + return false; + } + } + // TODO: add spilling for pre-grouped aggregation later: + // https://github.com/facebookincubator/velox/issues/3264 + return (isFinal() || isSingle()) && preGroupedKeys().empty() && + queryConfig.aggregationSpillEnabled(); +} + void AggregationNode::addDetails(std::stringstream& stream) const { stream << stepName(step_) << " "; diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index b19581faf18b..4e04b4bcae16 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -597,12 +597,7 @@ class AggregationNode : public PlanNode { return "Aggregation"; } - bool canSpill(const QueryConfig& queryConfig) const override { - // TODO: add spilling for pre-grouped aggregation later: - // https://github.com/facebookincubator/velox/issues/3264 - return (isFinal() || isSingle()) && preGroupedKeys().empty() && - queryConfig.aggregationSpillEnabled(); - } + bool canSpill(const QueryConfig& queryConfig) const override; bool isFinal() const { return step_ == Step::kFinal; diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 013988ac8073..272b557582ce 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -1785,6 +1785,59 @@ TEST_F(AggregationTest, distinctWithSpilling) { OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } +TEST_F(AggregationTest, spillingForAggrsWithDistinct) { + auto vectors = makeVectors(rowType_, 100, 10); + createDuckDbTable(vectors); + auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId aggrNodeId; + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(QueryConfig::kSpillEnabled, "true") + .config(QueryConfig::kAggregationSpillEnabled, "true") + .config(QueryConfig::kTestingSpillPct, "100") + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c1"}, {"count(DISTINCT c0)"}, {}) + .capturePlanNodeId(aggrNodeId) + .planNode()) + .assertResults("SELECT c1, count(DISTINCT c0) FROM tmp GROUP BY c1"); + // Verify that spilling is not triggered. + const auto& queryConfig = task->queryCtx()->queryConfig(); + ASSERT_TRUE(queryConfig.spillEnabled()); + ASSERT_TRUE(queryConfig.aggregationSpillEnabled()); + ASSERT_EQ(100, queryConfig.testingSpillPct()); + ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0); + OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); +} + +TEST_F(AggregationTest, spillingForAggrsWithSorting) { + auto vectors = makeVectors(rowType_, 100, 10); + createDuckDbTable(vectors); + auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId aggrNodeId; + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(QueryConfig::kSpillEnabled, "true") + .config(QueryConfig::kAggregationSpillEnabled, "true") + .config(QueryConfig::kTestingSpillPct, "100") + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c1"}, {"count(c0 ORDER BY c2)"}, {}) + .capturePlanNodeId(aggrNodeId) + .planNode()) + .assertResults( + "SELECT c1, count(c0 ORDER BY c2) FROM tmp GROUP BY c1"); + // Verify that spilling is not triggered. + const auto& queryConfig = task->queryCtx()->queryConfig(); + ASSERT_TRUE(queryConfig.spillEnabled()); + ASSERT_TRUE(queryConfig.aggregationSpillEnabled()); + ASSERT_EQ(100, queryConfig.testingSpillPct()); + ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0); + OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); +} + TEST_F(AggregationTest, distinctSpillWithMemoryLimit) { rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()}); VectorFuzzer fuzzer({}, pool());