Skip to content

Commit

Permalink
Re-pick partial aggregation spilling patch (#453)
Browse files Browse the repository at this point in the history
This is a fix-up patch for 45117f0
  • Loading branch information
zhztheplayer authored Nov 29, 2023
1 parent 4e22cd7 commit 3356607
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 138 deletions.
11 changes: 10 additions & 1 deletion velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,16 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
}
// TODO: add spilling for pre-grouped aggregation later:
// https://github.com/facebookincubator/velox/issues/3264
return preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled();
if ((isFinal() || isSingle()) && queryConfig.aggregationSpillEnabled()) {
return preGroupedKeys().empty();
}

if ((isIntermediate() || isPartial()) &&
queryConfig.partialAggregationSpillEnabled()) {
return preGroupedKeys().empty();
}

return false;
}

void AggregationNode::addDetails(std::stringstream& stream) const {
Expand Down
8 changes: 8 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,14 @@ class AggregationNode : public PlanNode {
return step_ == Step::kSingle;
}

/// Returns true when this node's aggregation step is Step::kIntermediate.
bool isIntermediate() const {
  return Step::kIntermediate == step_;
}

/// Returns true when this node's aggregation step is Step::kPartial.
bool isPartial() const {
  return Step::kPartial == step_;
}

folly::dynamic serialize() const override;

static PlanNodePtr create(const folly::dynamic& obj, void* context);
Expand Down
13 changes: 12 additions & 1 deletion velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ class QueryConfig {
static constexpr const char* kAggregationSpillEnabled =
"aggregation_spill_enabled";

/// Partial aggregation spilling flag, only applies if "spill_enabled" flag is
/// set. Read through partialAggregationSpillEnabled(), which passes false as
/// the default value when looking up this key.
static constexpr const char* kPartialAggregationSpillEnabled =
"partial_aggregation_spill_enabled";

/// Join spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kJoinSpillEnabled = "join_spill_enabled";

Expand Down Expand Up @@ -493,11 +498,17 @@ class QueryConfig {
}

/// Returns 'is aggregation spilling enabled' flag. Must also check the
/// spillEnabled()!g
/// spillEnabled()!
bool aggregationSpillEnabled() const {
  // Fallback handed to get<bool>() together with the config key.
  constexpr bool kEnabledByDefault{true};
  return get<bool>(kAggregationSpillEnabled, kEnabledByDefault);
}

/// Reports the 'partial aggregation spilling enabled' flag, looked up under
/// kPartialAggregationSpillEnabled. Callers must also check spillEnabled()!
bool partialAggregationSpillEnabled() const {
  // Fallback handed to get<bool>() together with the config key.
  constexpr bool kEnabledByDefault{false};
  return get<bool>(kPartialAggregationSpillEnabled, kEnabledByDefault);
}

/// Returns 'is join spilling enabled' flag. Must also check the
/// spillEnabled()!
bool joinSpillEnabled() const {
Expand Down
8 changes: 4 additions & 4 deletions velox/core/tests/PlanFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ TEST_F(PlanFragmentTest, aggregationCanSpill) {
{AggregationNode::Step::kSingle, true, true, false, false, true},
{AggregationNode::Step::kIntermediate, false, true, false, false, false},
{AggregationNode::Step::kIntermediate, true, false, false, false, false},
{AggregationNode::Step::kIntermediate, true, true, true, false, true},
{AggregationNode::Step::kIntermediate, true, true, true, false, false},
{AggregationNode::Step::kIntermediate, true, true, false, true, false},
{AggregationNode::Step::kIntermediate, true, true, false, false, true},
{AggregationNode::Step::kIntermediate, true, true, false, false, false},
{AggregationNode::Step::kPartial, false, true, false, false, false},
{AggregationNode::Step::kPartial, true, false, false, false, false},
{AggregationNode::Step::kPartial, true, true, true, false, true},
{AggregationNode::Step::kPartial, true, true, true, false, false},
{AggregationNode::Step::kPartial, true, true, false, true, false},
{AggregationNode::Step::kPartial, true, true, false, false, true},
{AggregationNode::Step::kPartial, true, true, false, false, false},
{AggregationNode::Step::kFinal, false, true, false, false, false},
{AggregationNode::Step::kFinal, true, false, false, false, false},
{AggregationNode::Step::kFinal, true, true, true, false, true},
Expand Down
20 changes: 14 additions & 6 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ bool GroupingSet::getOutput(
}

if (hasSpilled()) {
spill();
return getOutputWithSpill(maxOutputRows, maxOutputBytes, result);
}
VELOX_CHECK(!isDistinct());
Expand Down Expand Up @@ -929,6 +930,9 @@ void GroupingSet::ensureOutputFits() {
return;
}
}
if (hasSpilled()) {
return;
}
spill(RowContainerIterator{});
}

Expand All @@ -955,11 +959,6 @@ void GroupingSet::spill() {
if (table_ == nullptr || table_->numDistinct() == 0) {
return;
}

if (hasSpilled() && spiller_->finalized()) {
return;
}

if (!hasSpilled()) {
auto rows = table_->rows();
VELOX_DCHECK(pool_.trackUsage());
Expand Down Expand Up @@ -1049,7 +1048,16 @@ bool GroupingSet::getOutputWithSpill(
if (merge_ == nullptr) {
return false;
}
return mergeNext(maxOutputRows, maxOutputBytes, result);
bool hasData = mergeNext(maxOutputRows, maxOutputBytes, result);
if (!hasData) {
// If spill has been finalized, reset merge stream and spiller. This would
// help partial aggregation replay the spilling procedure once needed again.
merge_ = nullptr;
mergeRows_ = nullptr;
mergeArgs_.clear();
spiller_ = nullptr;
}
return hasData;
}

bool GroupingSet::mergeNext(
Expand Down
126 changes: 0 additions & 126 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "folly/experimental/EventCount.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/exec/Aggregate.h"
Expand Down Expand Up @@ -398,33 +397,6 @@ class AggregationTest : public OperatorTestBase {
VARCHAR()})};
folly::Random::DefaultGenerator rng_;
memory::MemoryReclaimer::Stats reclaimerStats_;

/// Builds a QueryCtx whose memory pool is a fresh root pool requested from
/// memoryManager_ with 'memoryCapacity' as the capacity argument.
/// memoryManager_ is dereferenced unconditionally here and is only assigned
/// in setupMemory().
std::shared_ptr<core::QueryCtx> newQueryCtx(
    int64_t memoryCapacity = memory::kMaxMemory) {
  std::unordered_map<std::string, std::shared_ptr<Config>> emptyConfigs;
  std::shared_ptr<memory::MemoryPool> rootPool = memoryManager_->addRootPool(
      "", memoryCapacity, MemoryReclaimer::create());
  return std::make_shared<core::QueryCtx>(
      executor_.get(),
      core::QueryConfig({}),
      emptyConfigs,
      cache::AsyncDataCache::getInstance(),
      std::move(rootPool));
}

void setupMemory() {
memory::MemoryManagerOptions options;
options.arbitratorKind = "SHARED";
options.checkUsageLeak = true;
memoryAllocator_ = memory::MemoryAllocator::createDefaultInstance();
options.allocator = memoryAllocator_.get();
memoryManager_ = std::make_unique<memory::MemoryManager>(options);
}

private:
std::shared_ptr<memory::MemoryAllocator> memoryAllocator_;
std::unique_ptr<memory::MemoryManager> memoryManager_;
};

template <>
Expand Down Expand Up @@ -875,104 +847,6 @@ TEST_F(AggregationTest, partialAggregationMemoryLimit) {
.customStats.count("flushRowCount"));
}

// TODO move to arbitrator test
//
// Runs three partial + final aggregation plans (distinct, count, global) over
// the same fuzzed input and compares each against DuckDB. The first two
// queries enable spilling and check spill stats on both aggregation nodes;
// the third sets no spill configs and checks that neither node reports
// spilling.
TEST_F(AggregationTest, partialAggregationSpill) {
// Fuzzer configured with vectorSize = 128 over an 11-column INTEGER row type.
VectorFuzzer::Options fuzzerOpts;
fuzzerOpts.vectorSize = 128;
RowTypePtr rowType = ROW(
{{"c0", INTEGER()},
{"c1", INTEGER()},
{"c2", INTEGER()},
{"c3", INTEGER()},
{"c4", INTEGER()},
{"c5", INTEGER()},
{"c6", INTEGER()},
{"c7", INTEGER()},
{"c8", INTEGER()},
{"c9", INTEGER()},
{"c10", INTEGER()}});
VectorFuzzer fuzzer(std::move(fuzzerOpts), pool());

std::vector<RowVectorPtr> vectors;

// Build 2000 fuzzed input vectors.
const int32_t numVectors = 2000;
for (int i = 0; i < numVectors; i++) {
vectors.push_back(fuzzer.fuzzRow(rowType));
}

// Register the same input with DuckDB (queried below as table 'tmp').
createDuckDbTable(vectors);

// Must run before newQueryCtx(): it assigns the memoryManager_ that
// newQueryCtx() dereferences.
setupMemory();

core::PlanNodeId partialAggNodeId;
core::PlanNodeId finalAggNodeId;
// Every query below uses a query context from newQueryCtx(10LL << 10 << 10),
// i.e. a memoryCapacity argument of 10 MiB. No partial-aggregation memory
// limit is configured in this test.

// Distinct aggregation.
auto spillDirectory1 = exec::test::TempDirectoryPath::create();
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(newQueryCtx(10LL << 10 << 10))
.spillDirectory(spillDirectory1->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(
QueryConfig::kAggregationSpillMemoryThreshold,
std::to_string(0)) // always spill on final agg
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({"c0"}, {})
.capturePlanNodeId(partialAggNodeId)
.finalAggregation()
.capturePlanNodeId(finalAggNodeId)
.planNode())
.assertResults("SELECT distinct c0 FROM tmp");

// NOTE(review): the 'true' argument presumably means "expect spilling" —
// confirm against checkSpillStats().
checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true);
checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true);

// Count aggregation.
auto spillDirectory2 = exec::test::TempDirectoryPath::create();
task = AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(newQueryCtx(10LL << 10 << 10))
.spillDirectory(spillDirectory2->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(
QueryConfig::kAggregationSpillMemoryThreshold,
std::to_string(0)) // always spill on final agg
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({"c0"}, {"count(1)"})
.capturePlanNodeId(partialAggNodeId)
.finalAggregation()
.capturePlanNodeId(finalAggNodeId)
.planNode())
.assertResults("SELECT c0, count(1) FROM tmp GROUP BY 1");

checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true);
checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true);

// Global aggregation. No spill directory and no spill configs are set for
// this query.
task = AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(newQueryCtx(10LL << 10 << 10))
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({}, {"sum(c0)"})
.capturePlanNodeId(partialAggNodeId)
.finalAggregation()
.capturePlanNodeId(finalAggNodeId)
.planNode())
.assertResults("SELECT sum(c0) FROM tmp");
// The partial aggregation node must not report a "flushRowCount" custom stat.
EXPECT_EQ(
0,
toPlanStats(task->taskStats())
.at(partialAggNodeId)
.customStats.count("flushRowCount"));
checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), false);
checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), false);
}

TEST_F(AggregationTest, partialDistinctWithAbandon) {
auto vectors = {
// 1st batch will produce 100 distinct groups from 10 rows.
Expand Down
44 changes: 44 additions & 0 deletions velox/exec/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,50 @@ TEST_F(SharedArbitrationTest, reclaimFromDistinctAggregation) {
waitForAllTasksToBeDeleted();
}

// Runs a partial + final count aggregation under a 20 MiB query capacity with
// both aggregation spill flags enabled, and requires that both aggregation
// nodes report a non-zero spilledBytes stat.
TEST_F(SharedArbitrationTest, reclaimFromPartialAggregation) {
  const uint64_t queryCapacityBytes = 20L << 20;
  // Input is requested with twice the query capacity as the size argument.
  std::vector<RowVectorPtr> inputBatches =
      newVectors(1024, queryCapacityBytes * 2);
  createDuckDbTable(inputBatches);
  const auto spillDir = exec::test::TempDirectoryPath::create();

  core::PlanNodeId partialAggNodeId;
  core::PlanNodeId finalAggNodeId;
  std::shared_ptr<core::QueryCtx> ctx = newQueryCtx(queryCapacityBytes);

  // Shared value (1 GiB / 2^30) for the limits that should never be reached.
  const std::string hugeLimit = std::to_string(1LL << 30);

  auto task =
      AssertQueryBuilder(duckDbQueryRunner_)
          .spillDirectory(spillDir->path)
          .config(core::QueryConfig::kSpillEnabled, "true")
          .config(core::QueryConfig::kPartialAggregationSpillEnabled, "true")
          .config(core::QueryConfig::kAggregationSpillEnabled, "true")
          // disable flush
          .config(core::QueryConfig::kMaxPartialAggregationMemory, hugeLimit)
          // disable flush
          .config(
              core::QueryConfig::kMaxExtendedPartialAggregationMemory,
              hugeLimit)
          // avoid abandoning
          .config(core::QueryConfig::kAbandonPartialAggregationMinPct, "200")
          // avoid abandoning
          .config(
              core::QueryConfig::kAbandonPartialAggregationMinRows, hugeLimit)
          .queryCtx(ctx)
          .plan(PlanBuilder()
                    .values(inputBatches)
                    .partialAggregation({"c0"}, {"count(1)"})
                    .capturePlanNodeId(partialAggNodeId)
                    .finalAggregation()
                    .capturePlanNodeId(finalAggNodeId)
                    .planNode())
          .assertResults("SELECT c0, count(1) FROM tmp GROUP BY c0");

  // Look up both nodes' stats first, then assert on each.
  auto planStats = exec::toPlanStats(task->taskStats());
  auto& partialAggStats = planStats.at(partialAggNodeId);
  auto& finalAggStats = planStats.at(finalAggNodeId);
  ASSERT_GT(partialAggStats.spilledBytes, 0);
  ASSERT_GT(finalAggStats.spilledBytes, 0);

  task.reset();
  waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromAggregationOnNoMoreInput) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
Expand Down

0 comments on commit 3356607

Please sign in to comment.