Skip to content

Commit

Permalink
Fix bug in grouping set spill (facebookincubator#10548)
Browse files Browse the repository at this point in the history
Summary:
At the fix site, calling .back() on the already-resized std::vector always read the last partition's entry instead of the current partition's, so totalNumDistinctSpilledFiles could end up as 0 whenever the last partition's spilled file count was 0.

Pull Request resolved: facebookincubator#10548

Reviewed By: Yuhta, zacw7

Differential Revision: D60181140

Pulled By: tanjialiang

fbshipit-source-id: 254a95a0c67c71336dca2180483a3b86253b497c
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Jul 25, 2024
1 parent 60205c7 commit 0fe4db8
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 23 deletions.
3 changes: 2 additions & 1 deletion velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,8 @@ void GroupingSet::spill() {
++partition) {
numDistinctSpillFilesPerPartition_[partition] =
spiller_->state().numFinishedFiles(partition);
totalNumDistinctSpilledFiles += numDistinctSpillFilesPerPartition_.back();
totalNumDistinctSpilledFiles +=
numDistinctSpillFilesPerPartition_[partition];
}
VELOX_CHECK_GT(totalNumDistinctSpilledFiles, 0);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/PlanNodeStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ struct PlanNodeStats {
/// Number of total splits.
int numSplits{0};

// Total bytes in memory for spilling
/// Total bytes in memory for spilling
uint64_t spilledInputBytes{0};

/// Total bytes written for spilling.
Expand Down
60 changes: 39 additions & 21 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1811,26 +1811,45 @@ DEBUG_ONLY_TEST_F(AggregationTest, minSpillableMemoryReservation) {
}

TEST_F(AggregationTest, distinctWithSpilling) {
auto vectors = makeVectors(rowType_, 10, 100);
createDuckDbTable(vectors);
auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId aggrNodeId;
TestScopedSpillInjection scopedSpillInjection(100);
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->getPath())
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c0"}, {}, {})
.capturePlanNodeId(aggrNodeId)
.planNode())
.assertResults("SELECT distinct c0 FROM tmp");
// Verify that spilling is not triggered.
ASSERT_GT(toPlanStats(task->taskStats()).at(aggrNodeId).spilledInputBytes, 0);
ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledPartitions, 8);
ASSERT_GT(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
struct TestParam {
std::vector<RowVectorPtr> inputs;
std::function<void(uint32_t)> expectedSpillFilesCheck{nullptr};
};

std::vector<TestParam> testParams{
{makeVectors(rowType_, 10, 100),
[](uint32_t spilledFiles) { ASSERT_GE(spilledFiles, 100); }},
{{makeRowVector(
{"c0"},
{makeFlatVector<int64_t>(
2'000, [](vector_size_t /* unused */) { return 100; })})},
[](uint32_t spilledFiles) { ASSERT_EQ(spilledFiles, 1); }}};

for (const auto& testParam : testParams) {
createDuckDbTable(testParam.inputs);
auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId aggrNodeId;
TestScopedSpillInjection scopedSpillInjection(100);
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->getPath())
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.plan(PlanBuilder()
.values(testParam.inputs)
.singleAggregation({"c0"}, {}, {})
.capturePlanNodeId(aggrNodeId)
.planNode())
.assertResults("SELECT distinct c0 FROM tmp");

// Verify that spilling is triggered.
const auto planNodeStatsMap = toPlanStats(task->taskStats());
const auto& aggrNodeStats = planNodeStatsMap.at(aggrNodeId);
ASSERT_GT(aggrNodeStats.spilledInputBytes, 0);
ASSERT_EQ(aggrNodeStats.spilledPartitions, 8);
ASSERT_GT(aggrNodeStats.spilledBytes, 0);
testParam.expectedSpillFilesCheck(aggrNodeStats.spilledFiles);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}
}

TEST_F(AggregationTest, spillingForAggrsWithDistinct) {
Expand Down Expand Up @@ -3706,5 +3725,4 @@ TEST_F(AggregationTest, nanKeys) {
{makeRowVector({c0, c1}), c1},
{makeRowVector({e0, e1}), e1});
}

} // namespace facebook::velox::exec::test

0 comments on commit 0fe4db8

Please sign in to comment.