Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add arbitration lock time out to shared arbitrator #11376

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions velox/common/memory/ArbitrationParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ uint64_t ArbitrationParticipant::reclaim(
if (targetBytes == 0) {
return 0;
}
std::lock_guard<std::timed_mutex> l(reclaimLock_);
ArbitrationOperationTimedLock l(reclaimMutex_);
TestValue::adjust(
"facebook::velox::memory::ArbitrationParticipant::reclaim", this);
uint64_t reclaimedBytes{0};
Expand Down Expand Up @@ -320,7 +320,7 @@ uint64_t ArbitrationParticipant::shrinkLocked(bool reclaimAll) {

uint64_t ArbitrationParticipant::abort(
const std::exception_ptr& error) noexcept {
std::lock_guard<std::timed_mutex> l(reclaimLock_);
ArbitrationOperationTimedLock l(reclaimMutex_);
return abortLocked(error);
}

Expand Down Expand Up @@ -353,13 +353,6 @@ uint64_t ArbitrationParticipant::abortLocked(
return shrinkLocked(/*reclaimAll=*/true);
}

bool ArbitrationParticipant::waitForReclaimOrAbort(
uint64_t maxWaitTimeNs) const {
std::unique_lock<std::timed_mutex> l(
reclaimLock_, std::chrono::nanoseconds(maxWaitTimeNs));
return l.owns_lock();
}

bool ArbitrationParticipant::hasRunningOp() const {
std::lock_guard<std::mutex> l(stateLock_);
return runningOp_ != nullptr;
Expand Down Expand Up @@ -408,4 +401,37 @@ std::string ArbitrationCandidate::toString() const {
succinctBytes(reclaimableUsedCapacity),
succinctBytes(reclaimableFreeCapacity));
}

ArbitrationOperationTimedLock::ArbitrationOperationTimedLock(
std::timed_mutex& mutex)
: mutex_(mutex) {
auto arbitrationContext = memoryArbitrationContext();
if (arbitrationContext == nullptr) {
mutex_.lock();
return;
}
auto* operation = arbitrationContext->op;
if (operation == nullptr) {
VELOX_CHECK_EQ(
MemoryArbitrationContext::typeName(arbitrationContext->type),
MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type::kGlobal));
mutex_.lock();
return;
}
VELOX_CHECK_EQ(
MemoryArbitrationContext::typeName(arbitrationContext->type),
MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type::kLocal));
if (!mutex_.try_lock_for(std::chrono::nanoseconds(operation->timeoutNs()))) {
VELOX_MEM_ARBITRATION_TIMEOUT(fmt::format(
"Memory arbitration lock timed out on memory pool: {} after running {}",
operation->participant()->name(),
succinctNanos(operation->executionTimeNs())));
}
}

ArbitrationOperationTimedLock::~ArbitrationOperationTimedLock() {
mutex_.unlock();
}
} // namespace facebook::velox::memory
37 changes: 28 additions & 9 deletions velox/common/memory/ArbitrationParticipant.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,37 @@
#include "velox/common/memory/Memory.h"

namespace facebook::velox::memory {

#define VELOX_MEM_ARBITRATION_TIMEOUT(errorMessage) \
_VELOX_THROW( \
::facebook::velox::VeloxRuntimeError, \
::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \
::facebook::velox::error_code::kMemArbitrationTimeout.c_str(), \
/* isRetriable */ true, \
"{}", \
errorMessage);

namespace test {
class ArbitrationParticipantTestHelper;
}

class ArbitrationOperation;
class ScopedArbitrationParticipant;

/// Custom lock that keeps track of the time of the ongoing arbitration
/// operation while waiting for the lock. The lock will identify if it needs to
/// apply a wait timeout by checking arbitrationCtx thread local variable. If a
/// local arbitration is ongoing on the current locking thread, timeout will
/// automatically be applied.
class ArbitrationOperationTimedLock {
tanjialiang marked this conversation as resolved.
Show resolved Hide resolved
public:
explicit ArbitrationOperationTimedLock(std::timed_mutex& mutex);
~ArbitrationOperationTimedLock();

private:
std::timed_mutex& mutex_;
};

/// Manages the memory arbitration operations on a query memory pool. It also
/// tracks the arbitration stats during the query memory pool's lifecycle.
class ArbitrationParticipant
Expand Down Expand Up @@ -154,9 +178,9 @@ class ArbitrationParticipant
/// which ensures the liveness of underlying query memory pool. If the query
/// memory pool is being destroyed, then this function returns std::nullopt.
///
// NOTE: it is not safe to directly access arbitration participant as it only
// holds a weak ptr to the query memory pool. Use 'lock()' to get a scoped
// arbitration participant for access.
/// NOTE: it is not safe to directly access arbitration participant as it only
/// holds a weak ptr to the query memory pool. Use 'lock()' to get a scoped
/// arbitration participant for access.
std::optional<ScopedArbitrationParticipant> lock();

/// Returns the corresponding query memory pool.
Expand Down Expand Up @@ -223,11 +247,6 @@ class ArbitrationParticipant
return aborted_;
}

/// Invoked to wait for the pending memory reclaim or abort operation to
/// complete within a 'maxWaitTimeMs' time window. The function returns false
/// if the wait has timed out.
bool waitForReclaimOrAbort(uint64_t maxWaitTimeNs) const;

/// Invoked to start arbitration operation 'op'. The operation needs to wait
/// for the prior arbitration operations to finish first before executing to
/// ensure the serialized execution of arbitration operations from the same
Expand Down Expand Up @@ -333,7 +352,7 @@ class ArbitrationParticipant
tsan_atomic<uint64_t> reclaimedBytes_{0};
tsan_atomic<uint64_t> growBytes_{0};

mutable std::timed_mutex reclaimLock_;
mutable std::timed_mutex reclaimMutex_;

friend class ScopedArbitrationParticipant;
friend class test::ArbitrationParticipantTestHelper;
Expand Down
1 change: 0 additions & 1 deletion velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ std::shared_ptr<MemoryPool> deprecatedAddDefaultLeafMemoryPool(
/// using this method can get a pool that is shared with other threads. The goal
/// is to minimize lock contention while supporting such use cases.
///
///
/// TODO: deprecate this API after all the use cases are able to manage the
/// lifecycle of the allocated memory pools properly.
MemoryPool& deprecatedSharedLeafPool();
Expand Down
14 changes: 10 additions & 4 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,12 @@ bool MemoryArbitrator::Stats::operator<=(const Stats& other) const {
return !(*this > other);
}

MemoryArbitrationContext::MemoryArbitrationContext(const MemoryPool* requestor)
: type(Type::kLocal), requestorName(requestor->name()) {}
MemoryArbitrationContext::MemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* _op)
: type(Type::kLocal), requestorName(requestor->name()), op(_op) {
VELOX_CHECK_NOT_NULL(op);
}

std::string MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type type) {
Expand All @@ -465,8 +469,10 @@ std::string MemoryArbitrationContext::typeName(
}

ScopedMemoryArbitrationContext::ScopedMemoryArbitrationContext(
const MemoryPool* requestor)
: savedArbitrationCtx_(arbitrationCtx), currentArbitrationCtx_(requestor) {
const MemoryPool* requestor,
ArbitrationOperation* op)
: savedArbitrationCtx_(arbitrationCtx),
currentArbitrationCtx_(requestor, op) {
arbitrationCtx = &currentArbitrationCtx_;
}

Expand Down
25 changes: 17 additions & 8 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
namespace facebook::velox::memory {

class MemoryPool;
class ArbitrationOperation;

using MemoryArbitrationStateCheckCB = std::function<void(MemoryPool&)>;

Expand Down Expand Up @@ -398,11 +399,11 @@ class NonReclaimableSectionGuard {
const bool oldNonReclaimableSectionValue_;
};

/// The memory arbitration context which is set on per-thread local variable by
/// memory arbitrator. It is used to indicate a running thread is under memory
/// arbitration processing or not. This helps to enable sanity check such as all
/// the memory reservations during memory arbitration should come from the
/// spilling memory pool.
/// The memory arbitration context which is set as per-thread local variable by
/// memory arbitrator. It is used to indicate if a running thread is under
/// memory arbitration. This helps to enable sanity check such as all the memory
/// reservations during memory arbitration should come from the spilling memory
/// pool.
struct MemoryArbitrationContext {
/// Defines the type of memory arbitration.
enum class Type {
Expand All @@ -420,20 +421,28 @@ struct MemoryArbitrationContext {
/// global memory arbitration type.
const std::string requestorName;

explicit MemoryArbitrationContext(const MemoryPool* requestor);
ArbitrationOperation* const op;

MemoryArbitrationContext() : type(Type::kGlobal) {}
MemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* _op);

MemoryArbitrationContext() : type(Type::kGlobal), op(nullptr) {}
};

/// Object used to set/restore the memory arbitration context when a thread is
/// under memory arbitration processing.
class ScopedMemoryArbitrationContext {
public:
explicit ScopedMemoryArbitrationContext(const MemoryPool* requestor);
ScopedMemoryArbitrationContext();

explicit ScopedMemoryArbitrationContext(
const MemoryArbitrationContext* context);

ScopedMemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* op);

~ScopedMemoryArbitrationContext();

private:
Expand Down
56 changes: 19 additions & 37 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,6 @@ T getConfig(
}
return defaultValue;
}

#define VELOX_MEM_ARBITRATION_TIMEOUT(errorMessage) \
_VELOX_THROW( \
::facebook::velox::VeloxRuntimeError, \
::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \
::facebook::velox::error_code::kMemArbitrationTimeout.c_str(), \
/* isRetriable */ true, \
"{}", \
errorMessage);
} // namespace

int64_t SharedArbitrator::ExtraConfig::reservedCapacity(
Expand Down Expand Up @@ -284,7 +275,7 @@ SharedArbitrator::SharedArbitrator(const Config& config)

void SharedArbitrator::shutdown() {
{
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
VELOX_CHECK(globalArbitrationWaiters_.empty());
if (hasShutdownLocked()) {
return;
Expand Down Expand Up @@ -436,7 +427,7 @@ void SharedArbitrator::addPool(const std::shared_ptr<MemoryPool>& pool) {
auto scopedParticipant = newParticipant->lock().value();
std::vector<ContinuePromise> arbitrationWaiters;
{
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
const uint64_t minBytesToReserve = std::min(
scopedParticipant->maxCapacity(), scopedParticipant->minCapacity());
const uint64_t maxBytesToReserve = std::max(
Expand Down Expand Up @@ -589,7 +580,7 @@ uint64_t SharedArbitrator::allocateCapacity(
uint64_t requestBytes,
uint64_t maxAllocateBytes,
uint64_t minAllocateBytes) {
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
return allocateCapacityLocked(
participantId, requestBytes, maxAllocateBytes, minAllocateBytes);
}
Expand Down Expand Up @@ -745,9 +736,10 @@ bool SharedArbitrator::growCapacity(ArbitrationOperation& op) {
participantConfig_.minReclaimBytes) {
return false;
}

// NOTE: if global memory arbitration is not enabled, we will try to
// reclaim from the participant itself before failing this operation.
// After failing to acquire enough free capacity to fulfil this capacity
// growth request, we will try to reclaim from the participant itself before
// failing this operation. We only do this if global memory arbitration is
// not enabled.
reclaim(
op.participant(),
op.requestBytes(),
Expand All @@ -768,7 +760,7 @@ bool SharedArbitrator::startAndWaitGlobalArbitration(ArbitrationOperation& op) {
ContinueFuture arbitrationWaitFuture{ContinueFuture::makeEmpty()};
uint64_t allocatedBytes{0};
{
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
allocatedBytes = allocateCapacityLocked(
op.participant()->id(),
op.requestBytes(),
Expand Down Expand Up @@ -838,7 +830,7 @@ void SharedArbitrator::globalArbitrationMain() {
VELOX_MEM_LOG(INFO) << "Global arbitration controller started";
while (true) {
{
std::unique_lock l(stateLock_);
std::unique_lock<std::mutex> l(stateMutex_);
globalArbitrationThreadCv_.wait(l, [&] {
return hasShutdownLocked() || !globalArbitrationWaiters_.empty();
});
Expand Down Expand Up @@ -918,7 +910,7 @@ void SharedArbitrator::runGlobalArbitration() {

uint64_t SharedArbitrator::getGlobalArbitrationTarget() {
uint64_t targetBytes{0};
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
for (const auto& waiter : globalArbitrationWaiters_) {
targetBytes += waiter.second->op->maxGrowBytes();
}
Expand All @@ -929,14 +921,6 @@ uint64_t SharedArbitrator::getGlobalArbitrationTarget() {
capacity_ * globalArbitrationMemoryReclaimPct_ / 100, targetBytes);
}

void SharedArbitrator::getGrowTargets(
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved
ArbitrationOperation& op,
uint64_t& maxGrowTarget,
uint64_t& minGrowTarget) {
op.participant()->getGrowTargets(
op.requestBytes(), maxGrowTarget, minGrowTarget);
}

void SharedArbitrator::checkIfAborted(ArbitrationOperation& op) {
if (op.participant()->aborted()) {
VELOX_MEM_POOL_ABORTED(
Expand Down Expand Up @@ -1141,9 +1125,7 @@ uint64_t SharedArbitrator::reclaimUsedMemoryBySpill(
reclaimedBytes += reclaimResult->reclaimedBytes;
}
VELOX_CHECK_LE(prevReclaimedBytes, reclaimedUsedBytes_);
// NOTE: there might be concurrent local spill or spill triggered by
tanjialiang marked this conversation as resolved.
Show resolved Hide resolved
// external shrink.
return std::max(reclaimedBytes, reclaimedUsedBytes_ - prevReclaimedBytes);
return reclaimedBytes;
}

uint64_t SharedArbitrator::reclaimUsedMemoryByAbort(bool force) {
Expand Down Expand Up @@ -1254,12 +1236,12 @@ void SharedArbitrator::freeCapacity(uint64_t bytes) {
if (FOLLY_UNLIKELY(bytes == 0)) {
return;
}
std::vector<ContinuePromise> resumes;
std::vector<ContinuePromise> globalArbitrationWaitResumes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just use short names which is clear in the context? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In freeCapacity() method scope there isn't any context mentioning the resume is for global arbitration. So I changed the name to indicate. This way I don't need to refer to other locations of the file to figure out this resume is for global arbitration anymore.

{
std::lock_guard<std::mutex> l(stateLock_);
freeCapacityLocked(bytes, resumes);
std::lock_guard<std::mutex> l(stateMutex_);
freeCapacityLocked(bytes, globalArbitrationWaitResumes);
}
for (auto& resume : resumes) {
for (auto& resume : globalArbitrationWaitResumes) {
resume.setValue();
}
}
Expand Down Expand Up @@ -1304,7 +1286,7 @@ void SharedArbitrator::resumeGlobalArbitrationWaitersLocked(
void SharedArbitrator::removeGlobalArbitrationWaiter(uint64_t id) {
ContinuePromise resume = ContinuePromise::makeEmpty();
{
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
auto it = globalArbitrationWaiters_.find(id);
if (it != globalArbitrationWaiters_.end()) {
VELOX_CHECK_EQ(it->second->allocatedBytes, 0);
Expand All @@ -1326,7 +1308,7 @@ void SharedArbitrator::freeReservedCapacityLocked(uint64_t& bytes) {
}

MemoryArbitrator::Stats SharedArbitrator::stats() const {
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
return statsLocked();
}

Expand All @@ -1346,7 +1328,7 @@ MemoryArbitrator::Stats SharedArbitrator::statsLocked() const {
}

std::string SharedArbitrator::toString() const {
std::lock_guard<std::mutex> l(stateLock_);
std::lock_guard<std::mutex> l(stateMutex_);
return fmt::format(
"ARBITRATOR[{} CAPACITY[{}] {}]",
kind_,
Expand All @@ -1359,7 +1341,7 @@ SharedArbitrator::ScopedArbitration::ScopedArbitration(
ArbitrationOperation* operation)
: arbitrator_(arbitrator),
operation_(operation),
arbitrationCtx_(operation->participant()->pool()),
arbitrationCtx_(operation->participant()->pool(), operation),
startTime_(std::chrono::steady_clock::now()) {
VELOX_CHECK_NOT_NULL(arbitrator_);
VELOX_CHECK_NOT_NULL(operation_);
Expand Down
Loading
Loading