diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index cfc61edc0280..02be45e15748 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -994,7 +994,8 @@ void GroupingSet::spill() { ++partition) { numDistinctSpillFilesPerPartition_[partition] = spiller_->state().numFinishedFiles(partition); - totalNumDistinctSpilledFiles += numDistinctSpillFilesPerPartition_.back(); + totalNumDistinctSpilledFiles += + numDistinctSpillFilesPerPartition_[partition]; } VELOX_CHECK_GT(totalNumDistinctSpilledFiles, 0); } diff --git a/velox/exec/PlanNodeStats.h b/velox/exec/PlanNodeStats.h index 4d10b9d60875..71e653aebafd 100644 --- a/velox/exec/PlanNodeStats.h +++ b/velox/exec/PlanNodeStats.h @@ -109,7 +109,7 @@ struct PlanNodeStats { /// Number of total splits. int numSplits{0}; - // Total bytes in memory for spilling + /// Total bytes in memory for spilling uint64_t spilledInputBytes{0}; /// Total bytes written for spilling. diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 364ea2d51045..4b4253fc2107 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -1811,26 +1811,45 @@ DEBUG_ONLY_TEST_F(AggregationTest, minSpillableMemoryReservation) { } TEST_F(AggregationTest, distinctWithSpilling) { - auto vectors = makeVectors(rowType_, 10, 100); - createDuckDbTable(vectors); - auto spillDirectory = exec::test::TempDirectoryPath::create(); - core::PlanNodeId aggrNodeId; - TestScopedSpillInjection scopedSpillInjection(100); - auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->getPath()) - .config(QueryConfig::kSpillEnabled, true) - .config(QueryConfig::kAggregationSpillEnabled, true) - .plan(PlanBuilder() - .values(vectors) - .singleAggregation({"c0"}, {}, {}) - .capturePlanNodeId(aggrNodeId) - .planNode()) - .assertResults("SELECT distinct c0 FROM tmp"); - // Verify that spilling is not triggered. - ASSERT_GT(toPlanStats(task->taskStats()).at(aggrNodeId).spilledInputBytes, 0); - ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledPartitions, 8); - ASSERT_GT(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0); - OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); + struct TestParam { + std::vector inputs; + std::function expectedSpillFilesCheck{nullptr}; + }; + + std::vector testParams{ + {makeVectors(rowType_, 10, 100), + [](uint32_t spilledFiles) { ASSERT_GE(spilledFiles, 100); }}, + {{makeRowVector( + {"c0"}, + {makeFlatVector( + 2'000, [](vector_size_t /* unused */) { return 100; })})}, + [](uint32_t spilledFiles) { ASSERT_EQ(spilledFiles, 1); }}}; + + for (const auto& testParam : testParams) { + createDuckDbTable(testParam.inputs); + auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId aggrNodeId; + TestScopedSpillInjection scopedSpillInjection(100); + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->getPath()) + .config(QueryConfig::kSpillEnabled, true) + .config(QueryConfig::kAggregationSpillEnabled, true) + .plan(PlanBuilder() + .values(testParam.inputs) + .singleAggregation({"c0"}, {}, {}) + .capturePlanNodeId(aggrNodeId) + .planNode()) + .assertResults("SELECT distinct c0 FROM tmp"); + + // Verify that spilling is not triggered. + const auto planNodeStatsMap = toPlanStats(task->taskStats()); + const auto& aggrNodeStats = planNodeStatsMap.at(aggrNodeId); + ASSERT_GT(aggrNodeStats.spilledInputBytes, 0); + ASSERT_EQ(aggrNodeStats.spilledPartitions, 8); + ASSERT_GT(aggrNodeStats.spilledBytes, 0); + testParam.expectedSpillFilesCheck(aggrNodeStats.spilledFiles); + OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); + } } TEST_F(AggregationTest, spillingForAggrsWithDistinct) { @@ -3706,5 +3725,4 @@ TEST_F(AggregationTest, nanKeys) { {makeRowVector({c0, c1}), c1}, {makeRowVector({e0, e1}), e1}); } - } // namespace facebook::velox::exec::test