Skip to content

Commit

Permalink
Fix hang bug in partial aggregation. (facebookincubator#5050)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#5050

The bug was triggered in the following way:
- In HashAggregation::addInput() for a partial aggregation
abandonPartialAggregationEarly() returned true and we set partialFull_ to true.
- At the same time grouping set didn't have any new distinct groups and set
newDistincts_ to false.
- HashAggregation operator will then keep returning null from getOutput()
and false from needsInput() thus putting Driver::runInternal into the
infinite loop and hanging the query.

The fix is to detect such corner case and unblock by resetting partialFull_ flag.

Reviewed By: xiaoxmeng, Yuhta

Differential Revision: D46214290

fbshipit-source-id: 8094763b4bb302aae89dd09ffa4ec63bc2fd87c8
  • Loading branch information
Sergey Pershin authored and facebook-github-bot committed May 26, 2023
1 parent 4d9e533 commit 2cd5ff6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 8 deletions.
13 changes: 12 additions & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,10 @@ void HashAggregation::addInput(RowVectorPtr input) {
// NOTE: we should not trigger partial output flush in case of global
// aggregation as the final aggregator will handle it the same way as the
// partial aggregator. Hence, we have to use more memory anyway.
const bool abandonPartialEarly =
abandonPartialAggregationEarly(groupingSet_->numDistinct());
if (isPartialOutput_ && !isGlobal_ &&
(abandonPartialAggregationEarly(groupingSet_->numDistinct()) ||
(abandonPartialEarly ||
groupingSet_->isPartialFull(maxPartialAggregationMemoryUsage_))) {
partialFull_ = true;
}
Expand All @@ -206,6 +208,15 @@ void HashAggregation::addInput(RowVectorPtr input) {
if (newDistincts_) {
// Save input to use for output in getOutput().
input_ = input;
} else {
// In case of 'no new distinct groups' the only reason we can have
// 'partial full' true is due to 'abandoning partial aggregation early'
// being true. If that's not the case, then it is a bug.
VELOX_CHECK_EQ(partialFull_, abandonPartialEarly);
// If no new distinct groups (meaning we don't have anything to output)
// and we are abandoning the partial aggregation, then we need to ensure
// we 'need input'. For that we need to reset the 'partial full' flag.
partialFull_ = false;
}
}
}
Expand Down
57 changes: 50 additions & 7 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"

using facebook::velox::core::QueryConfig;
using facebook::velox::exec::Aggregate;
using facebook::velox::test::BatchMaker;
using namespace facebook::velox::common::testutil;

namespace facebook::velox::exec::test {
namespace {

using core::QueryConfig;
using facebook::velox::test::BatchMaker;
using namespace common::testutil;

/// No-op implementation of Aggregate. Provides public access to following
/// base class methods: setNull, clearNull and isNull.
Expand Down Expand Up @@ -748,6 +746,52 @@ TEST_F(AggregationTest, partialAggregationMemoryLimit) {
.customStats.count("flushRowCount"));
}

TEST_F(AggregationTest, partialDistinctWithAbandon) {
auto vectors = {
// 1st batch will produce 100 distinct groups from 10 rows.
makeRowVector(
{makeFlatVector<int32_t>(100, [](auto row) { return row; })}),
// 2st batch will trigger abandon partial aggregation event with no new
// distinct values.
makeRowVector({makeFlatVector<int32_t>(1, [](auto row) { return row; })}),
// 3rd batch will not produce any new distinct values.
makeRowVector(
{makeFlatVector<int32_t>(50, [](auto row) { return row; })}),
// 4th batch will not produce 10 new distinct values.
makeRowVector(
{makeFlatVector<int32_t>(200, [](auto row) { return row % 110; })}),
};

createDuckDbTable(vectors);

// We are setting abandon partial aggregation config properties to low values,
// so they are triggered on the second batch.

// Distinct aggregation.
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.config(QueryConfig::kAbandonPartialAggregationMinRows, "100")
.config(QueryConfig::kAbandonPartialAggregationMinPct, "50")
.config("max_drivers_per_task", "1")
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({"c0"}, {})
.finalAggregation()
.planNode())
.assertResults("SELECT distinct c0 FROM tmp");

// with aggregation, just in case.
task = AssertQueryBuilder(duckDbQueryRunner_)
.config(QueryConfig::kAbandonPartialAggregationMinRows, "100")
.config(QueryConfig::kAbandonPartialAggregationMinPct, "50")
.config("max_drivers_per_task", "1")
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({"c0"}, {"sum(c0)"})
.finalAggregation()
.planNode())
.assertResults("SELECT distinct c0, sum(c0) FROM tmp group by c0");
}

TEST_F(AggregationTest, largeValueRangeArray) {
// We have keys that map to integer range. The keys are
// a little under max array hash table size apart. This wastes 16MB of
Expand Down Expand Up @@ -2214,5 +2258,4 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimWithEmptyAggregationTable) {
}
}

} // namespace
} // namespace facebook::velox::exec::test

0 comments on commit 2cd5ff6

Please sign in to comment.