From e8572ae07c24f0fd69b0d657caf4f8eac4194d5a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 13 Nov 2023 16:36:43 +0800 Subject: [PATCH] fixup fixup fixup fixup fixup fixup Revert "fixup" This reverts commit f9efc1a2e3e7f76cff8000a7ad7530aaf47eb361. fixup fixup fixup fixup test This reverts commit 73bf319b76d74a794d2fcffa3b992f581d69f6a1. --- velox/core/PlanNode.cpp | 3 +- velox/core/tests/PlanFragmentTest.cpp | 8 +- velox/exec/GroupingSet.cpp | 8 +- velox/exec/tests/AggregationTest.cpp | 126 ++++++++++++++++++ .../aggregates/ApproxPercentileAggregate.cpp | 13 +- velox/vector/BaseVector.h | 5 + 6 files changed, 153 insertions(+), 10 deletions(-) diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 6cf42dbb95e07..65394108d6786 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -237,8 +237,7 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { } // TODO: add spilling for pre-grouped aggregation later: // https://github.com/facebookincubator/velox/issues/3264 - return (isFinal() || isSingle()) && preGroupedKeys().empty() && - queryConfig.aggregationSpillEnabled(); + return preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled(); } void AggregationNode::addDetails(std::stringstream& stream) const { diff --git a/velox/core/tests/PlanFragmentTest.cpp b/velox/core/tests/PlanFragmentTest.cpp index daf88bb8a4831..4c17e3f44db74 100644 --- a/velox/core/tests/PlanFragmentTest.cpp +++ b/velox/core/tests/PlanFragmentTest.cpp @@ -159,14 +159,14 @@ TEST_F(PlanFragmentTest, aggregationCanSpill) { {AggregationNode::Step::kSingle, true, true, false, false, true}, {AggregationNode::Step::kIntermediate, false, true, false, false, false}, {AggregationNode::Step::kIntermediate, true, false, false, false, false}, - {AggregationNode::Step::kIntermediate, true, true, true, false, false}, + {AggregationNode::Step::kIntermediate, true, true, true, false, true}, {AggregationNode::Step::kIntermediate, true, true, false, true, false}, - {AggregationNode::Step::kIntermediate, true, true, false, false, false}, + {AggregationNode::Step::kIntermediate, true, true, false, false, true}, {AggregationNode::Step::kPartial, false, true, false, false, false}, {AggregationNode::Step::kPartial, true, false, false, false, false}, - {AggregationNode::Step::kPartial, true, true, true, false, false}, + {AggregationNode::Step::kPartial, true, true, true, false, true}, {AggregationNode::Step::kPartial, true, true, false, true, false}, - {AggregationNode::Step::kPartial, true, true, false, false, false}, + {AggregationNode::Step::kPartial, true, true, false, false, true}, {AggregationNode::Step::kFinal, false, true, false, false, false}, {AggregationNode::Step::kFinal, true, false, false, false, false}, {AggregationNode::Step::kFinal, true, true, true, false, true}, diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index fc55c8429a163..b2cf20ba76c9f 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -830,7 +830,7 @@ const HashLookup& GroupingSet::hashLookup() const { void GroupingSet::ensureInputFits(const RowVectorPtr& input) { // Spilling is considered if this is a final or single aggregation and // spillPath is set. - if (isPartial_ || spillConfig_ == nullptr) { + if (spillConfig_ == nullptr) { return; } @@ -913,7 +913,7 @@ void GroupingSet::ensureOutputFits() { // to reserve memory for the output as we can't reclaim much memory from this // operator itself. The output processing can reclaim memory from the other // operator or query through memory arbitration. - if (isPartial_ || spillConfig_ == nullptr || hasSpilled()) { + if (spillConfig_ == nullptr || hasSpilled()) { return; } @@ -960,6 +960,10 @@ void GroupingSet::spill() { return; } + if (hasSpilled() && spiller_->finalized()) { + return; + } + if (!hasSpilled()) { auto rows = table_->rows(); VELOX_DCHECK(pool_.trackUsage()); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index d91c5c6dea778..441326e610b7b 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -20,6 +20,7 @@ #include "folly/experimental/EventCount.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" +#include "velox/common/memory/Memory.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/exec/Aggregate.h" @@ -397,6 +398,33 @@ class AggregationTest : public OperatorTestBase { VARCHAR()})}; folly::Random::DefaultGenerator rng_; memory::MemoryReclaimer::Stats reclaimerStats_; + + std::shared_ptr newQueryCtx( + int64_t memoryCapacity = memory::kMaxMemory) { + std::unordered_map> configs; + std::shared_ptr pool = memoryManager_->addRootPool( + "", memoryCapacity, MemoryReclaimer::create()); + auto queryCtx = std::make_shared( + executor_.get(), + core::QueryConfig({}), + configs, + cache::AsyncDataCache::getInstance(), + std::move(pool)); + return queryCtx; + } + + void setupMemory() { + memory::MemoryManagerOptions options; + options.arbitratorKind = "SHARED"; + options.checkUsageLeak = true; + memoryAllocator_ = memory::MemoryAllocator::createDefaultInstance(); + options.allocator = memoryAllocator_.get(); + memoryManager_ = std::make_unique(options); + } + + private: + std::shared_ptr memoryAllocator_; + std::unique_ptr memoryManager_; }; template <> @@ -847,6 +875,104 @@ TEST_F(AggregationTest, partialAggregationMemoryLimit) { .customStats.count("flushRowCount")); } +// TODO move to arbitrator test +TEST_F(AggregationTest, partialAggregationSpill) { + VectorFuzzer::Options fuzzerOpts; + fuzzerOpts.vectorSize = 128; + RowTypePtr rowType = ROW( + {{"c0", INTEGER()}, + {"c1", INTEGER()}, + {"c2", INTEGER()}, + {"c3", INTEGER()}, + {"c4", INTEGER()}, + {"c5", INTEGER()}, + {"c6", INTEGER()}, + {"c7", INTEGER()}, + {"c8", INTEGER()}, + {"c9", INTEGER()}, + {"c10", INTEGER()}}); + VectorFuzzer fuzzer(std::move(fuzzerOpts), pool()); + + std::vector vectors; + + const int32_t numVectors = 2000; + for (int i = 0; i < numVectors; i++) { + vectors.push_back(fuzzer.fuzzRow(rowType)); + } + + createDuckDbTable(vectors); + + setupMemory(); + + core::PlanNodeId partialAggNodeId; + core::PlanNodeId finalAggNodeId; + // Set an artificially low limit on the amount of data to accumulate in + // the partial aggregation. + + // Distinct aggregation. + auto spillDirectory1 = exec::test::TempDirectoryPath::create(); + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(newQueryCtx(10LL << 10 << 10)) + .spillDirectory(spillDirectory1->path) + .config(QueryConfig::kSpillEnabled, "true") + .config(QueryConfig::kAggregationSpillEnabled, "true") + .config( + QueryConfig::kAggregationSpillMemoryThreshold, + std::to_string(0)) // always spill on final agg + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT distinct c0 FROM tmp"); + + checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true); + checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true); + + // Count aggregation. + auto spillDirectory2 = exec::test::TempDirectoryPath::create(); + task = AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(newQueryCtx(10LL << 10 << 10)) + .spillDirectory(spillDirectory2->path) + .config(QueryConfig::kSpillEnabled, "true") + .config(QueryConfig::kAggregationSpillEnabled, "true") + .config( + QueryConfig::kAggregationSpillMemoryThreshold, + std::to_string(0)) // always spill on final agg + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT c0, count(1) FROM tmp GROUP BY 1"); + + checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true); + checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true); + + // Global aggregation. + task = AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(newQueryCtx(10LL << 10 << 10)) + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({}, {"sum(c0)"}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT sum(c0) FROM tmp"); + EXPECT_EQ( + 0, + toPlanStats(task->taskStats()) + .at(partialAggNodeId) + .customStats.count("flushRowCount")); + checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), false); + checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), false); +} + TEST_F(AggregationTest, partialDistinctWithAbandon) { auto vectors = { // 1st batch will produce 100 distinct groups from 10 rows. diff --git a/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp b/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp index 1e5a9e35b5404..3994392cb4813 100644 --- a/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp +++ b/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp @@ -625,8 +625,17 @@ class ApproxPercentileAggregate : public exec::Aggregate { auto rowVec = decoded.base()->as(); if constexpr (checkIntermediateInputs) { VELOX_USER_CHECK(rowVec); - for (int i = kPercentiles; i <= kAccuracy; ++i) { - VELOX_USER_CHECK(rowVec->childAt(i)->isConstantEncoding()); + for (int i = kPercentiles; i <= kPercentiles; ++i) { + VELOX_USER_CHECK( + rowVec->childAt(i)->isConstantEncoding() || + rowVec->childAt(i) + ->isArrayEncoding()); // spilling flats constant encoding + } + for (int i = kPercentilesIsArray; i <= kAccuracy; ++i) { + VELOX_USER_CHECK( + rowVec->childAt(i)->isConstantEncoding() || + rowVec->childAt(i) + ->isFlatEncoding()); // spilling flats constant encoding } for (int i = kK; i <= kMaxValue; ++i) { VELOX_USER_CHECK(rowVec->childAt(i)->isFlatEncoding()); diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index a7235b8f3d254..9c96a5103e672 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -340,6 +340,11 @@ class BaseVector { */ virtual std::unique_ptr> hashAll() const = 0; + /// Returns true if this vector is encoded as array + bool isArrayEncoding() const { + return encoding_ == VectorEncoding::Simple::ARRAY; + } + /// Returns true if this vector is encoded as flat (FlatVector). bool isFlatEncoding() const { return encoding_ == VectorEncoding::Simple::FLAT;