Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable spilling support for partial aggregation #7558

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,20 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
}
// TODO: add spilling for pre-grouped aggregation later:
// https://github.com/facebookincubator/velox/issues/3264
return (isFinal() || isSingle()) && preGroupedKeys().empty() &&
queryConfig.aggregationSpillEnabled();
if (!preGroupedKeys().empty()) {
return false;
}

if ((isFinal() || isSingle()) && queryConfig.aggregationSpillEnabled()) {
return true;
}

if ((isIntermediate() || isPartial()) &&
queryConfig.partialAggregationSpillEnabled()) {
return true;
}

return false;
}

void AggregationNode::addDetails(std::stringstream& stream) const {
Expand Down
8 changes: 8 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,14 @@ class AggregationNode : public PlanNode {
return step_ == Step::kSingle;
}

bool isIntermediate() const {
return step_ == Step::kIntermediate;
}

bool isPartial() const {
return step_ == Step::kPartial;
}

folly::dynamic serialize() const override;

static PlanNodePtr create(const folly::dynamic& obj, void* context);
Expand Down
16 changes: 15 additions & 1 deletion velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ class QueryConfig {
static constexpr const char* kAggregationSpillEnabled =
"aggregation_spill_enabled";

/// Partial aggregation spilling flag, only applies if "spill_enabled" flag is
/// set.
/// If true, partial aggregation flushing will be disabled. Which means,
/// settings of kMaxPartialAggregationMemory and
/// kMaxExtendedPartialAggregationMemory will be ignored.
static constexpr const char* kPartialAggregationSpillEnabled =
"partial_aggregation_spill_enabled";

/// Join spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kJoinSpillEnabled = "join_spill_enabled";

Expand Down Expand Up @@ -525,11 +533,17 @@ class QueryConfig {
}

/// Returns 'is aggregation spilling enabled' flag. Must also check the
/// spillEnabled()!g
/// spillEnabled()!
bool aggregationSpillEnabled() const {
return get<bool>(kAggregationSpillEnabled, true);
}

/// Returns 'is partial aggregation spilling enabled' flag. Must also check
/// the spillEnabled()!
bool partialAggregationSpillEnabled() const {
return get<bool>(kPartialAggregationSpillEnabled, false);
}

/// Returns 'is join spilling enabled' flag. Must also check the
/// spillEnabled()!
bool joinSpillEnabled() const {
Expand Down
7 changes: 7 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ Spilling
- boolean
- true
- When `spill_enabled` is true, determines whether HashAggregation operator can spill to disk under memory pressure.
* - partial_aggregation_spill_enabled
- boolean
- false
- When `spill_enabled` is true, determines whether the partial phase of HashAggregation operator can spill to disk under memory pressure.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's explain when it could be beneficial to enable this? E.g. much fewer reducers, slow or inefficient shuffle, anything else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just describe the result when turning the option on? For example, just note user the size of data emitted from partial aggregation would be reduced as much as possible if setting to true. Users will know it should be on if they rely on inefficient shuffle or have other specific reasons.

When true, flushing will be disabled so settings of max_partial_aggregation_memory and max_extended_partial_aggregation_memory will be ignored.
Comparing to flushing, enabling spilling would make Velox reduce data size from partial aggregation phase as much as possible however would slow
down partial aggregation's own processing.
* - join_spill_enabled
- boolean
- true
Expand Down
7 changes: 4 additions & 3 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ const HashLookup& GroupingSet::hashLookup() const {
void GroupingSet::ensureInputFits(const RowVectorPtr& input) {
// Spilling is considered if this is a final or single aggregation and
// spillPath is set.
if (isPartial_ || spillConfig_ == nullptr) {
if (spillConfig_ == nullptr) {
return;
}

Expand Down Expand Up @@ -888,7 +888,7 @@ void GroupingSet::ensureOutputFits() {
// to reserve memory for the output as we can't reclaim much memory from this
// operator itself. The output processing can reclaim memory from the other
// operator or query through memory arbitration.
if (isPartial_ || spillConfig_ == nullptr || hasSpilled()) {
if (spillConfig_ == nullptr || hasSpilled()) {
return;
}

Expand Down Expand Up @@ -938,7 +938,6 @@ void GroupingSet::spill() {
if (table_ == nullptr || table_->numDistinct() == 0) {
return;
}

if (!hasSpilled()) {
auto rows = table_->rows();
VELOX_DCHECK(pool_.trackUsage());
Expand Down Expand Up @@ -1175,6 +1174,8 @@ void GroupingSet::updateRow(SpillMergeStream& input, char* row) {
}

void GroupingSet::abandonPartialAggregation() {
VELOX_CHECK(!hasSpilled())

abandonedPartialAggregation_ = true;
allSupportToIntermediate_ = true;
for (auto& aggregate : aggregates_) {
Expand Down
12 changes: 11 additions & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ void HashAggregation::initialize() {

bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
VELOX_CHECK(isPartialOutput_ && !isGlobal_);
if (groupingSet_->hasSpilled()) {
// Once spilling kicked in, disable the abandoning code path.
duanmeng marked this conversation as resolved.
Show resolved Hide resolved
// This is because spilled rows did not count to
// numOutput yet based on current way of calculation.
return false;
}
return numInputRows_ > abandonPartialAggregationMinRows_ &&
100 * numOutput / numInputRows_ >= abandonPartialAggregationMinPct_;
}
Expand All @@ -135,9 +141,13 @@ void HashAggregation::addInput(RowVectorPtr input) {
// NOTE: we should not trigger partial output flush in case of global
// aggregation as the final aggregator will handle it the same way as the
// partial aggregator. Hence, we have to use more memory anyway.
//
// We currently disable flushing when spilling is enabled. It's possible
mbasmanova marked this conversation as resolved.
Show resolved Hide resolved
// to make spilling and flushing work together once we had a way to
// count the spilled data size into partial aggregation's memory usage.
const bool abandonPartialEarly = isPartialOutput_ && !isGlobal_ &&
abandonPartialAggregationEarly(groupingSet_->numDistinct());
if (isPartialOutput_ && !isGlobal_ &&
if (!spillConfig_.has_value() && isPartialOutput_ && !isGlobal_ &&
(abandonPartialEarly ||
groupingSet_->isPartialFull(maxPartialAggregationMemoryUsage_))) {
partialFull_ = true;
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/fuzzer/AggregationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class AggregationFuzzer : public AggregationFuzzerBase {
testPlan(
planWithSplits,
false /*injectSpill*/,
false /*injectPartialSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
Expand All @@ -176,6 +177,19 @@ class AggregationFuzzer : public AggregationFuzzerBase {
testPlan(
planWithSplits,
true /*injectSpill*/,
false /*injectPartialSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
expected,
maxDrivers);

LOG(INFO) << "Testing plan #" << i
<< " with partial aggregation spilling";
testPlan(
planWithSplits,
true /*injectSpill*/,
true /*injectPartialSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
Expand All @@ -188,6 +202,7 @@ class AggregationFuzzer : public AggregationFuzzerBase {
testPlan(
planWithSplits,
false /*injectSpill*/,
false /*injectPartialSpill*/,
true /*abandonPartial*/,
customVerification,
customVerifiers,
Expand Down
13 changes: 11 additions & 2 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ velox::test::ResultOrError AggregationFuzzerBase::execute(
const core::PlanNodePtr& plan,
const std::vector<exec::Split>& splits,
bool injectSpill,
bool injectPartialSpill,
bool abandonPartial,
int32_t maxDrivers) {
LOG(INFO) << "Executing query plan: " << std::endl
Expand All @@ -371,12 +372,18 @@ velox::test::ResultOrError AggregationFuzzerBase::execute(

builder.configs(queryConfigs_);

if (injectSpill) {
if (injectSpill || injectPartialSpill) {
spillDirectory = exec::test::TempDirectoryPath::create();
builder.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kAggregationSpillEnabled, "true")
.config(core::QueryConfig::kTestingSpillPct, "100");
if (injectSpill) {
builder.config(core::QueryConfig::kAggregationSpillEnabled, "true");
}
if (injectPartialSpill) {
builder.config(
core::QueryConfig::kPartialAggregationSpillEnabled, "true");
}
}

if (abandonPartial) {
Expand Down Expand Up @@ -431,6 +438,7 @@ AggregationFuzzerBase::computeReferenceResults(
void AggregationFuzzerBase::testPlan(
const PlanWithSplits& planWithSplits,
bool injectSpill,
bool injectPartialSpill,
bool abandonPartial,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
Expand All @@ -440,6 +448,7 @@ void AggregationFuzzerBase::testPlan(
planWithSplits.plan,
planWithSplits.splits,
injectSpill,
injectPartialSpill,
abandonPartial,
maxDrivers);

Expand Down
2 changes: 2 additions & 0 deletions velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ class AggregationFuzzerBase {
const core::PlanNodePtr& plan,
const std::vector<exec::Split>& splits = {},
bool injectSpill = false,
bool injectPartialSpill = false,
bool abandonPartial = false,
int32_t maxDrivers = 2);

Expand All @@ -201,6 +202,7 @@ class AggregationFuzzerBase {
void testPlan(
const PlanWithSplits& planWithSplits,
bool injectSpill,
bool injectPartialSpill,
bool abandonPartial,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
Expand Down
122 changes: 122 additions & 0 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,128 @@ TEST_F(AggregationTest, spillAll) {
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}

TEST_F(AggregationTest, partialSpillWithMemoryLimit) {
constexpr int32_t kNumDistinct = 2000;
constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
rng_.seed(1);
rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
auto batches = makeVectors(rowType_, 100, 5);

core::PlanNodeId partialAggrNodeId;
const auto plan = PlanBuilder()
.values(batches)
.partialAggregation({"c0"}, {"count(1)", "sum(c1)"})
.capturePlanNodeId(partialAggrNodeId)
.finalAggregation()
.planNode();
const auto expectedResults =
AssertQueryBuilder(plan).copyResults(pool_.get());

struct {
uint64_t aggregationMemLimit;
bool expectSpill;

std::string debugString() const {
return fmt::format(
"aggregationMemLimit:{}, expectSpill:{}",
aggregationMemLimit,
expectSpill);
}
} testSettings[] = {// Memory limit is disabled so spilling is not triggered.
{0, false},
// Memory limit is too small so always trigger spilling.
{1, true},
// Memory limit is too large so spilling is not triggered.
{1'000'000'000, false}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());

auto spillDirectory = exec::test::TempDirectoryPath::create();
auto task =
AssertQueryBuilder(plan)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kPartialAggregationSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(
QueryConfig::kAggregationSpillMemoryThreshold,
std::to_string(testData.aggregationMemLimit))
.config(
QueryConfig::kAbandonPartialAggregationMinPct,
"200") // avoid abandoning
.config(
QueryConfig::kAbandonPartialAggregationMinRows,
std::to_string(1LL << 30)) // avoid abandoning
.assertResults(expectedResults);

auto taskStats = exec::toPlanStats(task->taskStats());
auto& stats = taskStats.at(partialAggrNodeId);
checkSpillStats(stats, testData.expectSpill);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}
}

TEST_F(AggregationTest, partialDistinctSpillWithMemoryLimit) {
constexpr int32_t kNumDistinct = 2000;
constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
rng_.seed(1);
rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
auto batches = makeVectors(rowType_, 100, 5);

core::PlanNodeId partialAggrNodeId;
const auto plan = PlanBuilder()
.values(batches)
.partialAggregation({"c0"}, {}, {})
.capturePlanNodeId(partialAggrNodeId)
.finalAggregation({"c0"}, {}, {})
.planNode();
const auto expectedResults =
AssertQueryBuilder(plan).copyResults(pool_.get());

struct {
uint64_t aggregationMemLimit;
bool expectSpill;

std::string debugString() const {
return fmt::format(
"aggregationMemLimit:{}, expectSpill:{}",
aggregationMemLimit,
expectSpill);
}
} testSettings[] = {// Memory limit is disabled so spilling is not triggered.
{0, false},
// Memory limit is too small so always trigger spilling.
{1, true},
// Memory limit is too large so spilling is not triggered.
{1'000'000'000, false}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());

auto spillDirectory = exec::test::TempDirectoryPath::create();
auto task =
AssertQueryBuilder(plan)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kPartialAggregationSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(
QueryConfig::kAggregationSpillMemoryThreshold,
std::to_string(testData.aggregationMemLimit))
.config(
QueryConfig::kAbandonPartialAggregationMinPct,
"200") // avoid abandoning
.config(
QueryConfig::kAbandonPartialAggregationMinRows,
std::to_string(1LL << 30)) // avoid abandoning
.assertResults(expectedResults);

auto taskStats = exec::toPlanStats(task->taskStats());
auto& stats = taskStats.at(partialAggrNodeId);
checkSpillStats(stats, testData.expectSpill);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}
}

// Verify number of memory allocations in the HashAggregation operator.
TEST_F(AggregationTest, memoryAllocations) {
vector_size_t size = 1'024;
Expand Down
Loading
Loading