diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index bb3be12f2983..87f91d8c7182 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -237,7 +237,16 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { } // TODO: add spilling for pre-grouped aggregation later: // https://github.com/facebookincubator/velox/issues/3264 - return preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled(); + if ((isFinal() || isSingle()) && queryConfig.aggregationSpillEnabled()) { + return preGroupedKeys().empty(); + } + + if ((isIntermediate() || isPartial()) && + queryConfig.partialAggregationSpillEnabled()) { + return preGroupedKeys().empty(); + } + + return false; } void AggregationNode::addDetails(std::stringstream& stream) const { diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index a041cdc9aee6..99073e4380e9 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -607,6 +607,14 @@ class AggregationNode : public PlanNode { return step_ == Step::kSingle; } + bool isIntermediate() const { + return step_ == Step::kIntermediate; + } + + bool isPartial() const { + return step_ == Step::kPartial; + } + folly::dynamic serialize() const override; static PlanNodePtr create(const folly::dynamic& obj, void* context); diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 340fdd5412a3..2fc6a903d06e 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -198,6 +198,11 @@ class QueryConfig { static constexpr const char* kAggregationSpillEnabled = "aggregation_spill_enabled"; + /// Partial aggregation spilling flag, only applies if "spill_enabled" flag is + /// set. + static constexpr const char* kPartialAggregationSpillEnabled = + "partial_aggregation_spill_enabled"; + /// Join spilling flag, only applies if "spill_enabled" flag is set. 
static constexpr const char* kJoinSpillEnabled = "join_spill_enabled"; @@ -493,11 +498,17 @@ class QueryConfig { } /// Returns 'is aggregation spilling enabled' flag. Must also check the - /// spillEnabled()!g + /// spillEnabled()! bool aggregationSpillEnabled() const { return get(kAggregationSpillEnabled, true); } + /// Returns 'is partial aggregation spilling enabled' flag. Must also check + /// the spillEnabled()! + bool partialAggregationSpillEnabled() const { + return get(kPartialAggregationSpillEnabled, false); + } + /// Returns 'is join spilling enabled' flag. Must also check the /// spillEnabled()! bool joinSpillEnabled() const { diff --git a/velox/core/tests/PlanFragmentTest.cpp b/velox/core/tests/PlanFragmentTest.cpp index 4c17e3f44db7..daf88bb8a483 100644 --- a/velox/core/tests/PlanFragmentTest.cpp +++ b/velox/core/tests/PlanFragmentTest.cpp @@ -159,14 +159,14 @@ TEST_F(PlanFragmentTest, aggregationCanSpill) { {AggregationNode::Step::kSingle, true, true, false, false, true}, {AggregationNode::Step::kIntermediate, false, true, false, false, false}, {AggregationNode::Step::kIntermediate, true, false, false, false, false}, - {AggregationNode::Step::kIntermediate, true, true, true, false, true}, + {AggregationNode::Step::kIntermediate, true, true, true, false, false}, {AggregationNode::Step::kIntermediate, true, true, false, true, false}, - {AggregationNode::Step::kIntermediate, true, true, false, false, true}, + {AggregationNode::Step::kIntermediate, true, true, false, false, false}, {AggregationNode::Step::kPartial, false, true, false, false, false}, {AggregationNode::Step::kPartial, true, false, false, false, false}, - {AggregationNode::Step::kPartial, true, true, true, false, true}, + {AggregationNode::Step::kPartial, true, true, true, false, false}, {AggregationNode::Step::kPartial, true, true, false, true, false}, - {AggregationNode::Step::kPartial, true, true, false, false, true}, + {AggregationNode::Step::kPartial, true, true, false, false, false}, 
{AggregationNode::Step::kFinal, false, true, false, false, false}, {AggregationNode::Step::kFinal, true, false, false, false, false}, {AggregationNode::Step::kFinal, true, true, true, false, true}, diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index d279c216bfd6..636e8a9170a3 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -725,6 +725,7 @@ bool GroupingSet::getOutput( } if (hasSpilled()) { + spill(); return getOutputWithSpill(maxOutputRows, maxOutputBytes, result); } VELOX_CHECK(!isDistinct()); @@ -929,6 +930,9 @@ void GroupingSet::ensureOutputFits() { return; } } + if (hasSpilled()) { + return; + } spill(RowContainerIterator{}); } @@ -955,11 +959,6 @@ void GroupingSet::spill() { if (table_ == nullptr || table_->numDistinct() == 0) { return; } - - if (hasSpilled() && spiller_->finalized()) { - return; - } - if (!hasSpilled()) { auto rows = table_->rows(); VELOX_DCHECK(pool_.trackUsage()); @@ -1049,7 +1048,16 @@ bool GroupingSet::getOutputWithSpill( if (merge_ == nullptr) { return false; } - return mergeNext(maxOutputRows, maxOutputBytes, result); + bool hasData = mergeNext(maxOutputRows, maxOutputBytes, result); + if (!hasData) { + // If spill has been finalized, reset merge stream and spiller. This would + // help partial aggregation replay the spilling procedure once needed again. 
+ merge_ = nullptr; + mergeRows_ = nullptr; + mergeArgs_.clear(); + spiller_ = nullptr; + } + return hasData; } bool GroupingSet::mergeNext( diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 441326e610b7..d91c5c6dea77 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -20,7 +20,6 @@ #include "folly/experimental/EventCount.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" -#include "velox/common/memory/Memory.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/exec/Aggregate.h" @@ -398,33 +397,6 @@ class AggregationTest : public OperatorTestBase { VARCHAR()})}; folly::Random::DefaultGenerator rng_; memory::MemoryReclaimer::Stats reclaimerStats_; - - std::shared_ptr newQueryCtx( - int64_t memoryCapacity = memory::kMaxMemory) { - std::unordered_map> configs; - std::shared_ptr pool = memoryManager_->addRootPool( - "", memoryCapacity, MemoryReclaimer::create()); - auto queryCtx = std::make_shared( - executor_.get(), - core::QueryConfig({}), - configs, - cache::AsyncDataCache::getInstance(), - std::move(pool)); - return queryCtx; - } - - void setupMemory() { - memory::MemoryManagerOptions options; - options.arbitratorKind = "SHARED"; - options.checkUsageLeak = true; - memoryAllocator_ = memory::MemoryAllocator::createDefaultInstance(); - options.allocator = memoryAllocator_.get(); - memoryManager_ = std::make_unique(options); - } - - private: - std::shared_ptr memoryAllocator_; - std::unique_ptr memoryManager_; }; template <> @@ -875,104 +847,6 @@ TEST_F(AggregationTest, partialAggregationMemoryLimit) { .customStats.count("flushRowCount")); } -// TODO move to arbitrator test -TEST_F(AggregationTest, partialAggregationSpill) { - VectorFuzzer::Options fuzzerOpts; - fuzzerOpts.vectorSize = 128; - RowTypePtr rowType = ROW( - {{"c0", INTEGER()}, - {"c1", INTEGER()}, - {"c2", 
INTEGER()}, - {"c3", INTEGER()}, - {"c4", INTEGER()}, - {"c5", INTEGER()}, - {"c6", INTEGER()}, - {"c7", INTEGER()}, - {"c8", INTEGER()}, - {"c9", INTEGER()}, - {"c10", INTEGER()}}); - VectorFuzzer fuzzer(std::move(fuzzerOpts), pool()); - - std::vector vectors; - - const int32_t numVectors = 2000; - for (int i = 0; i < numVectors; i++) { - vectors.push_back(fuzzer.fuzzRow(rowType)); - } - - createDuckDbTable(vectors); - - setupMemory(); - - core::PlanNodeId partialAggNodeId; - core::PlanNodeId finalAggNodeId; - // Set an artificially low limit on the amount of data to accumulate in - // the partial aggregation. - - // Distinct aggregation. - auto spillDirectory1 = exec::test::TempDirectoryPath::create(); - auto task = AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(newQueryCtx(10LL << 10 << 10)) - .spillDirectory(spillDirectory1->path) - .config(QueryConfig::kSpillEnabled, "true") - .config(QueryConfig::kAggregationSpillEnabled, "true") - .config( - QueryConfig::kAggregationSpillMemoryThreshold, - std::to_string(0)) // always spill on final agg - .plan(PlanBuilder() - .values(vectors) - .partialAggregation({"c0"}, {}) - .capturePlanNodeId(partialAggNodeId) - .finalAggregation() - .capturePlanNodeId(finalAggNodeId) - .planNode()) - .assertResults("SELECT distinct c0 FROM tmp"); - - checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true); - checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true); - - // Count aggregation. 
- auto spillDirectory2 = exec::test::TempDirectoryPath::create(); - task = AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(newQueryCtx(10LL << 10 << 10)) - .spillDirectory(spillDirectory2->path) - .config(QueryConfig::kSpillEnabled, "true") - .config(QueryConfig::kAggregationSpillEnabled, "true") - .config( - QueryConfig::kAggregationSpillMemoryThreshold, - std::to_string(0)) // always spill on final agg - .plan(PlanBuilder() - .values(vectors) - .partialAggregation({"c0"}, {"count(1)"}) - .capturePlanNodeId(partialAggNodeId) - .finalAggregation() - .capturePlanNodeId(finalAggNodeId) - .planNode()) - .assertResults("SELECT c0, count(1) FROM tmp GROUP BY 1"); - - checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true); - checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true); - - // Global aggregation. - task = AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(newQueryCtx(10LL << 10 << 10)) - .plan(PlanBuilder() - .values(vectors) - .partialAggregation({}, {"sum(c0)"}) - .capturePlanNodeId(partialAggNodeId) - .finalAggregation() - .capturePlanNodeId(finalAggNodeId) - .planNode()) - .assertResults("SELECT sum(c0) FROM tmp"); - EXPECT_EQ( - 0, - toPlanStats(task->taskStats()) - .at(partialAggNodeId) - .customStats.count("flushRowCount")); - checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), false); - checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), false); -} - TEST_F(AggregationTest, partialDistinctWithAbandon) { auto vectors = { // 1st batch will produce 100 distinct groups from 10 rows. 
diff --git a/velox/exec/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp index 6577e7d0e70b..3d1e9350c395 100644 --- a/velox/exec/tests/SharedArbitratorTest.cpp +++ b/velox/exec/tests/SharedArbitratorTest.cpp @@ -1053,6 +1053,50 @@ TEST_F(SharedArbitrationTest, reclaimFromDistinctAggregation) { waitForAllTasksToBeDeleted(); } +TEST_F(SharedArbitrationTest, reclaimFromPartialAggregation) { + const uint64_t maxQueryCapacity = 20L << 20; + std::vector vectors = newVectors(1024, maxQueryCapacity * 2); + createDuckDbTable(vectors); + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId partialAggNodeId; + core::PlanNodeId finalAggNodeId; + std::shared_ptr queryCtx = newQueryCtx(maxQueryCapacity); + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kPartialAggregationSpillEnabled, "true") + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .config( + core::QueryConfig::kMaxPartialAggregationMemory, + std::to_string(1LL << 30)) // disable flush + .config( + core::QueryConfig::kMaxExtendedPartialAggregationMemory, + std::to_string(1LL << 30)) // disable flush + .config( + core::QueryConfig::kAbandonPartialAggregationMinPct, + "200") // avoid abandoning + .config( + core::QueryConfig::kAbandonPartialAggregationMinRows, + std::to_string(1LL << 30)) // avoid abandoning + .queryCtx(queryCtx) + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT c0, count(1) FROM tmp GROUP BY c0"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& partialStats = taskStats.at(partialAggNodeId); + auto& finalStats = taskStats.at(finalAggNodeId); + ASSERT_GT(partialStats.spilledBytes, 0); + 
ASSERT_GT(finalStats.spilledBytes, 0); + task.reset(); + waitForAllTasksToBeDeleted(); +} + DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromAggregationOnNoMoreInput) { const int numVectors = 32; std::vector<RowVectorPtr> vectors;