Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(arbitrator): Simplify arbitration participant lock #11525

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 14 additions & 37 deletions velox/common/memory/ArbitrationParticipant.cpp
Original file line number Diff line number Diff line change
@@ -267,7 +267,7 @@ uint64_t ArbitrationParticipant::reclaim(
if (targetBytes == 0) {
return 0;
}
ArbitrationOperationTimedLock l(reclaimMutex_);
ArbitrationTimedLock l(reclaimMutex_, maxWaitTimeNs);
TestValue::adjust(
"facebook::velox::memory::ArbitrationParticipant::reclaim", this);
uint64_t reclaimedBytes{0};
@@ -320,7 +320,7 @@ uint64_t ArbitrationParticipant::shrinkLocked(bool reclaimAll) {

uint64_t ArbitrationParticipant::abort(
const std::exception_ptr& error) noexcept {
ArbitrationOperationTimedLock l(reclaimMutex_);
std::lock_guard<std::timed_mutex> l(reclaimMutex_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extend to take time for abort? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

abort path is used by global arbitration. We currently don't have timeout information to pass for global arbitration because it does not belong to any requestor. The timeout protection is performed already when waiting for global arbitration.

return abortLocked(error);
}

@@ -403,52 +403,29 @@ std::string ArbitrationCandidate::toString() const {
}

#ifdef TSAN_BUILD
ArbitrationOperationTimedLock::ArbitrationOperationTimedLock(
std::timed_mutex& mutex)
ArbitrationTimedLock::ArbitrationTimedLock(
std::timed_mutex& mutex,
uint64_t /* unused */)
: mutex_(mutex) {
mutex_.lock();
}

ArbitrationOperationTimedLock::~ArbitrationOperationTimedLock() {
ArbitrationTimedLock::~ArbitrationTimedLock() {
mutex_.unlock();
}
#else
ArbitrationOperationTimedLock::ArbitrationOperationTimedLock(
std::timed_mutex& mutex) {
auto arbitrationContext = memoryArbitrationContext();
if (arbitrationContext == nullptr) {
std::unique_lock<std::timed_mutex> l(mutex);
timedLock_ = std::move(l);
return;
}
auto* operation = arbitrationContext->op;
if (operation == nullptr) {
VELOX_CHECK_EQ(
MemoryArbitrationContext::typeName(arbitrationContext->type),
MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type::kGlobal));
std::unique_lock<std::timed_mutex> l(mutex);
timedLock_ = std::move(l);
return;
}
VELOX_CHECK_EQ(
MemoryArbitrationContext::typeName(arbitrationContext->type),
MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type::kLocal));
std::unique_lock<std::timed_mutex> l(
mutex, std::chrono::nanoseconds(operation->timeoutNs()));
timedLock_ = std::move(l);
if (!timedLock_.owns_lock()) {
ArbitrationTimedLock::ArbitrationTimedLock(
std::timed_mutex& mutex,
uint64_t timeoutNs)
: mutex_(mutex) {
if (!mutex_.try_lock_for(std::chrono::nanoseconds(timeoutNs))) {
VELOX_MEM_ARBITRATION_TIMEOUT(fmt::format(
"Memory arbitration lock timed out on memory pool: {} after running {}",
operation->participant()->name(),
succinctNanos(operation->executionTimeNs())));
"Memory arbitration lock timed out when reclaiming from arbitration participant."));
}
}

ArbitrationOperationTimedLock::~ArbitrationOperationTimedLock() {
VELOX_CHECK(timedLock_.owns_lock());
timedLock_.unlock();
ArbitrationTimedLock::~ArbitrationTimedLock() {
mutex_.unlock();
}
#endif
} // namespace facebook::velox::memory
18 changes: 4 additions & 14 deletions velox/common/memory/ArbitrationParticipant.h
Original file line number Diff line number Diff line change
@@ -49,26 +49,16 @@ class ScopedArbitrationParticipant;
/// automatically be applied.
///
/// NOTE: TSAN is incompatible with std::timed_mutex when used with timeout. So
/// in TSAN build a trivial implementation is implemented.
#ifdef TSAN_BUILD
class ArbitrationOperationTimedLock {
/// in TSAN build a trivial lock is implemented.
class ArbitrationTimedLock {
public:
explicit ArbitrationOperationTimedLock(std::timed_mutex& mutex);
~ArbitrationOperationTimedLock();
ArbitrationTimedLock(std::timed_mutex& mutex, uint64_t timeoutNs);
~ArbitrationTimedLock();

private:
std::timed_mutex& mutex_;
};
#else
class ArbitrationOperationTimedLock {
public:
explicit ArbitrationOperationTimedLock(std::timed_mutex& mutex);
~ArbitrationOperationTimedLock();

private:
std::unique_lock<std::timed_mutex> timedLock_;
};
#endif
/// Manages the memory arbitration operations on a query memory pool. It also
/// tracks the arbitration stats during the query memory pool's lifecycle.
class ArbitrationParticipant
14 changes: 4 additions & 10 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
@@ -449,12 +449,8 @@ bool MemoryArbitrator::Stats::operator<=(const Stats& other) const {
return !(*this > other);
}

MemoryArbitrationContext::MemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* _op)
: type(Type::kLocal), requestorName(requestor->name()), op(_op) {
VELOX_CHECK_NOT_NULL(op);
}
MemoryArbitrationContext::MemoryArbitrationContext(const MemoryPool* requestor)
: type(Type::kLocal), requestorName(requestor->name()) {}

std::string MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type type) {
@@ -469,10 +465,8 @@ std::string MemoryArbitrationContext::typeName(
}

ScopedMemoryArbitrationContext::ScopedMemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* op)
: savedArbitrationCtx_(arbitrationCtx),
currentArbitrationCtx_(requestor, op) {
const MemoryPool* requestor)
: savedArbitrationCtx_(arbitrationCtx), currentArbitrationCtx_(requestor) {
arbitrationCtx = &currentArbitrationCtx_;
}

12 changes: 3 additions & 9 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
@@ -421,13 +421,9 @@ struct MemoryArbitrationContext {
/// global memory arbitration type.
const std::string requestorName;

ArbitrationOperation* const op;
explicit MemoryArbitrationContext(const MemoryPool* requestor);

MemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* _op);

MemoryArbitrationContext() : type(Type::kGlobal), op(nullptr) {}
MemoryArbitrationContext() : type(Type::kGlobal) {}
};

/// Object used to set/restore the memory arbitration context when a thread is
@@ -439,9 +435,7 @@ class ScopedMemoryArbitrationContext {
explicit ScopedMemoryArbitrationContext(
const MemoryArbitrationContext* context);

ScopedMemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* op);
explicit ScopedMemoryArbitrationContext(const MemoryPool* requestor);

~ScopedMemoryArbitrationContext();

2 changes: 1 addition & 1 deletion velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
@@ -1341,7 +1341,7 @@ SharedArbitrator::ScopedArbitration::ScopedArbitration(
ArbitrationOperation* operation)
: arbitrator_(arbitrator),
operation_(operation),
arbitrationCtx_(operation->participant()->pool(), operation),
arbitrationCtx_(operation->participant()->pool()),
startTime_(std::chrono::steady_clock::now()) {
VELOX_CHECK_NOT_NULL(arbitrator_);
VELOX_CHECK_NOT_NULL(operation_);
66 changes: 16 additions & 50 deletions velox/common/memory/tests/ArbitrationParticipantTest.cpp
Original file line number Diff line number Diff line change
@@ -1419,7 +1419,7 @@ DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, reclaimLock) {
folly::EventCount reclaim1CompletedWait;
std::thread reclaimThread1([&]() {
memory::MemoryReclaimer::Stats stats;
ASSERT_EQ(scopedParticipant->reclaim(MB, 1'000'000, stats), 0);
ASSERT_EQ(scopedParticipant->reclaim(MB, 1'000'000'000'000, stats), 0);
ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
reclaim1CompletedFlag = true;
reclaim1CompletedWait.notifyAll();
@@ -1454,7 +1454,7 @@ DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, reclaimLock) {
folly::EventCount reclaim2CompletedWait;
std::thread reclaimThread2([&]() {
memory::MemoryReclaimer::Stats stats;
ASSERT_EQ(scopedParticipant->reclaim(MB, 1'000'000, stats), 0);
ASSERT_EQ(scopedParticipant->reclaim(MB, 1'000'000'000'000, stats), 0);
ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
reclaim2CompletedFlag = true;
reclaim2CompletedWait.notifyAll();
@@ -1896,65 +1896,31 @@ TEST_F(ArbitrationParticipantTest, arbitrationOperationTimedLock) {
};

struct TestData {
std::string type;
uint64_t lockHoldTimeNs;
uint64_t opTimeoutNs;
};

std::timed_mutex mutex;
std::vector<TestData> testDataVec{
{"local", 1'000'000'000UL, 2'000'000'000UL},
{"local", 2'000'000'000UL, 1'000'000'000UL},
{"global", 1'000'000'000UL, 2'000'000'000UL},
{"global", 2'000'000'000UL, 1'000'000'000UL},
{"none", 1'000'000'000UL, 2'000'000'000UL}};
{1'000'000'000UL, 2'000'000'000UL}, {2'000'000'000UL, 1'000'000'000UL}};

for (auto& testData : testDataVec) {
ScopedArbitrationParticipant scopedArbitrationParticipant(
participant, participantPool);
ArbitrationOperation operation(
std::move(scopedArbitrationParticipant), 1024, testData.opTimeoutNs);
if (testData.type == "local") {
MemoryArbitrationContext ctx(participantPool.get(), &operation);
ScopedMemoryArbitrationContext scopedCtx(&ctx);

folly::EventCount lockWait;
std::atomic_bool lockWaitFlag{true};
auto lockHolder = createLockHolderThread(
mutex, testData.lockHoldTimeNs, lockWait, lockWaitFlag);
std::unique_ptr<ArbitrationOperationTimedLock> timedLock{nullptr};
lockWait.await([&]() { return !lockWaitFlag.load(); });
if (testData.lockHoldTimeNs < testData.opTimeoutNs) {
timedLock = std::make_unique<ArbitrationOperationTimedLock>(mutex);
ASSERT_FALSE(mutex.try_lock());
} else {
VELOX_ASSERT_THROW(
std::make_unique<ArbitrationOperationTimedLock>(mutex),
"Memory arbitration lock timed out");
}
lockHolder.join();
} else if (testData.type == "global") {
MemoryArbitrationContext ctx;
ScopedMemoryArbitrationContext scopedCtx(&ctx);

folly::EventCount lockWait;
std::atomic_bool lockWaitFlag{true};
auto lockHolder = createLockHolderThread(
mutex, testData.lockHoldTimeNs, lockWait, lockWaitFlag);
lockWait.await([&]() { return !lockWaitFlag.load(); });
ArbitrationOperationTimedLock timedLock(mutex);
folly::EventCount lockWait;
std::atomic_bool lockWaitFlag{true};
auto lockHolder = createLockHolderThread(
mutex, testData.lockHoldTimeNs, lockWait, lockWaitFlag);
std::unique_ptr<ArbitrationTimedLock> timedLock{nullptr};
lockWait.await([&]() { return !lockWaitFlag.load(); });
if (testData.lockHoldTimeNs < testData.opTimeoutNs) {
timedLock =
std::make_unique<ArbitrationTimedLock>(mutex, testData.opTimeoutNs);
ASSERT_FALSE(mutex.try_lock());
lockHolder.join();
} else {
folly::EventCount lockWait;
std::atomic_bool lockWaitFlag{true};
auto lockHolder = createLockHolderThread(
mutex, testData.lockHoldTimeNs, lockWait, lockWaitFlag);
lockWait.await([&]() { return !lockWaitFlag.load(); });
ArbitrationOperationTimedLock timedLock(mutex);
ASSERT_FALSE(mutex.try_lock());
lockHolder.join();
VELOX_ASSERT_THROW(
std::make_unique<ArbitrationTimedLock>(mutex, testData.opTimeoutNs),
"Memory arbitration lock timed out");
}
lockHolder.join();
}
}
#endif
22 changes: 4 additions & 18 deletions velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
@@ -990,41 +990,27 @@ TEST_F(MemoryReclaimerTest, arbitrationContext) {
ASSERT_FALSE(isSpillMemoryPool(leafChild2.get()));
ASSERT_TRUE(memoryArbitrationContext() == nullptr);
{
auto arbitrationStructs =
test::ArbitrationTestStructs::createArbitrationTestStructs(leafChild1);
ScopedMemoryArbitrationContext arbitrationContext(
leafChild1.get(), arbitrationStructs.operation.get());
ScopedMemoryArbitrationContext arbitrationContext(leafChild1.get());
ASSERT_TRUE(memoryArbitrationContext() != nullptr);
ASSERT_EQ(memoryArbitrationContext()->requestorName, leafChild1->name());
}
ASSERT_TRUE(memoryArbitrationContext() == nullptr);
{
auto arbitrationStructs =
test::ArbitrationTestStructs::createArbitrationTestStructs(leafChild2);
ScopedMemoryArbitrationContext arbitrationContext(
leafChild2.get(), arbitrationStructs.operation.get());
ScopedMemoryArbitrationContext arbitrationContext(leafChild2.get());
ASSERT_TRUE(memoryArbitrationContext() != nullptr);
ASSERT_EQ(memoryArbitrationContext()->requestorName, leafChild2->name());
}
ASSERT_TRUE(memoryArbitrationContext() == nullptr);
std::thread nonAbitrationThread([&]() {
ASSERT_TRUE(memoryArbitrationContext() == nullptr);
{
auto arbitrationStructs =
test::ArbitrationTestStructs::createArbitrationTestStructs(
leafChild1);
ScopedMemoryArbitrationContext arbitrationContext(
leafChild1.get(), arbitrationStructs.operation.get());
ScopedMemoryArbitrationContext arbitrationContext(leafChild1.get());
ASSERT_TRUE(memoryArbitrationContext() != nullptr);
ASSERT_EQ(memoryArbitrationContext()->requestorName, leafChild1->name());
}
ASSERT_TRUE(memoryArbitrationContext() == nullptr);
{
auto arbitrationStructs =
test::ArbitrationTestStructs::createArbitrationTestStructs(
leafChild2);
ScopedMemoryArbitrationContext arbitrationContext(
leafChild2.get(), arbitrationStructs.operation.get());
ScopedMemoryArbitrationContext arbitrationContext(leafChild2.get());
ASSERT_TRUE(memoryArbitrationContext() != nullptr);
ASSERT_EQ(memoryArbitrationContext()->requestorName, leafChild2->name());
}
5 changes: 1 addition & 4 deletions velox/common/memory/tests/MemoryPoolTest.cpp
Original file line number Diff line number Diff line change
@@ -3888,10 +3888,7 @@ TEST_P(MemoryPoolTest, overuseUnderArbitration) {
ASSERT_FALSE(child->maybeReserve(2 * kMaxSize));
ASSERT_EQ(child->usedBytes(), 0);
ASSERT_EQ(child->reservedBytes(), 0);
auto arbitrationTestStructs =
test::ArbitrationTestStructs::createArbitrationTestStructs(root);
ScopedMemoryArbitrationContext scopedMemoryArbitration(
root.get(), arbitrationTestStructs.operation.get());
ScopedMemoryArbitrationContext scopedMemoryArbitration(root.get());
ASSERT_TRUE(underMemoryArbitration());
ASSERT_TRUE(child->maybeReserve(2 * kMaxSize));
ASSERT_EQ(child->usedBytes(), 0);
24 changes: 4 additions & 20 deletions velox/dwio/dwrf/test/E2EWriterTest.cpp
Original file line number Diff line number Diff line change
@@ -1735,11 +1735,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnWrite) {
const auto oldReservedBytes = writerPool->reservedBytes();
const auto oldUsedBytes = writerPool->usedBytes();
{
auto arbitrationStructs =
memory::test::ArbitrationTestStructs::createArbitrationTestStructs(
writerPool);
memory::ScopedMemoryArbitrationContext arbitrationCtx(
writerPool.get(), arbitrationStructs.operation.get());
memory::ScopedMemoryArbitrationContext arbitrationCtx(writerPool.get());
writerPool->reclaim(1L << 30, 0, stats);
}
ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
@@ -1778,11 +1774,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnWrite) {
writer->testingNonReclaimableSection() = false;
stats.numNonReclaimableAttempts = 0;
{
auto arbitrationStructs =
memory::test::ArbitrationTestStructs::createArbitrationTestStructs(
writerPool);
memory::ScopedMemoryArbitrationContext arbitrationCtx(
writerPool.get(), arbitrationStructs.operation.get());
memory::ScopedMemoryArbitrationContext arbitrationCtx(writerPool.get());
const auto reclaimedBytes = writerPool->reclaim(1L << 30, 0, stats);
ASSERT_GT(reclaimedBytes, 0);
}
@@ -2124,11 +2116,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimThreshold) {
*writerPool, reclaimableBytes));
ASSERT_GT(reclaimableBytes, 0);
{
auto arbitrationStructs =
memory::test::ArbitrationTestStructs::createArbitrationTestStructs(
writerPool);
memory::ScopedMemoryArbitrationContext arbitrationCtx(
writerPool.get(), arbitrationStructs.operation.get());
memory::ScopedMemoryArbitrationContext arbitrationCtx(writerPool.get());
ASSERT_GT(writerPool->reclaim(1L << 30, 0, stats), 0);
}
ASSERT_GT(stats.reclaimExecTimeUs, 0);
@@ -2138,11 +2126,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimThreshold) {
*writerPool, reclaimableBytes));
ASSERT_EQ(reclaimableBytes, 0);
{
auto arbitrationStructs =
memory::test::ArbitrationTestStructs::createArbitrationTestStructs(
writerPool);
memory::ScopedMemoryArbitrationContext arbitrationCtx(
writerPool.get(), arbitrationStructs.operation.get());
memory::ScopedMemoryArbitrationContext arbitrationCtx(writerPool.get());
ASSERT_EQ(writerPool->reclaim(1L << 30, 0, stats), 0);
}
ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
Loading