From 3fd188e2b446fe37b33f42e92c5fcfdd100e922f Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Tue, 19 Dec 2023 11:08:07 +0800
Subject: [PATCH] fixup

---
 velox/exec/tests/AggregationTest.cpp | 131 ++++++++++++++++++++-------
 1 file changed, 96 insertions(+), 35 deletions(-)

diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp
index 48c52a9101eb..10d6ce3196f4 100644
--- a/velox/exec/tests/AggregationTest.cpp
+++ b/velox/exec/tests/AggregationTest.cpp
@@ -1129,6 +1129,46 @@ TEST_F(AggregationTest, spillWithMemoryLimit) {
   }
 }
 
+TEST_F(AggregationTest, spillAll) {
+  auto inputs = makeVectors(rowType_, 100, 10);
+
+  const auto numDistincts =
+      AssertQueryBuilder(PlanBuilder()
+                             .values(inputs)
+                             .singleAggregation({"c0"}, {}, {})
+                             .planNode())
+          .copyResults(pool_.get())
+          ->size();
+
+  auto plan = PlanBuilder()
+                  .values(inputs)
+                  .singleAggregation({"c0"}, {"array_agg(c1)"})
+                  .planNode();
+
+  auto results = AssertQueryBuilder(plan).copyResults(pool_.get());
+
+  auto tempDirectory = exec::test::TempDirectoryPath::create();
+  auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
+  auto task = AssertQueryBuilder(plan)
+                  .spillDirectory(tempDirectory->path)
+                  .config(QueryConfig::kSpillEnabled, "true")
+                  .config(QueryConfig::kAggregationSpillEnabled, "true")
+                  // Set one spill partition to avoid the test flakiness.
+                  // Set the memory trigger limit to be a very small value.
+                  .config(QueryConfig::kAggregationSpillMemoryThreshold, "1024")
+                  .assertResults(results);
+
+  auto stats = task->taskStats().pipelineStats;
+  ASSERT_LT(0, stats[0].operatorStats[1].runtimeStats["spillRuns"].count);
+  // Check spilled bytes.
+  ASSERT_LT(0, stats[0].operatorStats[1].spilledInputBytes);
+  ASSERT_LT(0, stats[0].operatorStats[1].spilledBytes);
+  ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 1);
+  // Verifies all the rows have been spilled.
+  ASSERT_EQ(stats[0].operatorStats[1].spilledRows, numDistincts);
+  OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
+}
+
 TEST_F(AggregationTest, partialSpillWithMemoryLimit) {
   constexpr int32_t kNumDistinct = 2000;
   constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
@@ -1139,9 +1179,9 @@ TEST_F(AggregationTest, partialSpillWithMemoryLimit) {
   core::PlanNodeId partialAggrNodeId;
   const auto plan = PlanBuilder()
                         .values(batches)
-                        .partialAggregation({"c0"}, {}, {})
+                        .partialAggregation({"c0"}, {"count(1)", "sum(c1)"})
                         .capturePlanNodeId(partialAggrNodeId)
-                        .finalAggregation({"c0"}, {}, {})
+                        .finalAggregation()
                         .planNode();
   const auto expectedResults =
       AssertQueryBuilder(plan).copyResults(pool_.get());
@@ -1190,44 +1230,65 @@ TEST_F(AggregationTest, partialSpillWithMemoryLimit) {
   }
 }
 
-TEST_F(AggregationTest, spillAll) {
-  auto inputs = makeVectors(rowType_, 100, 10);
+TEST_F(AggregationTest, partialDistinctSpillWithMemoryLimit) {
+  constexpr int32_t kNumDistinct = 2000;
+  constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
+  rng_.seed(1);
+  rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
+  auto batches = makeVectors(rowType_, 100, 5);
 
-  const auto numDistincts =
-      AssertQueryBuilder(PlanBuilder()
-                             .values(inputs)
-                             .singleAggregation({"c0"}, {}, {})
-                             .planNode())
-          .copyResults(pool_.get())
-          ->size();
+  core::PlanNodeId partialAggrNodeId;
+  const auto plan = PlanBuilder()
+                        .values(batches)
+                        .partialAggregation({"c0"}, {}, {})
+                        .capturePlanNodeId(partialAggrNodeId)
+                        .finalAggregation({"c0"}, {}, {})
+                        .planNode();
+  const auto expectedResults =
+      AssertQueryBuilder(plan).copyResults(pool_.get());
 
-  auto plan = PlanBuilder()
-                  .values(inputs)
-                  .singleAggregation({"c0"}, {"array_agg(c1)"})
-                  .planNode();
+  struct {
+    uint64_t aggregationMemLimit;
+    bool expectSpill;
 
-  auto results = AssertQueryBuilder(plan).copyResults(pool_.get());
+    std::string debugString() const {
+      return fmt::format(
+          "aggregationMemLimit:{}, expectSpill:{}",
+          aggregationMemLimit,
+          expectSpill);
+    }
+  } testSettings[] = {// Memory limit is disabled so spilling is not triggered.
+                      {0, false},
+                      // Memory limit is too small so always trigger spilling.
+                      {1, true},
+                      // Memory limit is too large so spilling is not triggered.
+                      {1'000'000'000, false}};
+  for (const auto& testData : testSettings) {
+    SCOPED_TRACE(testData.debugString());
 
-  auto tempDirectory = exec::test::TempDirectoryPath::create();
-  auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
-  auto task = AssertQueryBuilder(plan)
-                  .spillDirectory(tempDirectory->path)
-                  .config(QueryConfig::kSpillEnabled, "true")
-                  .config(QueryConfig::kAggregationSpillEnabled, "true")
-                  // Set one spill partition to avoid the test flakiness.
-                  // Set the memory trigger limit to be a very small value.
-                  .config(QueryConfig::kAggregationSpillMemoryThreshold, "1024")
-                  .assertResults(results);
+    auto spillDirectory = exec::test::TempDirectoryPath::create();
+    auto task =
+        AssertQueryBuilder(plan)
+            .spillDirectory(spillDirectory->path)
+            .config(QueryConfig::kSpillEnabled, "true")
+            .config(QueryConfig::kPartialAggregationSpillEnabled, "true")
+            .config(QueryConfig::kAggregationSpillEnabled, "true")
+            .config(
+                QueryConfig::kAggregationSpillMemoryThreshold,
+                std::to_string(testData.aggregationMemLimit))
+            .config(
+                QueryConfig::kAbandonPartialAggregationMinPct,
+                "200") // avoid abandoning
+            .config(
+                QueryConfig::kAbandonPartialAggregationMinRows,
+                std::to_string(1LL << 30)) // avoid abandoning
+            .assertResults(expectedResults);
 
-  auto stats = task->taskStats().pipelineStats;
-  ASSERT_LT(0, stats[0].operatorStats[1].runtimeStats["spillRuns"].count);
-  // Check spilled bytes.
-  ASSERT_LT(0, stats[0].operatorStats[1].spilledInputBytes);
-  ASSERT_LT(0, stats[0].operatorStats[1].spilledBytes);
-  ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 1);
-  // Verifies all the rows have been spilled.
-  ASSERT_EQ(stats[0].operatorStats[1].spilledRows, numDistincts);
-  OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
+    auto taskStats = exec::toPlanStats(task->taskStats());
+    auto& stats = taskStats.at(partialAggrNodeId);
+    checkSpillStats(stats, testData.expectSpill);
+    OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
+  }
 }
 
 // Verify number of memory allocations in the HashAggregation operator.