From 2cd5ff6f8c97b4febaf65330b2703a8f8b41e5da Mon Sep 17 00:00:00 2001 From: Sergey Pershin Date: Fri, 26 May 2023 16:44:33 -0700 Subject: [PATCH] Fix hang bug in partial aggregation. (#5050) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/5050 The bug was triggered in the following way: - In HashAggregation::addInput() for a partial aggregation abandonPartialAggregationEarly() returned true and we set partialFull_ to true. - At the same time grouping set didn't have any new distinct groups and set newDistincts_ to false. - HashAggregation operator will then keep returning null from getOutput() and false from needsInput() thus putting Driver::runInternal into the infinite loop and hanging the query. The fix is to detect such corner case and unblock by resetting partialFull_ flag. Reviewed By: xiaoxmeng, Yuhta Differential Revision: D46214290 fbshipit-source-id: 8094763b4bb302aae89dd09ffa4ec63bc2fd87c8 --- velox/exec/HashAggregation.cpp | 13 ++++++- velox/exec/tests/AggregationTest.cpp | 57 ++++++++++++++++++++++++---- 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index b6001e90e6d6..aba54f61f203 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -194,8 +194,10 @@ void HashAggregation::addInput(RowVectorPtr input) { // NOTE: we should not trigger partial output flush in case of global // aggregation as the final aggregator will handle it the same way as the // partial aggregator. Hence, we have to use more memory anyway. + const bool abandonPartialEarly = + abandonPartialAggregationEarly(groupingSet_->numDistinct()); if (isPartialOutput_ && !isGlobal_ && - (abandonPartialAggregationEarly(groupingSet_->numDistinct()) || + (abandonPartialEarly || groupingSet_->isPartialFull(maxPartialAggregationMemoryUsage_))) { partialFull_ = true; } @@ -206,6 +208,15 @@ void HashAggregation::addInput(RowVectorPtr input) { if (newDistincts_) { // Save input to use for output in getOutput(). input_ = input; + } else { + // In case of 'no new distinct groups' the only reason we can have + // 'partial full' true is due to 'abandoning partial aggregation early' + // being true. If that's not the case, then it is a bug. + VELOX_CHECK_EQ(partialFull_, abandonPartialEarly); + // If no new distinct groups (meaning we don't have anything to output) + // and we are abandoning the partial aggregation, then we need to ensure + // we 'need input'. For that we need to reset the 'partial full' flag. + partialFull_ = false; } } } diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 691e70f2f440..8435d85e0603 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -33,13 +33,11 @@ #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/vector/fuzzer/VectorFuzzer.h" -using facebook::velox::core::QueryConfig; -using facebook::velox::exec::Aggregate; -using facebook::velox::test::BatchMaker; -using namespace facebook::velox::common::testutil; - namespace facebook::velox::exec::test { -namespace { + +using core::QueryConfig; +using facebook::velox::test::BatchMaker; +using namespace common::testutil; /// No-op implementation of Aggregate. Provides public access to following /// base class methods: setNull, clearNull and isNull. @@ -748,6 +746,52 @@ TEST_F(AggregationTest, partialAggregationMemoryLimit) { .customStats.count("flushRowCount")); } +TEST_F(AggregationTest, partialDistinctWithAbandon) { + auto vectors = { + // 1st batch will produce 100 distinct groups from 10 rows. + makeRowVector( + {makeFlatVector(100, [](auto row) { return row; })}), + // 2st batch will trigger abandon partial aggregation event with no new + // distinct values. + makeRowVector({makeFlatVector(1, [](auto row) { return row; })}), + // 3rd batch will not produce any new distinct values. + makeRowVector( + {makeFlatVector(50, [](auto row) { return row; })}), + // 4th batch will not produce 10 new distinct values. + makeRowVector( + {makeFlatVector(200, [](auto row) { return row % 110; })}), + }; + + createDuckDbTable(vectors); + + // We are setting abandon partial aggregation config properties to low values, + // so they are triggered on the second batch. + + // Distinct aggregation. + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .config(QueryConfig::kAbandonPartialAggregationMinRows, "100") + .config(QueryConfig::kAbandonPartialAggregationMinPct, "50") + .config("max_drivers_per_task", "1") + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {}) + .finalAggregation() + .planNode()) + .assertResults("SELECT distinct c0 FROM tmp"); + + // with aggregation, just in case. + task = AssertQueryBuilder(duckDbQueryRunner_) + .config(QueryConfig::kAbandonPartialAggregationMinRows, "100") + .config(QueryConfig::kAbandonPartialAggregationMinPct, "50") + .config("max_drivers_per_task", "1") + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {"sum(c0)"}) + .finalAggregation() + .planNode()) + .assertResults("SELECT distinct c0, sum(c0) FROM tmp group by c0"); +} + TEST_F(AggregationTest, largeValueRangeArray) { // We have keys that map to integer range. The keys are // a little under max array hash table size apart. This wastes 16MB of @@ -2214,5 +2258,4 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimWithEmptyAggregationTable) { } } -} // namespace } // namespace facebook::velox::exec::test