diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index f605d17f9ee5..f5620a96622e 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -325,7 +325,7 @@ void SharedArbitrator::shutdownGlobalArbitration() { } void SharedArbitrator::wakeupGlobalArbitrationThread() { - VELOX_CHECK(globalArbitrationEnabled_); + checkGlobalArbitrationEnabled(); VELOX_CHECK_NOT_NULL(globalArbitrationController_); incrementGlobalArbitrationWaitCount(); globalArbitrationThreadCv_.notify_one(); @@ -721,9 +721,12 @@ bool SharedArbitrator::growCapacity(ArbitrationOperation& op) { reclaimUnusedCapacity(); RETURN_IF_TRUE(growWithFreeCapacity(op)); - if (!globalArbitrationEnabled_ && - op.participant()->reclaimableUsedCapacity() >= - participantConfig_.minReclaimBytes) { + if (!globalArbitrationEnabled_) { + if (op.participant()->reclaimableUsedCapacity() < + participantConfig_.minReclaimBytes) { + return false; + } + // NOTE: if global memory arbitration is not enabled, we will try to // reclaim from the participant itself before failing this operation. reclaim( @@ -739,7 +742,7 @@ bool SharedArbitrator::growCapacity(ArbitrationOperation& op) { } bool SharedArbitrator::startAndWaitGlobalArbitration(ArbitrationOperation& op) { - VELOX_CHECK(globalArbitrationEnabled_); + checkGlobalArbitrationEnabled(); checkIfTimeout(op); std::unique_ptr arbitrationWait; diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index 54758e7f19c6..7d43e2b6eb1e 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -304,6 +304,10 @@ class SharedArbitrator : public memory::MemoryArbitrator { return state_ == State::kShutdown; } + FOLLY_ALWAYS_INLINE void checkGlobalArbitrationEnabled() const { + VELOX_CHECK(globalArbitrationEnabled_, "Global arbitration is not enabled"); + } + // Invoked to get the arbitration participant by 'name'. The function returns // std::nullopt if the underlying query memory pool is destroyed. std::optional getParticipant( diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 6066817188f9..67eb1ffa4d91 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -3708,6 +3708,64 @@ TEST_F(MockSharedArbitrationTest, arbitrationFailure) { } } +// This test is to verify if a non-reclaimable query fails properly if global +// arbitration is disabled. +TEST_F( + MockSharedArbitrationTest, + arbitrationFailureOnNonReclaimableQueryWithGlobalArbitrationDisabled) { + const int64_t memoryCapacity = 128 * MB; + for (bool hasMinReclaimBytes : {false, true}) { + SCOPED_TRACE(fmt::format("hasMinReclaimBytes {}", hasMinReclaimBytes)); + // Set min reclaim bytes to avoid reclaim from itself before fail the + // arbitration. + setupMemory( + memoryCapacity, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + hasMinReclaimBytes ? MB : 0, + 0, + 0, + 1.0, + nullptr, + false); + std::shared_ptr task1 = addTask(); + MockMemoryOperator* op1 = task1->addMemoryOp(false); + op1->allocate(memoryCapacity / 4 * 3); + ASSERT_EQ(task1->capacity(), memoryCapacity / 4 * 3); + + std::shared_ptr task2 = addTask(); + MockMemoryOperator* op2 = task2->addMemoryOp(false); + VELOX_ASSERT_THROW( + op2->allocate(memoryCapacity / 2), "Exceeded memory pool capacity "); + } +} + +// This test is to verify if a reclaimable query reclaim from itself before +// reaching the capacity limit if global arbitration is disabled. +TEST_F( + MockSharedArbitrationTest, + reclaimBeforeReachCapacityLimitWhenGlobalArbitrationDisabled) { + const int64_t memoryCapacity = 128 * MB; + setupMemory( + memoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1.0, nullptr, false); + std::shared_ptr task1 = addTask(); + MockMemoryOperator* op1 = task1->addMemoryOp(true); + op1->allocate(memoryCapacity / 2); + ASSERT_EQ(task1->capacity(), memoryCapacity / 2); + + std::shared_ptr task2 = addTask(); + MockMemoryOperator* op2 = task2->addMemoryOp(true); + op2->allocate(memoryCapacity / 2); + ASSERT_EQ(task2->capacity(), memoryCapacity / 2); + + op2->allocate(memoryCapacity / 4); +} + TEST_F(MockSharedArbitrationTest, concurrentArbitrations) { const int numTasks = 10; const int numOpsPerTask = 5;