diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 1ce97342b820..d19afb8402f1 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -233,8 +233,20 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { } // TODO: add spilling for pre-grouped aggregation later: // https://github.com/facebookincubator/velox/issues/3264 - return (isFinal() || isSingle()) && preGroupedKeys().empty() && - queryConfig.aggregationSpillEnabled(); + if (!preGroupedKeys().empty()) { + return false; + } + + if ((isFinal() || isSingle()) && queryConfig.aggregationSpillEnabled()) { + return true; + } + + if ((isIntermediate() || isPartial()) && + queryConfig.partialAggregationSpillEnabled()) { + return true; + } + + return false; } void AggregationNode::addDetails(std::stringstream& stream) const { diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index deb3e0adba68..aa5840a92249 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -620,6 +620,14 @@ class AggregationNode : public PlanNode { return step_ == Step::kSingle; } + bool isIntermediate() const { + return step_ == Step::kIntermediate; + } + + bool isPartial() const { + return step_ == Step::kPartial; + } + folly::dynamic serialize() const override; static PlanNodePtr create(const folly::dynamic& obj, void* context); diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 23440ffcaef7..17d7db8774eb 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -201,6 +201,14 @@ class QueryConfig { static constexpr const char* kAggregationSpillEnabled = "aggregation_spill_enabled"; + /// Partial aggregation spilling flag, only applies if "spill_enabled" flag is + /// set. + /// If true, partial aggregation flushing is disabled, which means the + /// settings of kMaxPartialAggregationMemory and + /// kMaxExtendedPartialAggregationMemory are ignored. 
+ static constexpr const char* kPartialAggregationSpillEnabled = + "partial_aggregation_spill_enabled"; + /// Join spilling flag, only applies if "spill_enabled" flag is set. static constexpr const char* kJoinSpillEnabled = "join_spill_enabled"; @@ -525,11 +533,17 @@ class QueryConfig { } /// Returns 'is aggregation spilling enabled' flag. Must also check the - /// spillEnabled()!g + /// spillEnabled()! bool aggregationSpillEnabled() const { return get(kAggregationSpillEnabled, true); } + /// Returns 'is partial aggregation spilling enabled' flag. Must also check + /// the spillEnabled()! + bool partialAggregationSpillEnabled() const { + return get(kPartialAggregationSpillEnabled, false); + } + /// Returns 'is join spilling enabled' flag. Must also check the /// spillEnabled()! bool joinSpillEnabled() const { diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 275f894b1b5d..7eef6b941287 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -215,6 +215,13 @@ Spilling - boolean - true - When `spill_enabled` is true, determines whether HashAggregation operator can spill to disk under memory pressure. + * - partial_aggregation_spill_enabled + - boolean + - false + - When `spill_enabled` is true, determines whether the partial phase of HashAggregation operator can spill to disk under memory pressure. + When true, flushing is disabled, so the settings of max_partial_aggregation_memory and max_extended_partial_aggregation_memory are ignored. + Compared to flushing, enabling spilling makes Velox reduce the data size output by the partial aggregation phase as much as possible, but slows + down partial aggregation's own processing. 
* - join_spill_enabled - boolean - true diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index c524efcca45d..8838e1c28efc 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -803,7 +803,7 @@ const HashLookup& GroupingSet::hashLookup() const { void GroupingSet::ensureInputFits(const RowVectorPtr& input) { - // Spilling is considered if this is a final or single aggregation and - // spillPath is set. - if (isPartial_ || spillConfig_ == nullptr) { + // Spilling is considered whenever a spill config is set, regardless of the + // aggregation step. + if (spillConfig_ == nullptr) { return; } @@ -888,7 +888,7 @@ void GroupingSet::ensureOutputFits() { // to reserve memory for the output as we can't reclaim much memory from this // operator itself. The output processing can reclaim memory from the other // operator or query through memory arbitration. - if (isPartial_ || spillConfig_ == nullptr || hasSpilled()) { + if (spillConfig_ == nullptr || hasSpilled()) { return; } @@ -938,7 +938,6 @@ void GroupingSet::spill() { if (table_ == nullptr || table_->numDistinct() == 0) { return; } - if (!hasSpilled()) { auto rows = table_->rows(); VELOX_DCHECK(pool_.trackUsage()); @@ -1175,6 +1174,8 @@ void GroupingSet::updateRow(SpillMergeStream& input, char* row) { } void GroupingSet::abandonPartialAggregation() { + VELOX_CHECK(!hasSpilled()) + abandonedPartialAggregation_ = true; allSupportToIntermediate_ = true; for (auto& aggregate : aggregates_) { diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index a01253010a0c..a7a6bb737c00 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -113,6 +113,12 @@ void HashAggregation::initialize() { bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const { VELOX_CHECK(isPartialOutput_ && !isGlobal_); + if (groupingSet_->hasSpilled()) { + // Once spilling has kicked in, disable the abandoning code path. + // This is because spilled rows are not yet counted in + // numOutput with the current way of calculating it. 
+ return false; + } return numInputRows_ > abandonPartialAggregationMinRows_ && 100 * numOutput / numInputRows_ >= abandonPartialAggregationMinPct_; } @@ -135,9 +141,13 @@ void HashAggregation::addInput(RowVectorPtr input) { // NOTE: we should not trigger partial output flush in case of global // aggregation as the final aggregator will handle it the same way as the // partial aggregator. Hence, we have to use more memory anyway. + // + // We currently disable flushing when spilling is enabled. It's possible + // to make spilling and flushing work together once we have a way to + // account for the spilled data size in partial aggregation's memory usage. const bool abandonPartialEarly = isPartialOutput_ && !isGlobal_ && abandonPartialAggregationEarly(groupingSet_->numDistinct()); - if (isPartialOutput_ && !isGlobal_ && + if (!spillConfig_.has_value() && isPartialOutput_ && !isGlobal_ && (abandonPartialEarly || groupingSet_->isPartialFull(maxPartialAggregationMemoryUsage_))) { partialFull_ = true; diff --git a/velox/exec/fuzzer/AggregationFuzzer.cpp b/velox/exec/fuzzer/AggregationFuzzer.cpp index f4747c91d6b2..36f750ddd79a 100644 --- a/velox/exec/fuzzer/AggregationFuzzer.cpp +++ b/velox/exec/fuzzer/AggregationFuzzer.cpp @@ -166,6 +166,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { testPlan( planWithSplits, false /*injectSpill*/, + false /*injectPartialSpill*/, false /*abandonPartial*/, customVerification, customVerifiers, @@ -176,6 +177,19 @@ class AggregationFuzzer : public AggregationFuzzerBase { testPlan( planWithSplits, true /*injectSpill*/, + false /*injectPartialSpill*/, + false /*abandonPartial*/, + customVerification, + customVerifiers, + expected, + maxDrivers); + + LOG(INFO) << "Testing plan #" << i + << " with partial aggregation spilling"; + testPlan( + planWithSplits, + true /*injectSpill*/, + true /*injectPartialSpill*/, false /*abandonPartial*/, customVerification, customVerifiers, @@ -188,6 +202,7 @@ class AggregationFuzzer : public 
AggregationFuzzerBase { testPlan( planWithSplits, false /*injectSpill*/, + false /*injectPartialSpill*/, true /*abandonPartial*/, customVerification, customVerifiers, diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index cb2e0d0d3472..2a062ddd07ca 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -359,6 +359,7 @@ velox::test::ResultOrError AggregationFuzzerBase::execute( const core::PlanNodePtr& plan, const std::vector& splits, bool injectSpill, + bool injectPartialSpill, bool abandonPartial, int32_t maxDrivers) { LOG(INFO) << "Executing query plan: " << std::endl @@ -371,12 +372,18 @@ velox::test::ResultOrError AggregationFuzzerBase::execute( builder.configs(queryConfigs_); - if (injectSpill) { + if (injectSpill || injectPartialSpill) { spillDirectory = exec::test::TempDirectoryPath::create(); builder.spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kAggregationSpillEnabled, "true") .config(core::QueryConfig::kTestingSpillPct, "100"); + if (injectSpill) { + builder.config(core::QueryConfig::kAggregationSpillEnabled, "true"); + } + if (injectPartialSpill) { + builder.config( + core::QueryConfig::kPartialAggregationSpillEnabled, "true"); + } } if (abandonPartial) { @@ -431,6 +438,7 @@ AggregationFuzzerBase::computeReferenceResults( void AggregationFuzzerBase::testPlan( const PlanWithSplits& planWithSplits, bool injectSpill, + bool injectPartialSpill, bool abandonPartial, bool customVerification, const std::vector>& customVerifiers, @@ -440,6 +448,7 @@ void AggregationFuzzerBase::testPlan( planWithSplits.plan, planWithSplits.splits, injectSpill, + injectPartialSpill, abandonPartial, maxDrivers); diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index 3e352fcbed6c..fdb0fb6e7b01 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ 
b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -190,6 +190,7 @@ class AggregationFuzzerBase { const core::PlanNodePtr& plan, const std::vector& splits = {}, bool injectSpill = false, + bool injectPartialSpill = false, bool abandonPartial = false, int32_t maxDrivers = 2); @@ -201,6 +202,7 @@ class AggregationFuzzerBase { void testPlan( const PlanWithSplits& planWithSplits, bool injectSpill, + bool injectPartialSpill, bool abandonPartial, bool customVerification, const std::vector>& customVerifiers, diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index cda061f3db9c..10d6ce3196f4 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -1169,6 +1169,128 @@ TEST_F(AggregationTest, spillAll) { OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } +TEST_F(AggregationTest, partialSpillWithMemoryLimit) { + constexpr int32_t kNumDistinct = 2000; + constexpr int64_t kMaxBytes = 1LL << 30; // 1GB + rng_.seed(1); + rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()}); + auto batches = makeVectors(rowType_, 100, 5); + + core::PlanNodeId partialAggrNodeId; + const auto plan = PlanBuilder() + .values(batches) + .partialAggregation({"c0"}, {"count(1)", "sum(c1)"}) + .capturePlanNodeId(partialAggrNodeId) + .finalAggregation() + .planNode(); + const auto expectedResults = + AssertQueryBuilder(plan).copyResults(pool_.get()); + + struct { + uint64_t aggregationMemLimit; + bool expectSpill; + + std::string debugString() const { + return fmt::format( + "aggregationMemLimit:{}, expectSpill:{}", + aggregationMemLimit, + expectSpill); + } + } testSettings[] = {// Memory limit is disabled so spilling is not triggered. + {0, false}, + // Memory limit is too small so always trigger spilling. + {1, true}, + // Memory limit is too large so spilling is not triggered. 
+ {1'000'000'000, false}}; + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto task = + AssertQueryBuilder(plan) + .spillDirectory(spillDirectory->path) + .config(QueryConfig::kSpillEnabled, "true") + .config(QueryConfig::kPartialAggregationSpillEnabled, "true") + .config(QueryConfig::kAggregationSpillEnabled, "true") + .config( + QueryConfig::kAggregationSpillMemoryThreshold, + std::to_string(testData.aggregationMemLimit)) + .config( + QueryConfig::kAbandonPartialAggregationMinPct, + "200") // avoid abandoning + .config( + QueryConfig::kAbandonPartialAggregationMinRows, + std::to_string(1LL << 30)) // avoid abandoning + .assertResults(expectedResults); + + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& stats = taskStats.at(partialAggrNodeId); + checkSpillStats(stats, testData.expectSpill); + OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); + } +} + +TEST_F(AggregationTest, partialDistinctSpillWithMemoryLimit) { + constexpr int32_t kNumDistinct = 2000; + constexpr int64_t kMaxBytes = 1LL << 30; // 1GB + rng_.seed(1); + rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()}); + auto batches = makeVectors(rowType_, 100, 5); + + core::PlanNodeId partialAggrNodeId; + const auto plan = PlanBuilder() + .values(batches) + .partialAggregation({"c0"}, {}, {}) + .capturePlanNodeId(partialAggrNodeId) + .finalAggregation({"c0"}, {}, {}) + .planNode(); + const auto expectedResults = + AssertQueryBuilder(plan).copyResults(pool_.get()); + + struct { + uint64_t aggregationMemLimit; + bool expectSpill; + + std::string debugString() const { + return fmt::format( + "aggregationMemLimit:{}, expectSpill:{}", + aggregationMemLimit, + expectSpill); + } + } testSettings[] = {// Memory limit is disabled so spilling is not triggered. + {0, false}, + // Memory limit is too small so always trigger spilling. 
+ {1, true}, + // Memory limit is too large so spilling is not triggered. + {1'000'000'000, false}}; + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto task = + AssertQueryBuilder(plan) + .spillDirectory(spillDirectory->path) + .config(QueryConfig::kSpillEnabled, "true") + .config(QueryConfig::kPartialAggregationSpillEnabled, "true") + .config(QueryConfig::kAggregationSpillEnabled, "true") + .config( + QueryConfig::kAggregationSpillMemoryThreshold, + std::to_string(testData.aggregationMemLimit)) + .config( + QueryConfig::kAbandonPartialAggregationMinPct, + "200") // avoid abandoning + .config( + QueryConfig::kAbandonPartialAggregationMinRows, + std::to_string(1LL << 30)) // avoid abandoning + .assertResults(expectedResults); + + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& stats = taskStats.at(partialAggrNodeId); + checkSpillStats(stats, testData.expectSpill); + OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); + } +} + // Verify number of memory allocations in the HashAggregation operator. 
TEST_F(AggregationTest, memoryAllocations) { vector_size_t size = 1'024; diff --git a/velox/exec/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp index 8d8ac92e2a67..67b37584a1e7 100644 --- a/velox/exec/tests/SharedArbitratorTest.cpp +++ b/velox/exec/tests/SharedArbitratorTest.cpp @@ -1171,7 +1171,7 @@ TEST_F(SharedArbitrationTest, reclaimFromDistinctAggregation) { createDuckDbTable(vectors); const auto spillDirectory = exec::test::TempDirectoryPath::create(); core::PlanNodeId aggrNodeId; - std::shared_ptr queryCtx = newQueryCtx(maxQueryCapacity); + auto queryCtx = newQueryCtx(maxQueryCapacity); auto task = AssertQueryBuilder(duckDbQueryRunner_) .spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, "true") @@ -1190,6 +1190,99 @@ TEST_F(SharedArbitrationTest, reclaimFromDistinctAggregation) { waitForAllTasksToBeDeleted(); } +TEST_F(SharedArbitrationTest, reclaimFromPartialAggregation) { + const uint64_t maxQueryCapacity = 20L << 20; + std::vector vectors = newVectors(1024, maxQueryCapacity * 2); + createDuckDbTable(vectors); + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId partialAggNodeId; + core::PlanNodeId finalAggNodeId; + auto queryCtx = newQueryCtx(maxQueryCapacity); + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kPartialAggregationSpillEnabled, "true") + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .config( + core::QueryConfig::kMaxPartialAggregationMemory, + std::to_string(1LL << 30)) // disable flush + .config( + core::QueryConfig::kMaxExtendedPartialAggregationMemory, + std::to_string(1LL << 30)) // disable flush + .config( + core::QueryConfig::kAbandonPartialAggregationMinPct, + "200") // avoid abandoning + .config( + core::QueryConfig::kAbandonPartialAggregationMinRows, + std::to_string(1LL << 30)) // avoid abandoning 
+ .queryCtx(queryCtx) + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT c0, count(1) FROM tmp GROUP BY c0"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& partialStats = taskStats.at(partialAggNodeId); + auto& finalStats = taskStats.at(finalAggNodeId); + ASSERT_GT(partialStats.spilledBytes, 0); + ASSERT_GT(finalStats.spilledBytes, 0); + task.reset(); + waitForAllTasksToBeDeleted(); +} + +TEST_F( + SharedArbitrationTest, + reclaimFromPartialAggregationAndIgnoreFlushingSettings) { + const uint64_t maxQueryCapacity = 20L << 20; + std::vector vectors = newVectors(1024, maxQueryCapacity * 2); + createDuckDbTable(vectors); + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId partialAggNodeId; + core::PlanNodeId finalAggNodeId; + auto queryCtx = newQueryCtx(maxQueryCapacity); + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kPartialAggregationSpillEnabled, "true") + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .config( + core::QueryConfig::kMaxPartialAggregationMemory, + std::to_string(1L)) + .config( + core::QueryConfig::kMaxExtendedPartialAggregationMemory, + std::to_string(1L)) + .config( + core::QueryConfig::kAbandonPartialAggregationMinPct, + "200") // avoid abandoning + .config( + core::QueryConfig::kAbandonPartialAggregationMinRows, + std::to_string(1LL << 30)) // avoid abandoning + .queryCtx(queryCtx) + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT c0, count(1) FROM tmp GROUP BY c0"); + auto taskStats = 
exec::toPlanStats(task->taskStats()); + auto& partialStats = taskStats.at(partialAggNodeId); + auto& finalStats = taskStats.at(finalAggNodeId); + ASSERT_EQ( + partialStats.customStats.find("flushRowCount"), + partialStats.customStats.end()); + ASSERT_GT(partialStats.spilledBytes, 0); + ASSERT_GT(finalStats.spilledBytes, 0); + task.reset(); + waitForAllTasksToBeDeleted(); +} + DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromAggregationOnNoMoreInput) { const int numVectors = 32; std::vector vectors; diff --git a/velox/exec/tests/utils/TempDirectoryPath.cpp b/velox/exec/tests/utils/TempDirectoryPath.cpp index 146cbff328a7..b34815a0cd5a 100644 --- a/velox/exec/tests/utils/TempDirectoryPath.cpp +++ b/velox/exec/tests/utils/TempDirectoryPath.cpp @@ -28,7 +28,7 @@ std::shared_ptr TempDirectoryPath::create() { } TempDirectoryPath::~TempDirectoryPath() { - LOG(INFO) << "TempDirectoryPath:: removing all files from" << path; + LOG(INFO) << "TempDirectoryPath:: removing all files from " << path; try { boost::filesystem::remove_all(path.c_str()); } catch (...) {