From bacd769bcaf99400b35f17460998ac8fa514f4bf Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Thu, 31 Oct 2024 10:29:00 -0700 Subject: [PATCH] Fix fast abort condition in global arbitration (#11382) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11382 Reviewed By: xiaoxmeng Differential Revision: D65189869 Pulled By: tanjialiang fbshipit-source-id: aafa8f39b88f0a1124cbc30b2d3368769471980e --- velox/common/memory/SharedArbitrator.cpp | 8 +- .../memory/tests/MockSharedArbitratorTest.cpp | 81 +++++++++++++++++++ .../memory/tests/SharedArbitratorTestUtil.h | 4 + 3 files changed, 89 insertions(+), 4 deletions(-) diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index f5620a96622e..3492259b70fc 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -834,9 +834,6 @@ void SharedArbitrator::globalArbitrationMain() { } void SharedArbitrator::runGlobalArbitration() { - TestValue::adjust( - "facebook::velox::memory::SharedArbitrator::runGlobalArbitration", this); - const uint64_t startTimeMs = getCurrentTimeMs(); uint64_t totalReclaimedBytes{0}; bool reclaimByAbort{false}; @@ -845,6 +842,9 @@ void SharedArbitrator::runGlobalArbitration() { std::unordered_set failedParticipants; bool allParticipantsReclaimed{false}; + TestValue::adjust( + "facebook::velox::memory::SharedArbitrator::runGlobalArbitration", this); + size_t round{0}; for (;; ++round) { uint64_t arbitrationTimeUs{0}; @@ -860,7 +860,7 @@ void SharedArbitrator::runGlobalArbitration() { // // TODO: make the time based condition check configurable. reclaimByAbort = - (getCurrentTimeMs() - startTimeMs) < maxArbitrationTimeMs_ / 2 && + (getCurrentTimeMs() - startTimeMs) > maxArbitrationTimeMs_ / 2 && (reclaimByAbort || (allParticipantsReclaimed && reclaimedBytes == 0)); if (!reclaimByAbort) { reclaimedBytes = reclaimUsedMemoryBySpill( diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index c30d6fab22e6..e25bf562803d 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -1677,6 +1677,87 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].sum, 1); } +DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) { + const int64_t memoryCapacity = 512 << 20; + const uint64_t memoryPoolInitCapacity = memoryCapacity / 2; + const int64_t maxArbitrationTimeMs = 2'000; + const int64_t abortTimeThresholdMs = maxArbitrationTimeMs / 2; + + setupMemory( + memoryCapacity, + 0, + memoryPoolInitCapacity, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + kMemoryReclaimThreadsHwMultiplier, + nullptr, + true, + maxArbitrationTimeMs); + + test::SharedArbitratorTestHelper arbitratorHelper(arbitrator_); + + for (uint64_t pauseTimeMs : + {abortTimeThresholdMs / 2, + (maxArbitrationTimeMs - abortTimeThresholdMs) / 2}) { + auto task1 = addTask(memoryCapacity); + auto* op1 = task1->addMemoryOp(false); + op1->allocate(memoryCapacity / 2); + + auto task2 = addTask(memoryCapacity / 2); + auto* op2 = task2->addMemoryOp(false); + op2->allocate(memoryCapacity / 2); + + SCOPED_TESTVALUE_SET( + "facebook::velox::memory::SharedArbitrator::runGlobalArbitration", + std::function( + ([&](const SharedArbitrator* /*unused*/) { + std::this_thread::sleep_for( + std::chrono::milliseconds(pauseTimeMs)); + }))); + + std::unordered_map runtimeStats; + auto statsWriter = std::make_unique(runtimeStats); + setThreadLocalRunTimeStatWriter(statsWriter.get()); + const auto prevGlobalArbitrationRuns = + arbitratorHelper.globalArbitrationRuns(); + op1->allocate(memoryCapacity / 2); + + ASSERT_EQ( + runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].count, 1); + ASSERT_GT( + runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].sum, 0); + ASSERT_EQ( + runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].count, 1); + ASSERT_EQ( + runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].sum, 1); + ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].count, 0); + ASSERT_TRUE(task1->error() == nullptr); + ASSERT_EQ(task1->capacity(), memoryCapacity); + ASSERT_TRUE(task2->error() != nullptr); + VELOX_ASSERT_THROW( + std::rethrow_exception(task2->error()), + "Memory pool aborted to reclaim used memory"); + + const auto deltaGlobalArbitrationRuns = + arbitratorHelper.globalArbitrationRuns() - prevGlobalArbitrationRuns; + if (pauseTimeMs < abortTimeThresholdMs) { + ASSERT_GT(deltaGlobalArbitrationRuns, 2); + } else { + // In SharedArbitrator::runGlobalArbitration() + // First loop attempting spill, global run update. + // Second loop abort, resume waiter. Global run update and the assert + // below is a race condition, hence ASSERT_LE + ASSERT_LE(deltaGlobalArbitrationRuns, 2); + } + } +} + DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, multipleGlobalRuns) { const int64_t memoryCapacity = 512 << 20; const uint64_t memoryPoolInitCapacity = memoryCapacity / 2; diff --git a/velox/common/memory/tests/SharedArbitratorTestUtil.h b/velox/common/memory/tests/SharedArbitratorTestUtil.h index 041f7ce9859e..9ab095e50e21 100644 --- a/velox/common/memory/tests/SharedArbitratorTestUtil.h +++ b/velox/common/memory/tests/SharedArbitratorTestUtil.h @@ -65,6 +65,10 @@ class SharedArbitratorTestHelper { return arbitrator_->globalArbitrationController_.get(); } + uint64_t globalArbitrationRuns() const { + return arbitrator_->globalArbitrationRuns_; + } + bool hasShutdown() const { std::lock_guard l(arbitrator_->stateLock_); return arbitrator_->hasShutdownLocked();