From 84f8551c4fc0e10fdbaaf3a0049e83d2bf097457 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Wed, 9 Aug 2023 10:40:13 -0700 Subject: [PATCH] Fix flaky reclaimFromCompletedJoinBuilder test (#6038) Summary: Fix flaky reclaimFromCompletedJoinBuilder test by ensuring the task has been destroyed. The test passed 400 iterations internally. Pull Request resolved: https://github.com/facebookincubator/velox/pull/6038 Reviewed By: pedroerp, mbasmanova Differential Revision: D48167486 Pulled By: xiaoxmeng fbshipit-source-id: 931978ba47ad25e0a67a42614b7863952c283144 --- .../memory/tests/MemoryCapExceededTest.cpp | 2 +- .../memory/tests/SharedArbitratorTest.cpp | 40 +++++++++-------- velox/exec/Task.cpp | 23 ---------- velox/exec/Task.h | 6 --- velox/exec/tests/AggregationTest.cpp | 4 +- velox/exec/tests/HashJoinTest.cpp | 6 +-- velox/exec/tests/OrderByTest.cpp | 4 +- velox/exec/tests/utils/OperatorTestBase.cpp | 6 ++- velox/exec/tests/utils/QueryAssertions.cpp | 45 +++++++++++++++++++ velox/exec/tests/utils/QueryAssertions.h | 12 +++++ velox/vector/tests/utils/VectorTestBase.cpp | 4 -- 11 files changed, 91 insertions(+), 61 deletions(-) diff --git a/velox/common/memory/tests/MemoryCapExceededTest.cpp b/velox/common/memory/tests/MemoryCapExceededTest.cpp index 856b7cad1f43..f55331d08620 100644 --- a/velox/common/memory/tests/MemoryCapExceededTest.cpp +++ b/velox/common/memory/tests/MemoryCapExceededTest.cpp @@ -244,7 +244,7 @@ TEST_P(MemoryCapExceededTest, allocatorCapacityExceededError) { << "', but received '" << errorMessage << "'."; } } - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index d894fcb44daa..791f1e5dc8f2 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -419,7 +419,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromOrderBy) { orderByThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -507,7 +507,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { orderByThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); } @@ -577,7 +577,7 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedOrderBy) { orderByThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -676,7 +676,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, DISABLED_reclaimFromAggregation) { aggregationThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -764,7 +764,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { aggregationThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); @@ -835,7 +835,7 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedAggregation) { aggregationThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -944,7 +944,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromJoinBuilder) { aggregationThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -1043,7 +1043,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { joinThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); @@ -1061,6 +1061,7 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedJoinBuilder) { for (bool sameQuery : sameQueries) { SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery)); const auto spillDirectory = exec::test::TempDirectoryPath::create(); + const uint64_t numCreatedTasks = Task::numCreatedTasks(); std::shared_ptr fakeMemoryQueryCtx = newQueryCtx(kMemoryCapacity); std::shared_ptr joinQueryCtx; @@ -1107,6 +1108,9 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedJoinBuilder) { .assertResults( "SELECT c1 FROM tmp WHERE c0 NOT IN (SELECT c0 FROM tmp)"); waitForTaskCompletion(task.get()); + task.reset(); + // Make sure the join query task has been destroyed. + waitForAllTasksToBeDeleted(numCreatedTasks + 1, 3'000'000); fakeAllocationWait.notify(); }); @@ -1125,7 +1129,7 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedJoinBuilder) { joinThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -1250,7 +1254,7 @@ DEBUG_ONLY_TEST_F( }); joinThread.join(); memThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -1408,7 +1412,7 @@ DEBUG_ONLY_TEST_F( memThread.join(); // We only expect to reclaim from one hash build operator once. ASSERT_EQ(numHashBuildReclaims, 1); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } DEBUG_ONLY_TEST_F( @@ -1561,7 +1565,7 @@ DEBUG_ONLY_TEST_F( // We only expect to reclaim from one hash build operator once. ASSERT_EQ(numHashBuildReclaims, 1); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } DEBUG_ONLY_TEST_F( @@ -1614,7 +1618,7 @@ DEBUG_ONLY_TEST_F( .assertResults( "SELECT t.c1 FROM tmp as t, tmp AS u WHERE t.c0 == u.c1 AND t.c1 == u.c0"); ASSERT_TRUE(parallelBuildTriggered); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } DEBUG_ONLY_TEST_F( @@ -1695,7 +1699,7 @@ DEBUG_ONLY_TEST_F( .assertResults( "SELECT t.c1 FROM tmp as t, tmp AS u WHERE t.c0 == u.c1 AND t.c1 == u.c0"); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); ASSERT_EQ(injectAllocations.size(), 2); } @@ -1788,7 +1792,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimDuringJoinTableBuild) { memThread.join(); queryThread.join(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) { @@ -1919,7 +1923,7 @@ DEBUG_ONLY_TEST_F( queryThread.join(); fakeAllocation.free(); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenMaybeReserveAndTaskAbort) { @@ -1984,7 +1988,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenMaybeReserveAndTaskAbort) { queryThread.join(); fakeAllocation.free(); injectAllocation->free(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) { @@ -2059,7 +2063,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) { fakeAllocation.free(); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } TEST_F(SharedArbitrationTest, concurrentArbitration) { diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index c94057ea2fe8..a611e50645df 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2377,29 +2377,6 @@ std::shared_ptr Task::getSpillOperatorGroupLocked( return group; } -// static -void Task::testingWaitForAllTasksToBeDeleted(uint64_t maxWaitUs) { - const uint64_t numCreatedTasks = Task::numCreatedTasks(); - uint64_t numDeletedTasks = Task::numDeletedTasks(); - uint64_t waitUs = 0; - while (numCreatedTasks > numDeletedTasks) { - constexpr uint64_t kWaitInternalUs = 1'000; - std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs)); - waitUs += kWaitInternalUs; - numDeletedTasks = Task::numDeletedTasks(); - if (waitUs >= maxWaitUs) { - break; - } - } - VELOX_CHECK_EQ( - numDeletedTasks, - numCreatedTasks, - "{} tasks have been created while only {} have been deleted after waiting for {} us", - numCreatedTasks, - numDeletedTasks, - waitUs); -} - void Task::testingVisitDrivers(const std::function& callback) { std::lock_guard l(mutex_); for (int i = 0; i < drivers_.size(); ++i) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 360bdc77f68d..b6696f145c8f 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -576,12 +576,6 @@ class Task : public std::enable_shared_from_this { return numDriversInPartitionedOutput_ > 0; } - /// Invoked to wait for all the tasks created by the test to be deleted. - /// - /// NOTE: it is assumed that there is no more task to be created after or - /// during this wait call. This is for testing purpose for now. - static void testingWaitForAllTasksToBeDeleted(uint64_t maxWaitUs = 3'000'000); - /// Invoked to run provided 'callback' on each alive driver of the task. void testingVisitDrivers(const std::function& callback); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 10dd21d016ba..ec4a83d8ed0f 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -2397,7 +2397,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, abortDuringOutputProcessing) { driverWait.notify(); taskThread.join(); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -2492,7 +2492,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, abortDuringInputgProcessing) { driverWait.notify(); taskThread.join(); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 1e2818b3a5cf..03d880eece19 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -5298,7 +5298,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringOutputProcessing) { driverWait.notify(); taskThread.join(); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -5403,7 +5403,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringInputgProcessing) { driverWait.notify(); taskThread.join(); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -5508,7 +5508,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeAbortDuringInputProcessing) { driverWait.notify(); taskThread.join(); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } } // namespace diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index 65de18d62ad9..6d74caacdb99 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -1123,7 +1123,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, abortDuringOutputProcessing) { driverWait.notify(); taskThread.join(); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } @@ -1219,6 +1219,6 @@ DEBUG_ONLY_TEST_F(OrderByTest, abortDuringInputgProcessing) { driverWait.notify(); taskThread.join(); task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } } diff --git a/velox/exec/tests/utils/OperatorTestBase.cpp b/velox/exec/tests/utils/OperatorTestBase.cpp index 3fd00d10d4a1..7d6a8aab16a2 100644 --- a/velox/exec/tests/utils/OperatorTestBase.cpp +++ b/velox/exec/tests/utils/OperatorTestBase.cpp @@ -42,6 +42,8 @@ void OperatorTestBase::registerVectorSerde() { } OperatorTestBase::~OperatorTestBase() { + // Wait for all the tasks to be deleted. + exec::test::waitForAllTasksToBeDeleted(); // Revert to default process-wide MemoryAllocator. memory::MemoryAllocator::setDefaultInstance(nullptr); } @@ -53,7 +55,7 @@ void OperatorTestBase::SetUpTestCase() { } void OperatorTestBase::TearDownTestCase() { - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); } void OperatorTestBase::SetUp() { @@ -172,7 +174,7 @@ core::TypedExprPtr OperatorTestBase::parseExpr( // Wait for the task to go. task.reset(); - Task::testingWaitForAllTasksToBeDeleted(); + waitForAllTasksToBeDeleted(); // If a spilling directory was set, ensure it was removed after the task is // gone. diff --git a/velox/exec/tests/utils/QueryAssertions.cpp b/velox/exec/tests/utils/QueryAssertions.cpp index 22a905b272fb..1f7204597888 100644 --- a/velox/exec/tests/utils/QueryAssertions.cpp +++ b/velox/exec/tests/utils/QueryAssertions.cpp @@ -1248,6 +1248,51 @@ bool waitForTaskDriversToFinish(exec::Task* task, uint64_t maxWaitMicros) { return task->numFinishedDrivers() == task->numTotalDrivers(); } +void waitForAllTasksToBeDeleted(uint64_t maxWaitUs) { + const uint64_t numCreatedTasks = Task::numCreatedTasks(); + uint64_t numDeletedTasks = Task::numDeletedTasks(); + uint64_t waitUs = 0; + while (numCreatedTasks > numDeletedTasks) { + constexpr uint64_t kWaitInternalUs = 1'000; + std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs)); + waitUs += kWaitInternalUs; + numDeletedTasks = Task::numDeletedTasks(); + if (waitUs >= maxWaitUs) { + break; + } + } + VELOX_CHECK_EQ( + numDeletedTasks, + numCreatedTasks, + "{} tasks have been created while only {} have been deleted after waiting for {} us", + numCreatedTasks, + numDeletedTasks, + waitUs); +} + +void waitForAllTasksToBeDeleted( + uint64_t expectedDeletedTasks, + uint64_t maxWaitUs) { + uint64_t numDeletedTasks = Task::numDeletedTasks(); + uint64_t waitUs = 0; + while (expectedDeletedTasks > numDeletedTasks) { + constexpr uint64_t kWaitInternalUs = 1'000; + std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs)); + waitUs += kWaitInternalUs; + numDeletedTasks = Task::numDeletedTasks(); + if (waitUs >= maxWaitUs) { + break; + } + } + VELOX_CHECK_EQ( + numDeletedTasks, + expectedDeletedTasks, + "expected {} tasks to be deleted but only {} have been deleted after waiting for {} us", + expectedDeletedTasks, + numDeletedTasks, + waitUs); +} + std::shared_ptr assertQuery( const core::PlanNodePtr& plan, std::function addSplits, diff --git a/velox/exec/tests/utils/QueryAssertions.h b/velox/exec/tests/utils/QueryAssertions.h index 4cf485197ade..464446e47ffa 100644 --- a/velox/exec/tests/utils/QueryAssertions.h +++ b/velox/exec/tests/utils/QueryAssertions.h @@ -131,6 +131,18 @@ bool waitForTaskDriversToFinish( exec::Task* task, uint64_t maxWaitMicros = 1'000'000); +/// Invoked to wait for all the tasks created by the test to be deleted. +/// +/// NOTE: it is assumed that there is no more task to be created after or +/// during this wait call. This is for testing purpose for now. +void waitForAllTasksToBeDeleted(uint64_t maxWaitUs = 3'000'000); + +/// Similar to above test utility except waiting for a specific number of +/// tasks to be deleted. +void waitForAllTasksToBeDeleted( + uint64_t expectedDeletedTasks, + uint64_t maxWaitUs); + std::shared_ptr assertQuery( const core::PlanNodePtr& plan, const std::string& duckDbSql, diff --git a/velox/vector/tests/utils/VectorTestBase.cpp b/velox/vector/tests/utils/VectorTestBase.cpp index 76b06bbaaaf7..e2312d28ae6b 100644 --- a/velox/vector/tests/utils/VectorTestBase.cpp +++ b/velox/vector/tests/utils/VectorTestBase.cpp @@ -16,8 +16,6 @@ #include "velox/vector/tests/utils/VectorTestBase.h" -#include "velox/exec/Task.h" - namespace facebook::velox::test { BufferPtr makeIndicesInReverse(vector_size_t size, memory::MemoryPool* pool) { @@ -44,8 +42,6 @@ BufferPtr makeIndices( } VectorTestBase::~VectorTestBase() { - // Wait for all the tasks to be deleted. - exec::Task::testingWaitForAllTasksToBeDeleted(); // Reset the executor to wait for all the async activities to finish. executor_.reset(); }