Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
fixup

fixup

fixup

doc

fixup

revert

fixup

fixup

fixup

revert unneeded changes

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

Revert "fixup"

This reverts commit ecf530e.

Revert "fixup"

This reverts commit 9bd48de.

fixup

fixup

fixup

fixup

fixup

fixup

fixup

fixup

Revert "fixup"

This reverts commit f9efc1a.

fixup

fixup

fixup

fixup

test

This reverts commit 73bf319b76d74a794d2fcffa3b992f581d69f6a1.
  • Loading branch information
zhztheplayer committed Dec 15, 2023
1 parent 4466c87 commit 667d24d
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 8 deletions.
16 changes: 14 additions & 2 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,20 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
}
// TODO: add spilling for pre-grouped aggregation later:
// https://github.com/facebookincubator/velox/issues/3264
return (isFinal() || isSingle()) && preGroupedKeys().empty() &&
queryConfig.aggregationSpillEnabled();
if (!preGroupedKeys().empty()) {
return false;
}

if ((isFinal() || isSingle()) && queryConfig.aggregationSpillEnabled()) {
return true;
}

if ((isIntermediate() || isPartial()) &&
queryConfig.partialAggregationSpillEnabled()) {
return true;
}

return false;
}

void AggregationNode::addDetails(std::stringstream& stream) const {
Expand Down
8 changes: 8 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,14 @@ class AggregationNode : public PlanNode {
return step_ == Step::kSingle;
}

/// Returns true if this node's aggregation step is Step::kIntermediate.
bool isIntermediate() const {
return step_ == Step::kIntermediate;
}

/// Returns true if this node's aggregation step is Step::kPartial.
bool isPartial() const {
return step_ == Step::kPartial;
}

folly::dynamic serialize() const override;

static PlanNodePtr create(const folly::dynamic& obj, void* context);
Expand Down
16 changes: 15 additions & 1 deletion velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ class QueryConfig {
static constexpr const char* kAggregationSpillEnabled =
"aggregation_spill_enabled";

/// Partial aggregation spilling flag, only applies if "spill_enabled" flag is
/// set.
/// If true, partial aggregation flushing is disabled, which means the
/// settings of kMaxPartialAggregationMemory and
/// kMaxExtendedPartialAggregationMemory are ignored.
static constexpr const char* kPartialAggregationSpillEnabled =
"partial_aggregation_spill_enabled";

/// Join spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kJoinSpillEnabled = "join_spill_enabled";

Expand Down Expand Up @@ -513,11 +521,17 @@ class QueryConfig {
}

/// Returns 'is aggregation spilling enabled' flag. Must also check the
/// spillEnabled()!g
/// spillEnabled()!
bool aggregationSpillEnabled() const {
return get<bool>(kAggregationSpillEnabled, true);
}

/// Returns the value of the kPartialAggregationSpillEnabled flag, i.e. whether
/// partial aggregation spilling is enabled. Falls back to false when the flag
/// is not set. This getter does not consult spillEnabled(); callers must check
/// that flag as well.
bool partialAggregationSpillEnabled() const {
return get<bool>(kPartialAggregationSpillEnabled, false);
}

/// Returns 'is join spilling enabled' flag. Must also check the
/// spillEnabled()!
bool joinSpillEnabled() const {
Expand Down
6 changes: 6 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ Spilling
- boolean
- true
- When `spill_enabled` is true, determines whether HashAggregation operator can spill to disk under memory pressure.
* - partial_aggregation_spill_enabled
- boolean
- false
- When `spill_enabled` is true, determines whether the partial phase of HashAggregation operator can spill to disk under memory pressure.
Flushing is disabled when this option is turned on, so max_partial_aggregation_memory and max_extended_partial_aggregation_memory are ignored.
* - join_spill_enabled
- boolean
- true
Expand Down
7 changes: 4 additions & 3 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ const HashLookup& GroupingSet::hashLookup() const {
void GroupingSet::ensureInputFits(const RowVectorPtr& input) {
// Spilling is considered only if a spill config is set for this operator;
// this applies to partial aggregation as well as final and single.
if (isPartial_ || spillConfig_ == nullptr) {
if (spillConfig_ == nullptr) {
return;
}

Expand Down Expand Up @@ -911,7 +911,7 @@ void GroupingSet::ensureOutputFits() {
// to reserve memory for the output as we can't reclaim much memory from this
// operator itself. The output processing can reclaim memory from the other
// operator or query through memory arbitration.
if (isPartial_ || spillConfig_ == nullptr || hasSpilled()) {
if (spillConfig_ == nullptr || hasSpilled()) {
return;
}

Expand Down Expand Up @@ -961,7 +961,6 @@ void GroupingSet::spill() {
if (table_ == nullptr || table_->numDistinct() == 0) {
return;
}

if (!hasSpilled()) {
auto rows = table_->rows();
VELOX_DCHECK(pool_.trackUsage());
Expand Down Expand Up @@ -1196,6 +1195,8 @@ void GroupingSet::updateRow(SpillMergeStream& input, char* row) {
}

void GroupingSet::abandonPartialAggregation() {
VELOX_CHECK(!hasSpilled())

abandonedPartialAggregation_ = true;
allSupportToIntermediate_ = true;
for (auto& aggregate : aggregates_) {
Expand Down
10 changes: 9 additions & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ void HashAggregation::initialize() {

// Decides whether partial aggregation should be abandoned early.
//
// 'numOutput' is the number of output rows produced so far (the caller visible
// in this file passes the grouping set's distinct-group count). Returns true
// only when more than 'abandonPartialAggregationMinRows_' input rows have been
// seen and the output rows amount to at least
// 'abandonPartialAggregationMinPct_' percent of the input rows. Must only be
// called for non-global partial-output aggregations.
bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
  VELOX_CHECK(isPartialOutput_ && !isGlobal_);

  // After the grouping set has spilled, abandoning is no longer an option.
  if (groupingSet_->hasSpilled()) {
    return false;
  }

  // Not enough input observed yet to judge the reduction ratio.
  if (!(numInputRows_ > abandonPartialAggregationMinRows_)) {
    return false;
  }

  // Integer percentage of output rows relative to input rows.
  const auto outputPct = 100 * numOutput / numInputRows_;
  return outputPct >= abandonPartialAggregationMinPct_;
}
Expand All @@ -135,9 +139,13 @@ void HashAggregation::addInput(RowVectorPtr input) {
// NOTE: we should not trigger partial output flush in case of global
// aggregation as the final aggregator will handle it the same way as the
// partial aggregator. Hence, we have to use more memory anyway.
//
// We currently disable flushing when spilling is enabled. It's possible
// to make spilling and flushing work together once we have a way to
// count the spilled data size into partial aggregation's memory usage.
const bool abandonPartialEarly = isPartialOutput_ && !isGlobal_ &&
abandonPartialAggregationEarly(groupingSet_->numDistinct());
if (isPartialOutput_ && !isGlobal_ &&
if (!spillConfig_.has_value() && isPartialOutput_ && !isGlobal_ &&
(abandonPartialEarly ||
groupingSet_->isPartialFull(maxPartialAggregationMemoryUsage_))) {
partialFull_ = true;
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/fuzzer/AggregationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class AggregationFuzzer : public AggregationFuzzerBase {
testPlan(
planWithSplits,
false /*injectSpill*/,
false /*injectPartialSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
Expand All @@ -176,6 +177,19 @@ class AggregationFuzzer : public AggregationFuzzerBase {
testPlan(
planWithSplits,
true /*injectSpill*/,
false /*injectPartialSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
expected,
maxDrivers);

LOG(INFO) << "Testing plan #" << i
<< " with partial aggregation spilling";
testPlan(
planWithSplits,
true /*injectSpill*/,
true /*injectPartialSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
Expand All @@ -188,6 +202,7 @@ class AggregationFuzzer : public AggregationFuzzerBase {
testPlan(
planWithSplits,
false /*injectSpill*/,
false /*injectPartialSpill*/,
true /*abandonPartial*/,
customVerification,
customVerifiers,
Expand Down
11 changes: 11 additions & 0 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ velox::test::ResultOrError AggregationFuzzerBase::execute(
const core::PlanNodePtr& plan,
const std::vector<exec::Split>& splits,
bool injectSpill,
bool injectPartialSpill,
bool abandonPartial,
int32_t maxDrivers) {
LOG(INFO) << "Executing query plan: " << std::endl
Expand All @@ -379,6 +380,14 @@ velox::test::ResultOrError AggregationFuzzerBase::execute(
.config(core::QueryConfig::kTestingSpillPct, "100");
}

if (injectPartialSpill) {
spillDirectory = exec::test::TempDirectoryPath::create();
builder.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kPartialAggregationSpillEnabled, "true")
.config(core::QueryConfig::kTestingSpillPct, "100");
}

if (abandonPartial) {
builder.config(core::QueryConfig::kAbandonPartialAggregationMinRows, "1")
.config(core::QueryConfig::kAbandonPartialAggregationMinPct, "0")
Expand Down Expand Up @@ -431,6 +440,7 @@ AggregationFuzzerBase::computeReferenceResults(
void AggregationFuzzerBase::testPlan(
const PlanWithSplits& planWithSplits,
bool injectSpill,
bool injectPartialSpill,
bool abandonPartial,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
Expand All @@ -440,6 +450,7 @@ void AggregationFuzzerBase::testPlan(
planWithSplits.plan,
planWithSplits.splits,
injectSpill,
injectPartialSpill,
abandonPartial,
maxDrivers);

Expand Down
2 changes: 2 additions & 0 deletions velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ class AggregationFuzzerBase {
const core::PlanNodePtr& plan,
const std::vector<exec::Split>& splits = {},
bool injectSpill = false,
bool injectPartialSpill = false,
bool abandonPartial = false,
int32_t maxDrivers = 2);

Expand All @@ -201,6 +202,7 @@ class AggregationFuzzerBase {
void testPlan(
const PlanWithSplits& planWithSplits,
bool injectSpill,
bool injectPartialSpill,
bool abandonPartial,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
Expand Down
93 changes: 93 additions & 0 deletions velox/exec/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,99 @@ TEST_F(SharedArbitrationTest, reclaimFromDistinctAggregation) {
waitForAllTasksToBeDeleted();
}

// Verifies that both the partial and the final aggregation spill when partial
// aggregation spilling is enabled. Flush and abandon thresholds for partial
// aggregation are set very high so that neither code path kicks in, and the
// query result is checked against DuckDB.
TEST_F(SharedArbitrationTest, reclaimFromPartialAggregation) {
// 20MB; passed to newQueryCtx() below (presumably as the query's memory
// capacity -- confirm against the fixture).
const uint64_t maxQueryCapacity = 20L << 20;
// NOTE(review): presumably generates input sized at about twice the query
// capacity so that memory pressure is guaranteed -- confirm against
// newVectors().
std::vector<RowVectorPtr> vectors = newVectors(1024, maxQueryCapacity * 2);
createDuckDbTable(vectors);
const auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId partialAggNodeId;
core::PlanNodeId finalAggNodeId;
std::shared_ptr<core::QueryCtx> queryCtx = newQueryCtx(maxQueryCapacity);
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kPartialAggregationSpillEnabled, "true")
.config(core::QueryConfig::kAggregationSpillEnabled, "true")
.config(
core::QueryConfig::kMaxPartialAggregationMemory,
std::to_string(1LL << 30)) // disable flush
.config(
core::QueryConfig::kMaxExtendedPartialAggregationMemory,
std::to_string(1LL << 30)) // disable flush
.config(
core::QueryConfig::kAbandonPartialAggregationMinPct,
"200") // avoid abandoning
.config(
core::QueryConfig::kAbandonPartialAggregationMinRows,
std::to_string(1LL << 30)) // avoid abandoning
.queryCtx(queryCtx)
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({"c0"}, {"count(1)"})
.capturePlanNodeId(partialAggNodeId)
.finalAggregation()
.capturePlanNodeId(finalAggNodeId)
.planNode())
.assertResults("SELECT c0, count(1) FROM tmp GROUP BY c0");
auto taskStats = exec::toPlanStats(task->taskStats());
auto& partialStats = taskStats.at(partialAggNodeId);
auto& finalStats = taskStats.at(finalAggNodeId);
// Both aggregation stages are expected to have spilled some data.
ASSERT_GT(partialStats.spilledBytes, 0);
ASSERT_GT(finalStats.spilledBytes, 0);
task.reset();
waitForAllTasksToBeDeleted();
}

// Verifies that the partial-aggregation flush thresholds are ignored when
// partial aggregation spilling is enabled: both memory limits are set to 1
// byte, yet no "flushRowCount" stat is reported for the partial aggregation
// and both the partial and the final aggregation spill.
TEST_F(
SharedArbitrationTest,
reclaimFromPartialAggregationAndIgnoreFlushingSettings) {
// 20MB; passed to newQueryCtx() below (presumably as the query's memory
// capacity -- confirm against the fixture).
const uint64_t maxQueryCapacity = 20L << 20;
// NOTE(review): presumably generates input sized at about twice the query
// capacity so that memory pressure is guaranteed -- confirm against
// newVectors().
std::vector<RowVectorPtr> vectors = newVectors(1024, maxQueryCapacity * 2);
createDuckDbTable(vectors);
const auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId partialAggNodeId;
core::PlanNodeId finalAggNodeId;
std::shared_ptr<core::QueryCtx> queryCtx = newQueryCtx(maxQueryCapacity);
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kPartialAggregationSpillEnabled, "true")
.config(core::QueryConfig::kAggregationSpillEnabled, "true")
// Tiny flush thresholds: these would trigger flushing almost immediately
// if they were honored.
.config(
core::QueryConfig::kMaxPartialAggregationMemory,
std::to_string(1L))
.config(
core::QueryConfig::kMaxExtendedPartialAggregationMemory,
std::to_string(1L))
.config(
core::QueryConfig::kAbandonPartialAggregationMinPct,
"200") // avoid abandoning
.config(
core::QueryConfig::kAbandonPartialAggregationMinRows,
std::to_string(1LL << 30)) // avoid abandoning
.queryCtx(queryCtx)
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({"c0"}, {"count(1)"})
.capturePlanNodeId(partialAggNodeId)
.finalAggregation()
.capturePlanNodeId(finalAggNodeId)
.planNode())
.assertResults("SELECT c0, count(1) FROM tmp GROUP BY c0");
auto taskStats = exec::toPlanStats(task->taskStats());
auto& partialStats = taskStats.at(partialAggNodeId);
auto& finalStats = taskStats.at(finalAggNodeId);
// No flush stat must have been recorded for the partial aggregation.
ASSERT_EQ(
partialStats.customStats.find("flushRowCount"),
partialStats.customStats.end());
// Both aggregation stages are expected to have spilled some data.
ASSERT_GT(partialStats.spilledBytes, 0);
ASSERT_GT(finalStats.spilledBytes, 0);
task.reset();
waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromAggregationOnNoMoreInput) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/utils/TempDirectoryPath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ std::shared_ptr<TempDirectoryPath> TempDirectoryPath::create() {
}

TempDirectoryPath::~TempDirectoryPath() {
LOG(INFO) << "TempDirectoryPath:: removing all files from" << path;
LOG(INFO) << "TempDirectoryPath:: removing all files from " << path;
try {
boost::filesystem::remove_all(path.c_str());
} catch (...) {
Expand Down

0 comments on commit 667d24d

Please sign in to comment.