Skip to content

Commit

Permalink
Add arbitration lock time out to shared arbitrator
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Nov 11, 2024
1 parent afa6572 commit b2d3ef5
Show file tree
Hide file tree
Showing 17 changed files with 455 additions and 213 deletions.
44 changes: 35 additions & 9 deletions velox/common/memory/ArbitrationParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ uint64_t ArbitrationParticipant::reclaim(
if (targetBytes == 0) {
return 0;
}
std::lock_guard<std::timed_mutex> l(reclaimLock_);
ArbitrationOperationTimedLock l(reclaimMutex_);
TestValue::adjust(
"facebook::velox::memory::ArbitrationParticipant::reclaim", this);
uint64_t reclaimedBytes{0};
Expand Down Expand Up @@ -320,7 +320,7 @@ uint64_t ArbitrationParticipant::shrinkLocked(bool reclaimAll) {

uint64_t ArbitrationParticipant::abort(
const std::exception_ptr& error) noexcept {
std::lock_guard<std::timed_mutex> l(reclaimLock_);
ArbitrationOperationTimedLock l(reclaimMutex_);
return abortLocked(error);
}

Expand Down Expand Up @@ -353,13 +353,6 @@ uint64_t ArbitrationParticipant::abortLocked(
return shrinkLocked(/*reclaimAll=*/true);
}

bool ArbitrationParticipant::waitForReclaimOrAbort(
uint64_t maxWaitTimeNs) const {
std::unique_lock<std::timed_mutex> l(
reclaimLock_, std::chrono::nanoseconds(maxWaitTimeNs));
return l.owns_lock();
}

bool ArbitrationParticipant::hasRunningOp() const {
std::lock_guard<std::mutex> l(stateLock_);
return runningOp_ != nullptr;
Expand Down Expand Up @@ -408,4 +401,37 @@ std::string ArbitrationCandidate::toString() const {
succinctBytes(reclaimableUsedCapacity),
succinctBytes(reclaimableFreeCapacity));
}

ArbitrationOperationTimedLock::ArbitrationOperationTimedLock(
std::timed_mutex& mutex)
: mutex_(mutex) {
auto arbitrationContext = memoryArbitrationContext();
if (arbitrationContext == nullptr) {
mutex_.lock();
return;
}
auto* operation = arbitrationContext->op;
if (operation == nullptr) {
VELOX_CHECK_EQ(
MemoryArbitrationContext::typeName(arbitrationContext->type),
MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type::kGlobal));
mutex_.lock();
return;
}
VELOX_CHECK_EQ(
MemoryArbitrationContext::typeName(arbitrationContext->type),
MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type::kLocal));
if (!mutex_.try_lock_for(std::chrono::nanoseconds(operation->timeoutNs()))) {
VELOX_MEM_ARBITRATION_TIMEOUT(fmt::format(
"Memory arbitration lock timed out on memory pool: {} after running {}",
operation->participant()->name(),
succinctNanos(operation->executionTimeNs())));
}
}

ArbitrationOperationTimedLock::~ArbitrationOperationTimedLock() {
mutex_.unlock();
}
} // namespace facebook::velox::memory
37 changes: 28 additions & 9 deletions velox/common/memory/ArbitrationParticipant.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,37 @@
#include "velox/common/memory/Memory.h"

namespace facebook::velox::memory {

#define VELOX_MEM_ARBITRATION_TIMEOUT(errorMessage) \
_VELOX_THROW( \
::facebook::velox::VeloxRuntimeError, \
::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \
::facebook::velox::error_code::kMemArbitrationTimeout.c_str(), \
/* isRetriable */ true, \
"{}", \
errorMessage);

namespace test {
class ArbitrationParticipantTestHelper;
}

class ArbitrationOperation;
class ScopedArbitrationParticipant;

/// Custom lock that keeps track of the time of the ongoing arbitration
/// operation while waiting for the lock. The lock will identify if it needs to
/// apply a wait timeout by checking arbitrationCtx thread local variable. If a
/// local arbitration is ongoing on the current locking thread, timeout will
/// automatically be applied.
class ArbitrationOperationTimedLock {
public:
explicit ArbitrationOperationTimedLock(std::timed_mutex& mutex);
~ArbitrationOperationTimedLock();

private:
std::timed_mutex& mutex_;
};

/// Manages the memory arbitration operations on a query memory pool. It also
/// tracks the arbitration stats during the query memory pool's lifecycle.
class ArbitrationParticipant
Expand Down Expand Up @@ -154,9 +178,9 @@ class ArbitrationParticipant
/// which ensures the liveness of underlying query memory pool. If the query
/// memory pool is being destroyed, then this function returns std::nullopt.
///
// NOTE: it is not safe to directly access arbitration participant as it only
// holds a weak ptr to the query memory pool. Use 'lock()' to get a scoped
// arbitration participant for access.
/// NOTE: it is not safe to directly access arbitration participant as it only
/// holds a weak ptr to the query memory pool. Use 'lock()' to get a scoped
/// arbitration participant for access.
std::optional<ScopedArbitrationParticipant> lock();

/// Returns the corresponding query memory pool.
Expand Down Expand Up @@ -223,11 +247,6 @@ class ArbitrationParticipant
return aborted_;
}

/// Invoked to wait for the pending memory reclaim or abort operation to
/// complete within a 'maxWaitTimeMs' time window. The function returns false
/// if the wait has timed out.
bool waitForReclaimOrAbort(uint64_t maxWaitTimeNs) const;

/// Invoked to start arbitration operation 'op'. The operation needs to wait
/// for the prior arbitration operations to finish first before executing to
/// ensure the serialized execution of arbitration operations from the same
Expand Down Expand Up @@ -333,7 +352,7 @@ class ArbitrationParticipant
tsan_atomic<uint64_t> reclaimedBytes_{0};
tsan_atomic<uint64_t> growBytes_{0};

mutable std::timed_mutex reclaimLock_;
mutable std::timed_mutex reclaimMutex_;

friend class ScopedArbitrationParticipant;
friend class test::ArbitrationParticipantTestHelper;
Expand Down
1 change: 0 additions & 1 deletion velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ std::shared_ptr<MemoryPool> deprecatedAddDefaultLeafMemoryPool(
/// using this method can get a pool that is shared with other threads. The goal
/// is to minimize lock contention while supporting such use cases.
///
///
/// TODO: deprecate this API after all the use cases are able to manage the
/// lifecycle of the allocated memory pools properly.
MemoryPool& deprecatedSharedLeafPool();
Expand Down
14 changes: 10 additions & 4 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,12 @@ bool MemoryArbitrator::Stats::operator<=(const Stats& other) const {
return !(*this > other);
}

MemoryArbitrationContext::MemoryArbitrationContext(const MemoryPool* requestor)
: type(Type::kLocal), requestorName(requestor->name()) {}
MemoryArbitrationContext::MemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* _op)
: type(Type::kLocal), requestorName(requestor->name()), op(_op) {
VELOX_CHECK_NOT_NULL(op);
}

std::string MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type type) {
Expand All @@ -465,8 +469,10 @@ std::string MemoryArbitrationContext::typeName(
}

ScopedMemoryArbitrationContext::ScopedMemoryArbitrationContext(
const MemoryPool* requestor)
: savedArbitrationCtx_(arbitrationCtx), currentArbitrationCtx_(requestor) {
const MemoryPool* requestor,
ArbitrationOperation* op)
: savedArbitrationCtx_(arbitrationCtx),
currentArbitrationCtx_(requestor, op) {
arbitrationCtx = &currentArbitrationCtx_;
}

Expand Down
25 changes: 17 additions & 8 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
namespace facebook::velox::memory {

class MemoryPool;
class ArbitrationOperation;

using MemoryArbitrationStateCheckCB = std::function<void(MemoryPool&)>;

Expand Down Expand Up @@ -398,11 +399,11 @@ class NonReclaimableSectionGuard {
const bool oldNonReclaimableSectionValue_;
};

/// The memory arbitration context which is set on per-thread local variable by
/// memory arbitrator. It is used to indicate a running thread is under memory
/// arbitration processing or not. This helps to enable sanity check such as all
/// the memory reservations during memory arbitration should come from the
/// spilling memory pool.
/// The memory arbitration context which is set as per-thread local variable by
/// memory arbitrator. It is used to indicate if a running thread is under
/// memory arbitration. This helps to enable sanity check such as all the memory
/// reservations during memory arbitration should come from the spilling memory
/// pool.
struct MemoryArbitrationContext {
/// Defines the type of memory arbitration.
enum class Type {
Expand All @@ -420,20 +421,28 @@ struct MemoryArbitrationContext {
/// global memory arbitration type.
const std::string requestorName;

explicit MemoryArbitrationContext(const MemoryPool* requestor);
ArbitrationOperation* const op;

MemoryArbitrationContext() : type(Type::kGlobal) {}
MemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* _op);

MemoryArbitrationContext() : type(Type::kGlobal), op(nullptr) {}
};

/// Object used to set/restore the memory arbitration context when a thread is
/// under memory arbitration processing.
class ScopedMemoryArbitrationContext {
public:
explicit ScopedMemoryArbitrationContext(const MemoryPool* requestor);
ScopedMemoryArbitrationContext();

explicit ScopedMemoryArbitrationContext(
const MemoryArbitrationContext* context);

ScopedMemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* op);

~ScopedMemoryArbitrationContext();

private:
Expand Down
56 changes: 19 additions & 37 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,6 @@ T getConfig(
}
return defaultValue;
}

#define VELOX_MEM_ARBITRATION_TIMEOUT(errorMessage) \
_VELOX_THROW( \
::facebook::velox::VeloxRuntimeError, \
::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \
::facebook::velox::error_code::kMemArbitrationTimeout.c_str(), \
/* isRetriable */ true, \
"{}", \
errorMessage);
} // namespace

int64_t SharedArbitrator::ExtraConfig::reservedCapacity(
Expand Down Expand Up @@ -284,7 +275,7 @@ SharedArbitrator::SharedArbitrator(const Config& config)

void SharedArbitrator::shutdown() {
{
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
VELOX_CHECK(globalArbitrationWaiters_.empty());
if (hasShutdownLocked()) {
return;
Expand Down Expand Up @@ -436,7 +427,7 @@ void SharedArbitrator::addPool(const std::shared_ptr<MemoryPool>& pool) {
auto scopedParticipant = newParticipant->lock().value();
std::vector<ContinuePromise> arbitrationWaiters;
{
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
const uint64_t minBytesToReserve = std::min(
scopedParticipant->maxCapacity(), scopedParticipant->minCapacity());
const uint64_t maxBytesToReserve = std::max(
Expand Down Expand Up @@ -589,7 +580,7 @@ uint64_t SharedArbitrator::allocateCapacity(
uint64_t requestBytes,
uint64_t maxAllocateBytes,
uint64_t minAllocateBytes) {
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
return allocateCapacityLocked(
participantId, requestBytes, maxAllocateBytes, minAllocateBytes);
}
Expand Down Expand Up @@ -745,9 +736,10 @@ bool SharedArbitrator::growCapacity(ArbitrationOperation& op) {
participantConfig_.minReclaimBytes) {
return false;
}

// NOTE: if global memory arbitration is not enabled, we will try to
// reclaim from the participant itself before failing this operation.
// After failing to acquire enough free capacity to fulfil this capacity
// growth request, we will try to reclaim from the participant itself before
// failing this operation. We only do this if global memory arbitration is
// not enabled.
reclaim(
op.participant(),
op.requestBytes(),
Expand All @@ -768,7 +760,7 @@ bool SharedArbitrator::startAndWaitGlobalArbitration(ArbitrationOperation& op) {
ContinueFuture arbitrationWaitFuture{ContinueFuture::makeEmpty()};
uint64_t allocatedBytes{0};
{
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
allocatedBytes = allocateCapacityLocked(
op.participant()->id(),
op.requestBytes(),
Expand Down Expand Up @@ -838,7 +830,7 @@ void SharedArbitrator::globalArbitrationMain() {
VELOX_MEM_LOG(INFO) << "Global arbitration controller started";
while (true) {
{
std::unique_lock l(stateLock_);
std::unique_lock<std::mutex> l(stateMutex_);
globalArbitrationThreadCv_.wait(l, [&] {
return hasShutdownLocked() || !globalArbitrationWaiters_.empty();
});
Expand Down Expand Up @@ -918,7 +910,7 @@ void SharedArbitrator::runGlobalArbitration() {

uint64_t SharedArbitrator::getGlobalArbitrationTarget() {
uint64_t targetBytes{0};
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
for (const auto& waiter : globalArbitrationWaiters_) {
targetBytes += waiter.second->op->maxGrowBytes();
}
Expand All @@ -929,14 +921,6 @@ uint64_t SharedArbitrator::getGlobalArbitrationTarget() {
capacity_ * globalArbitrationMemoryReclaimPct_ / 100, targetBytes);
}

void SharedArbitrator::getGrowTargets(
ArbitrationOperation& op,
uint64_t& maxGrowTarget,
uint64_t& minGrowTarget) {
op.participant()->getGrowTargets(
op.requestBytes(), maxGrowTarget, minGrowTarget);
}

void SharedArbitrator::checkIfAborted(ArbitrationOperation& op) {
if (op.participant()->aborted()) {
VELOX_MEM_POOL_ABORTED(
Expand Down Expand Up @@ -1141,9 +1125,7 @@ uint64_t SharedArbitrator::reclaimUsedMemoryBySpill(
reclaimedBytes += reclaimResult->reclaimedBytes;
}
VELOX_CHECK_LE(prevReclaimedBytes, reclaimedUsedBytes_);
// NOTE: there might be concurrent local spill or spill triggered by
// external shrink.
return std::max(reclaimedBytes, reclaimedUsedBytes_ - prevReclaimedBytes);
return reclaimedBytes;
}

uint64_t SharedArbitrator::reclaimUsedMemoryByAbort(bool force) {
Expand Down Expand Up @@ -1254,12 +1236,12 @@ void SharedArbitrator::freeCapacity(uint64_t bytes) {
if (FOLLY_UNLIKELY(bytes == 0)) {
return;
}
std::vector<ContinuePromise> resumes;
std::vector<ContinuePromise> globalArbitrationWaitResumes;
{
std::lock_guard<std::mutex> l(stateLock_);
freeCapacityLocked(bytes, resumes);
std::lock_guard<std::mutex> l(stateMutex_);
freeCapacityLocked(bytes, globalArbitrationWaitResumes);
}
for (auto& resume : resumes) {
for (auto& resume : globalArbitrationWaitResumes) {
resume.setValue();
}
}
Expand Down Expand Up @@ -1304,7 +1286,7 @@ void SharedArbitrator::resumeGlobalArbitrationWaitersLocked(
void SharedArbitrator::removeGlobalArbitrationWaiter(uint64_t id) {
ContinuePromise resume = ContinuePromise::makeEmpty();
{
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
auto it = globalArbitrationWaiters_.find(id);
if (it != globalArbitrationWaiters_.end()) {
VELOX_CHECK_EQ(it->second->allocatedBytes, 0);
Expand All @@ -1326,7 +1308,7 @@ void SharedArbitrator::freeReservedCapacityLocked(uint64_t& bytes) {
}

MemoryArbitrator::Stats SharedArbitrator::stats() const {
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
return statsLocked();
}

Expand All @@ -1346,7 +1328,7 @@ MemoryArbitrator::Stats SharedArbitrator::statsLocked() const {
}

std::string SharedArbitrator::toString() const {
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
return fmt::format(
"ARBITRATOR[{} CAPACITY[{}] {}]",
kind_,
Expand All @@ -1359,7 +1341,7 @@ SharedArbitrator::ScopedArbitration::ScopedArbitration(
ArbitrationOperation* operation)
: arbitrator_(arbitrator),
operation_(operation),
arbitrationCtx_(operation->participant()->pool()),
arbitrationCtx_(operation->participant()->pool(), operation),
startTime_(std::chrono::steady_clock::now()) {
VELOX_CHECK_NOT_NULL(arbitrator_);
VELOX_CHECK_NOT_NULL(operation_);
Expand Down
Loading

0 comments on commit b2d3ef5

Please sign in to comment.