Skip to content

Commit

Permalink
Avoid unnecessary memory capacity growth in case of concurrent arbitr… (
Browse files Browse the repository at this point in the history
facebookincubator#9557)

Summary:
This PR avoids unnecessary memory capacity growth in case of concurrent arbitration requests
from the same query. The first arbitration request might have reserved enough capacity from the
arbitrator (we allocate more than requested to reduce the number of arbitrations). We avoid this by
checking if there is sufficient free capacity in the request pool itself before allocating more capacity
from the arbitrator. Also, to avoid unnecessary retries from the memory pool, we support committing
the reservation bytes before returning arbitration success back to the memory pool. Correspondingly,
the memory pool doesn't have to increase the reservation and check for retry after a successful
capacity growth from the arbitrator.

Pull Request resolved: facebookincubator#9557

Reviewed By: tanjialiang, oerling

Differential Revision: D56444509

Pulled By: xiaoxmeng

fbshipit-source-id: adbff6ba18389c30c601e627a325c6d7df1f907c
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Apr 26, 2024
1 parent 32289f9 commit 97160cd
Show file tree
Hide file tree
Showing 19 changed files with 328 additions and 410 deletions.
5 changes: 2 additions & 3 deletions velox/common/file/tests/FaultyFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ void FaultyWriteFile::append(std::string_view data) {
if (injectionHook_ != nullptr) {
FaultFileWriteOperation op(path_, data);
injectionHook_(&op);
if (op.delegate) {
delegatedFile_->append(op.data);
if (!op.delegate) {
return;
}
return;
}
delegatedFile_->append(data);
}
Expand Down
4 changes: 2 additions & 2 deletions velox/common/file/tests/FaultyFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ struct FaultFileReadvOperation : FaultFileOperation {

/// Fault injection parameters for file write API.
struct FaultFileWriteOperation : FaultFileOperation {
std::string_view data;
std::string_view* data;

FaultFileWriteOperation(
const std::string& _path,
const std::string_view& _data)
: FaultFileOperation(FaultFileOperation::Type::kWrite, _path),
data(_data) {}
data(const_cast<std::string_view*>(&_data)) {}
};

/// The fault injection hook on the file operation path.
Expand Down
2 changes: 1 addition & 1 deletion velox/common/file/tests/FileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ TEST_F(FaultyFsTest, fileWriteFaultHookInjection) {
return;
}
auto* writeOp = static_cast<FaultFileWriteOperation*>(op);
writeOp->data = "Error data";
*writeOp->data = "Error data";
});
{
auto writeFile = fs_->openFileForWrite(path1, {});
Expand Down
7 changes: 6 additions & 1 deletion velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
VELOX_USER_CHECK_GE(capacity(), 0);
VELOX_CHECK_GE(allocator_->capacity(), arbitrator_->capacity());
MemoryAllocator::alignmentCheck(0, alignment_);
defaultRoot_->grow(defaultRoot_->maxCapacity());
const bool ret = defaultRoot_->grow(defaultRoot_->maxCapacity(), 0);
VELOX_CHECK(
ret,
"Failed to set max capacity {} for {}",
succinctBytes(defaultRoot_->maxCapacity()),
defaultRoot_->name());
const size_t numSharedPools =
std::max(1, FLAGS_velox_memory_num_shared_leaf_pools);
sharedLeafPools_.reserve(numSharedPools);
Expand Down
10 changes: 5 additions & 5 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ struct MemoryManagerOptions {
/// pool.
uint64_t memoryPoolInitCapacity{256 << 20};

/// The minimal memory capacity reserved for a query memory pool to run.
/// The minimal query memory pool capacity that is ensured during arbitration.
/// During arbitration, the memory arbitrator ensures that each participant's
/// memory pool capacity is no less than this value on a best-effort basis, for
/// smoother query execution and to avoid frequent arbitration requests.
uint64_t memoryPoolReservedCapacity{0};

/// The minimal memory capacity to transfer out of or into a memory pool
Expand Down Expand Up @@ -287,10 +291,6 @@ class MemoryManager {
return sharedLeafPools_;
}

/// Forwards 'pool' and 'incrementBytes' unchanged to growPool() and returns
/// its boolean result. The 'testing' prefix suggests this is intended for
/// test code only -- NOTE(review): confirm against callers.
bool testingGrowPool(MemoryPool* pool, uint64_t incrementBytes) {
return growPool(pool, incrementBytes);
}

private:
void dropPool(MemoryPool* pool);

Expand Down
4 changes: 2 additions & 2 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ class NoopArbitrator : public MemoryArbitrator {
// Noop arbitrator has no memory capacity limit so no operation needed for
// memory pool capacity reserve.
uint64_t growCapacity(MemoryPool* pool, uint64_t /*unused*/) override {
pool->grow(pool->maxCapacity());
return pool->maxCapacity();
pool->grow(pool->maxCapacity(), 0);
return pool->capacity();
}

// Noop arbitrator has no memory capacity limit so no operation needed for
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class MemoryArbitrator {
/// Invoked by the memory manager to allocate up to 'targetBytes' of free
/// memory capacity without triggering memory arbitration. The function will
/// grow the memory pool's capacity based on the free available memory
/// capacity in the arbitrator, and returns the actual growed capacity in
/// capacity in the arbitrator, and returns the actual grown capacity in
/// bytes.
virtual uint64_t growCapacity(MemoryPool* pool, uint64_t bytes) = 0;

Expand Down
53 changes: 34 additions & 19 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,15 +809,9 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(
TestValue::adjust(
"facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe::AfterGrowCallback",
this);
// NOTE: the memory reservation might still fail even if the memory grow
// callback succeeds. The reason is that we don't hold the root tracker's
// mutex lock while running the grow callback. Therefore, there is a
// possibility in theory that a concurrent memory reservation request
// might steal away the increased memory capacity after the grow callback
// finishes and before we increase the reservation. If it happens, we can
// simply fall back to retry the memory reservation from the leaf memory
// pool which should happen rarely.
return maybeIncrementReservation(size);
// NOTE: if memory arbitration succeeds, it should have already committed
// the reservation 'size' in the root memory pool.
return true;
}
VELOX_MEM_POOL_CAP_EXCEEDED(fmt::format(
"Exceeded memory pool cap of {} with max {} when requesting {}, memory "
Expand Down Expand Up @@ -846,12 +840,16 @@ bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) {
return false;
}
}
reservationBytes_ += size;
incrementReservationLocked(size);
return true;
}

void MemoryPoolImpl::incrementReservationLocked(uint64_t bytes) {
reservationBytes_ += bytes;
if (!isLeaf()) {
cumulativeBytes_ += size;
cumulativeBytes_ += bytes;
maybeUpdatePeakBytesLocked(reservationBytes_);
}
return true;
}

void MemoryPoolImpl::release() {
Expand Down Expand Up @@ -1029,20 +1027,29 @@ uint64_t MemoryPoolImpl::shrink(uint64_t targetBytes) {
return freeBytes;
}

uint64_t MemoryPoolImpl::grow(uint64_t bytes) noexcept {
bool MemoryPoolImpl::grow(uint64_t growBytes, uint64_t reservationBytes) {
if (parent_ != nullptr) {
return parent_->grow(bytes);
return parent_->grow(growBytes, reservationBytes);
}
// TODO: add to prevent from growing beyond the max capacity and the
// corresponding support in memory arbitrator.
std::lock_guard<std::mutex> l(mutex_);
// We don't expect to grow a memory pool without capacity limit.
VELOX_CHECK_NE(capacity_, kMaxMemory, "Can't grow with unlimited capacity");
VELOX_CHECK_LE(
capacity_ + bytes, maxCapacity_, "Can't grow beyond the max capacity");
capacity_ += bytes;
VELOX_CHECK_GE(capacity_, bytes);
return capacity_;
if (capacity_ + growBytes > maxCapacity_) {
return false;
}
if (reservationBytes_ + reservationBytes > capacity_ + growBytes) {
return false;
}

capacity_ += growBytes;
VELOX_CHECK_GE(capacity_, growBytes);
if (reservationBytes > 0) {
incrementReservationLocked(reservationBytes);
VELOX_CHECK_LE(reservationBytes, reservationBytes_);
}
return true;
}

void MemoryPoolImpl::abort(const std::exception_ptr& error) {
Expand Down Expand Up @@ -1081,6 +1088,14 @@ void MemoryPoolImpl::testingSetCapacity(int64_t bytes) {
capacity_ = bytes;
}

// Overwrites 'reservationBytes_' with 'bytes' on the pool that has no parent.
// A pool with a parent forwards the request to that parent instead of
// touching its own counter; the final write is done while holding 'mutex_'.
void MemoryPoolImpl::testingSetReservation(int64_t bytes) {
  if (parent_ == nullptr) {
    std::lock_guard<std::mutex> guard(mutex_);
    reservationBytes_ = bytes;
    return;
  }
  // Not the top of the hierarchy: delegate upwards.
  toImpl(parent_)->testingSetReservation(bytes);
}

bool MemoryPoolImpl::needRecordDbg(bool /* isAlloc */) {
if (!debugPoolNameRegex_.empty()) {
return RE2::FullMatch(name_, debugPoolNameRegex_);
Expand Down
18 changes: 14 additions & 4 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,15 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
/// 'targetBytes' is zero, the function frees all the free memory capacity.
virtual uint64_t shrink(uint64_t targetBytes = 0) = 0;

/// Invoked to increase the memory pool's capacity by 'bytes'. The function
/// returns the memory pool's capacity after the growth.
virtual uint64_t grow(uint64_t bytes) noexcept = 0;
/// Invoked to increase the memory pool's capacity by 'growBytes' and commit
/// the reservation by 'reservationBytes'. The two updates are applied
/// atomically: the function returns true if both succeed; otherwise it returns
/// false and neither change is applied.
///
/// NOTE: this should only be called by memory arbitrator when a root memory
/// pool tries to grow its capacity for a new reservation request which
/// exceeds its current capacity limit.
virtual bool grow(uint64_t growBytes, uint64_t reservationBytes = 0) = 0;

/// Sets the memory reclaimer for this memory pool.
///
Expand Down Expand Up @@ -646,7 +652,7 @@ class MemoryPoolImpl : public MemoryPool {

uint64_t shrink(uint64_t targetBytes = 0) override;

uint64_t grow(uint64_t bytes) noexcept override;
bool grow(uint64_t growBytes, uint64_t reservationBytes = 0) override;

void abort(const std::exception_ptr& error) override;

Expand Down Expand Up @@ -684,6 +690,8 @@ class MemoryPoolImpl : public MemoryPool {

void testingSetCapacity(int64_t bytes);

void testingSetReservation(int64_t bytes);

MemoryManager* testingManager() const {
return manager_;
}
Expand Down Expand Up @@ -845,6 +853,8 @@ class MemoryPoolImpl : public MemoryPool {
// returns true, otherwise the function returns false.
bool maybeIncrementReservation(uint64_t size);

void incrementReservationLocked(uint64_t bytes);

// Release memory reservation for an allocation free or memory release with
// specified 'size'. If 'releaseOnly' is true, then we only release the unused
// reservation if 'minReservationBytes_' is set. 'releaseThreadSafe' processes
Expand Down
Loading

0 comments on commit 97160cd

Please sign in to comment.