
Enable spilling support for partial aggregation #7558

Closed

Conversation

zhztheplayer
Contributor

@zhztheplayer zhztheplayer commented Nov 14, 2023

Resolves #7930; also related to #7511.

Partial aggregation currently relies on the following options to reduce memory usage:

  • kMaxPartialAggregationMemory
  • kMaxExtendedPartialAggregationMemory

This PR adds the following option to enable spilling in partial aggregation:

  • kPartialAggregationSpillEnabled (default value: false)

When kPartialAggregationSpillEnabled is set to true, partial aggregation will spill data to disk under memory pressure, just as final aggregation does. At the same time, flushing will be disabled.

Also, once spilling has been triggered, the code path that abandons partial aggregation will be disabled.
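
A minimal sketch of what the proposed config entry might look like in core::QueryConfig, assuming the `partial_aggregation_spill_enabled` key and the false default described in this PR (not merged code):

// Hypothetical addition to core::QueryConfig, following the pattern of the
// existing kAggregationSpillEnabled entry (a sketch, not the merged code).
static constexpr const char* kPartialAggregationSpillEnabled =
    "partial_aggregation_spill_enabled";

// Returns true if the partial phase of aggregation is allowed to spill.
bool partialAggregationSpillEnabled() const {
  return get<bool>(kPartialAggregationSpillEnabled, false);
}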


@zhztheplayer zhztheplayer marked this pull request as ready for review November 17, 2023 06:11
@zhztheplayer zhztheplayer changed the title WIP: Enable spilling support for partial aggregation Enable spilling support for partial aggregation Nov 17, 2023
@zhztheplayer zhztheplayer changed the title Enable spilling support for partial aggregation WIP: Enable spilling support for partial aggregation Nov 17, 2023
@zhztheplayer zhztheplayer marked this pull request as draft November 17, 2023 06:12
@zhztheplayer zhztheplayer marked this pull request as ready for review November 17, 2023 07:18
@zhztheplayer zhztheplayer changed the title WIP: Enable spilling support for partial aggregation Enable spilling support for partial aggregation Nov 17, 2023
@zhztheplayer
Contributor Author

@mbasmanova @xiaoxmeng Would you like to review? Thanks!

@zhztheplayer
Contributor Author

@mbasmanova @xiaoxmeng Hi, would you please help review? Thanks!

Contributor

@xiaoxmeng xiaoxmeng left a comment

@zhztheplayer if we need to spill in partial aggregation for the Gluten use case, we might need a separate config for this. Thanks!

@zhztheplayer
Contributor Author

@xiaoxmeng

Contributor

@mbasmanova mbasmanova left a comment

Let's finish the design discussion in #7511 before proceeding with code changes.

@zhztheplayer
Contributor Author

Hi @mbasmanova @xiaoxmeng, should we continue the review? Thanks!

Contributor

@mbasmanova mbasmanova left a comment

@zhztheplayer Would you update the PR description to describe the latest changes? The description seems outdated.

Also, please document the new configs in https://facebookincubator.github.io/velox/configs.html

-  return (isFinal() || isSingle()) && preGroupedKeys().empty() &&
-      queryConfig.aggregationSpillEnabled();
+  if ((isFinal() || isSingle()) && queryConfig.aggregationSpillEnabled()) {
+    return preGroupedKeys().empty();
Contributor

This is a bit confusing. Perhaps,

if (preGroupedKeys().empty()) {
   return false();
}

...

Contributor Author

Sure. But did you mean the following?

if (!preGroupedKeys().empty()) {
  return false; 
}

Contributor

Oh yes. Sorry for the typo.
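
A minimal sketch of the combined check being discussed, assuming an isPartial() accessor and the partialAggregationSpillEnabled() getter proposed in this PR (not the merged code):

// Hypothetical consolidated AggregationNode::canSpill(): pre-grouped keys rule
// out spilling, final/single aggregations follow the existing config, and the
// partial step follows the new config proposed by this PR.
bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
  if (!preGroupedKeys().empty()) {
    return false;
  }
  if (isFinal() || isSingle()) {
    return queryConfig.aggregationSpillEnabled();
  }
  return isPartial() && queryConfig.partialAggregationSpillEnabled();
}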

@@ -650,7 +663,8 @@ class ApproxPercentileAggregate : public exec::Aggregate {
   if constexpr (checkIntermediateInputs) {
     VELOX_USER_CHECK(rowVec);
     for (int i = kPercentiles; i <= kAccuracy; ++i) {
-      VELOX_USER_CHECK(rowVec->childAt(i)->isConstantEncoding());
+      VELOX_USER_CHECK(isConstantVector(
VELOX_USER_CHECK(isConstantVector(
Contributor

Unrelated change? Would you extract it into a separate PR with a clear description and tests?

Contributor Author

It was related: without the change, the AggregationFuzzer would fail.

However, I'll try reverting the change, since the failure might be related to flushing and spilling both being enabled at one point in this PR. It's probably no longer needed.

Contributor

@mbasmanova mbasmanova left a comment

Would you check if AggregationFuzzer covers the new code paths? If not, let's extend it.

Comment on lines 1066 to 1072
if (injectPartialSpill) {
  spillDirectory = exec::test::TempDirectoryPath::create();
  builder.spillDirectory(spillDirectory->path)
      .config(core::QueryConfig::kSpillEnabled, "true")
      .config(core::QueryConfig::kPartialAggregationSpillEnabled, "true")
      .config(core::QueryConfig::kTestingSpillPct, "100");
}
Contributor Author

Flushing is disabled when partial spilling is on, so this new fuzzer dimension is needed to avoid losing coverage.

@zhztheplayer
Contributor Author

@mbasmanova Thanks

@zhztheplayer
Contributor Author

Hi @mbasmanova, I've addressed the current comments. Would you please take another look? Thank you.

Contributor

@mbasmanova mbasmanova left a comment

@zhztheplayer Thank you for iterating. Would you update the PR description to describe the changes to the Fuzzer? Would you also run the Fuzzer for 1h+ to make sure there are no failures? BTW, have you seen spilling in partial aggregation during Fuzzer runs? How often does it happen?

@duanmeng @xiaoxmeng Please help review this PR.

- boolean
- false
- When `spill_enabled` is true, determines whether the partial phase of HashAggregation operator can spill to disk under memory pressure.
Flushing will be disabled so max_partial_aggregation_memory and max_extended_partial_aggregation_memory will be ignored when turning the option on.
Contributor

typos: " the option on. this option."

@@ -28,7 +28,7 @@ std::shared_ptr<TempDirectoryPath> TempDirectoryPath::create() {
 }

 TempDirectoryPath::~TempDirectoryPath() {
-  LOG(INFO) << "TempDirectoryPath:: removing all files from" << path;
+  LOG(INFO) << "TempDirectoryPath:: removing all files from " << path;
Contributor

Nice fix. Thanks.

const auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId partialAggNodeId;
core::PlanNodeId finalAggNodeId;
std::shared_ptr<core::QueryCtx> queryCtx = newQueryCtx(maxQueryCapacity);
Contributor

auto

.config(core::QueryConfig::kAggregationSpillEnabled, "true")
.config(
    core::QueryConfig::kMaxPartialAggregationMemory,
    std::to_string(1LL << 30)) // disable flush
Contributor

Why do we need this? Enabling partial spilling disables flushing, no?

Contributor Author

@zhztheplayer zhztheplayer Dec 18, 2023

The test case explicitly eliminates the impact of flushing by disabling it, regardless of whether we have made flushing and spilling mutually exclusive. In the future we may decide to let flushing and spilling work together; this case should still pass and still test spilling independently of flushing in that case.

The next test case ensures flushing is disabled when spilling is enabled. Does this make sense to you?

    core::QueryConfig::kMaxExtendedPartialAggregationMemory,
    std::to_string(1LL << 30)) // disable flush
.config(
    core::QueryConfig::kAbandonPartialAggregationMinPct,
Contributor

// avoid abandoning

Why do we need this?

Contributor Author

The test case is meant to exercise spilling, but if partial aggregation is abandoned, spilling will hardly ever happen.
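
A minimal sketch of the test setup being discussed, combining the config snippets above; the plan, spillDirectory, and expectedResult variables as well as the kAbandonPartialAggregationMinPct value are illustrative assumptions, not the PR's exact code:

// Hypothetical AggregationTest case: flushing and abandoning are disabled so
// that memory pressure reliably drives partial aggregation to spill.
exec::test::AssertQueryBuilder(plan)
    .spillDirectory(spillDirectory->path)
    .config(core::QueryConfig::kSpillEnabled, "true")
    .config(core::QueryConfig::kAggregationSpillEnabled, "true")
    .config(core::QueryConfig::kPartialAggregationSpillEnabled, "true")
    // Disable flushing by raising the partial-aggregation memory limits.
    .config(
        core::QueryConfig::kMaxPartialAggregationMemory,
        std::to_string(1LL << 30))
    .config(
        core::QueryConfig::kMaxExtendedPartialAggregationMemory,
        std::to_string(1LL << 30))
    // Avoid abandoning partial aggregation so spilling can actually happen
    // (an unreachable percentage; the value here is illustrative).
    .config(core::QueryConfig::kAbandonPartialAggregationMinPct, "200")
    .assertResults(expectedResult);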

* - partial_aggregation_spill_enabled
- boolean
- false
- When `spill_enabled` is true, determines whether the partial phase of HashAggregation operator can spill to disk under memory pressure.
Contributor

Let's explain when it could be beneficial to enable this? E.g. much fewer reducers, slow or inefficient shuffle, anything else?

Contributor Author

Can we just describe the result of turning the option on? For example, note to users that the size of data emitted from partial aggregation is reduced as much as possible when it is set to true. Users will know it should be on if they rely on an inefficient shuffle or have other specific reasons.

Collaborator

@duanmeng duanmeng left a comment

@zhztheplayer This PR is crucial to batch / ETL queries, 👍 . Perhaps we could add some test cases in the AggregationTest.

@duanmeng
Collaborator

@mbasmanova @xiaoxmeng Do we need two separate spill-enable flags if our goal is to support spilling for aggregation in all equivalent query plans? WDYT?

@zhztheplayer
Copy link
Contributor Author

@zhztheplayer This PR is crucial to batch / ETL queries, 👍 . Perhaps we could add some test cases in the AggregationTest.

Thanks @duanmeng. I've added some tests to AggregationTest.

@zhztheplayer
Contributor Author

@mbasmanova @xiaoxmeng Do we need two separate spill-enable flags if our goal is to support spilling for aggregation in all equivalent query plans? WDYT?

Let's keep them separate for the initial version. Spilling for partial aggregation should be off by default since it disables flushing, while for regular aggregation spilling is on by default.

@zhztheplayer
Contributor Author

zhztheplayer commented Dec 20, 2023

@mbasmanova @xiaoxmeng @duanmeng

After going through some history about partial aggregation and companion functions (especially #4412 and #4566), I feel this topic is heading in the wrong direction, so I am going to close this PR.

Reason:

Spark's partial aggregation doesn't actually support flushing, so ideally Gluten should not use Velox's (and probably Presto's) PartialAggregation. Velox's FinalAggregation + companion functions is the better choice for Gluten. Since final aggregation already supports spilling, no extra effort is needed, in my opinion.

PartialAggregation in Velox is conceptually an optimization for the case where an aggregation doesn't have to emit fully aggregated data. Spark doesn't have a plan node type that maps to this functionality yet. Adding spilling support to PartialAggregation may help in some rare corner cases, but it more or less breaks PartialAggregation's design in the end.

@zhztheplayer
Contributor Author

Closing. Please reopen if anyone thinks it's still needed.

@mbasmanova
Contributor

@zhztheplayer Thank you for the update. It comes as a bit of a surprise, but I guess it makes sense. Would you also comment on the two related issues and close them?

Resolves #7930; also related to #7511.

@mbasmanova
Contributor

@zhztheplayer Curious, what prompted you to take another look and go "through some history about partial aggregation and companion functions".

@zhztheplayer
Contributor Author

zhztheplayer commented Dec 21, 2023

@zhztheplayer Curious, what prompted you to take another look and go "through some history about partial aggregation and companion functions".

One of Gluten's users reported an issue where the expression count(distinct l_orderkey) returns a wrong result in Gluten. Spark generated the following plan:

+- HashAggregate(keys=[], functions=[count(distinct l_orderkey#114L)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=140]
      +- (Partial) HashAggregate(keys=[], functions=[partial_count(distinct l_orderkey#114L)])    // <- node 2
         +- (Partial) HashAggregate(keys=[l_orderkey#114L], functions=[])   // <- node 1
            +- Exchange hashpartitioning(l_orderkey#114L, 100), ENSURE_REQUIREMENTS, [plan_id=136]
               +- (Partial) HashAggregate(keys=[l_orderkey#114L], functions=[])
                  +- FileScan

The plan node I marked as node 2 uses node 1's output as the distinct input for partial_count. However, since node 1 is currently mapped to Velox's PartialAggregation, it may flush intermediate data at any time, which makes node 2 output a wrong count.

The user's issue looks to be caused by the incorrect mapping from Spark's Aggregation + partial function to Velox's PartialAggregation. If we correct the mapping to go from Spark's Aggregation + partial function to Velox's FinalAggregation + partial companion function, then we solve this count(distinct) issue and don't need to add spilling support to partial aggregation either. I'm still working with @rui-mo on some refactoring in Gluten to use the new mapping, but in any case I think this topic can be closed, since it was driven by an issue rooted in Gluten's non-optimal design.
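
A minimal sketch, in Velox PlanBuilder terms, of the corrected mapping described above; the companion-function name (count_partial), rowType, and the exact builder calls are illustrative assumptions, not Gluten's actual code:

// Hypothetical corrected mapping: Spark's partial steps are expressed as Velox
// single aggregations, which never flush, so the distinct input of node 2
// stays intact.
auto plan = exec::test::PlanBuilder()
                .tableScan(rowType)
                // Node 1: distinct on l_orderkey.
                .singleAggregation({"l_orderkey"}, {})
                // Node 2: the partial companion function emits the intermediate
                // count accumulator instead of relying on PartialAggregation.
                .singleAggregation({}, {"count_partial(l_orderkey)"})
                .planNode();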

@mbasmanova
Contributor

@zhztheplayer Thank you for sharing this context. This is super helpful and now I have a better understanding. Indeed, it sounds like "partial agg" in Spark doesn't mean the same as in Presto / Velox and you are correct that it probably needs to be mapped to final agg + partial companion function. CC: @kagamiori
