Skip to content

Commit

Permalink
Ensure memory reservation before shrinking cache and allocating
Browse files Browse the repository at this point in the history
Summary:
Currently, the following allocation APIs in the memory allocator, namely
allocateNonContiguous, allocateContiguous and growContiguous, first
attempt to shrink the data cache (if enough allocated memory is not
available), then attempt to increase the reservation from the caller's
respective memory pool, and finally perform the actual allocation.
Motivation:
The cache shrinking process used during allocation attempts is
currently lock-free and opportunistic. This means that a memory
chunk cleared by one thread can be seized by another thread attempting
to allocate. We aim to implement a mechanism that will serialize the
release and capture of this chunk for a single thread if multiple
previous attempts have been unsuccessful. This will make the process
more deterministic for a single thread.
However, the current implementation presents a challenge. If a thread
holds a lock inside the cache and then attempts a reservation, the
reservation can trigger an arbitration. If the arbitration selects a
query to terminate but one of that query's drivers is waiting on the
cache lock, that driver will never go off thread, resulting in a
deadlock. To avoid this, we propose attempting the reservation before
trying to shrink the cache, thereby keeping the two locking mechanisms
independent.
Also, with the current ordering, if the reservation fails after the
cache has already been shrunk, cached memory has been cleared
unnecessarily.

Differential Revision: D55034349
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Mar 18, 2024
1 parent 7d48b6a commit 5087e6e
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 272 deletions.
117 changes: 21 additions & 96 deletions velox/common/memory/MallocAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,65 +47,36 @@ MallocAllocator::~MallocAllocator() {
}

bool MallocAllocator::allocateNonContiguousWithoutRetry(
MachinePageCount numPages,
Allocation& out,
ReservationCallback reservationCB,
MachinePageCount minSizeClass) {
const uint64_t freedBytes = freeNonContiguous(out);
if (numPages == 0) {
if (freedBytes != 0 && reservationCB != nullptr) {
reservationCB(freedBytes, false);
}
const SizeMix& sizeMix,
Allocation& out) {
freeNonContiguous(out);
if (sizeMix.totalPages == 0) {
return true;
}
const SizeMix mix = allocationSize(numPages, minSizeClass);
const auto totalBytes = AllocationTraits::pageBytes(mix.totalPages);
const auto totalBytes = AllocationTraits::pageBytes(sizeMix.totalPages);
if (testingHasInjectedFailure(InjectedFailure::kCap) ||
!incrementUsage(totalBytes)) {
if (freedBytes != 0 && reservationCB != nullptr) {
reservationCB(freedBytes, false);
}
const auto errorMsg = fmt::format(
"Exceeded memory allocator limit when allocating {} new pages for "
"total allocation of {} pages, the memory allocator capacity is {}",
mix.totalPages,
numPages,
"Exceeded memory allocator limit when allocating {} new pages"
", the memory allocator capacity is {}",
sizeMix.totalPages,
succinctBytes(capacity_));
VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) << errorMsg;
setAllocatorFailureMessage(errorMsg);
return false;
}

uint64_t bytesToAllocate = 0;
if (reservationCB != nullptr) {
bytesToAllocate = AllocationTraits::pageBytes(mix.totalPages) - freedBytes;
try {
reservationCB(bytesToAllocate, true);
} catch (std::exception&) {
VELOX_MEM_LOG(WARNING)
<< "Failed to reserve " << succinctBytes(bytesToAllocate)
<< " for non-contiguous allocation of " << numPages
<< " pages, then release " << succinctBytes(freedBytes)
<< " from the old allocation";
// If the new memory reservation fails, we need to release the memory
// reservation of the freed memory of previously allocation.
reservationCB(freedBytes, false);
decrementUsage(totalBytes);
std::rethrow_exception(std::current_exception());
}
}

std::vector<void*> buffers;
buffers.reserve(mix.numSizes);
for (int32_t i = 0; i < mix.numSizes; ++i) {
buffers.reserve(sizeMix.numSizes);
for (int32_t i = 0; i < sizeMix.numSizes; ++i) {
MachinePageCount numSizeClassPages =
mix.sizeCounts[i] * sizeClassSizes_[mix.sizeIndices[i]];
sizeMix.sizeCounts[i] * sizeClassSizes_[sizeMix.sizeIndices[i]];
void* ptr = nullptr;
// Trigger allocation failure by skipping malloc
if (!testingHasInjectedFailure(InjectedFailure::kAllocate)) {
stats_.recordAllocate(
AllocationTraits::pageBytes(sizeClassSizes_[mix.sizeIndices[i]]),
mix.sizeCounts[i],
AllocationTraits::pageBytes(sizeClassSizes_[sizeMix.sizeIndices[i]]),
sizeMix.sizeCounts[i],
[&]() {
ptr = ::malloc(
AllocationTraits::pageBytes(numSizeClassPages)); // NOLINT
Expand All @@ -117,7 +88,7 @@ bool MallocAllocator::allocateNonContiguousWithoutRetry(
"Malloc failed to allocate {} of memory while allocating for "
"non-contiguous allocation of {} pages",
succinctBytes(AllocationTraits::pageBytes(numSizeClassPages)),
numPages);
sizeMix.totalPages);
VELOX_MEM_LOG(WARNING) << errorMsg;
setAllocatorFailureMessage(errorMsg);
break;
Expand All @@ -126,40 +97,33 @@ bool MallocAllocator::allocateNonContiguousWithoutRetry(
out.append(reinterpret_cast<uint8_t*>(ptr), numSizeClassPages); // NOLINT
}

if (buffers.size() != mix.numSizes) {
if (buffers.size() != sizeMix.numSizes) {
// Failed to allocate memory using malloc. Free any malloced pages and
// return false.
for (auto* buffer : buffers) {
::free(buffer);
}
out.clear();
if (reservationCB != nullptr) {
VELOX_MEM_LOG(WARNING)
<< "Failed to allocate memory for non-contiguous allocation of "
<< numPages << " pages, then release "
<< succinctBytes(bytesToAllocate + freedBytes)
<< " of memory reservation including the old allocation";
reservationCB(bytesToAllocate + freedBytes, false);
}
VELOX_MEM_LOG(WARNING)
<< "Failed to allocate memory for non-contiguous allocation of "
<< sizeMix.totalPages << " pages";
decrementUsage(totalBytes);
return false;
}

// Successfully allocated all pages.
numAllocated_.fetch_add(mix.totalPages);
numAllocated_.fetch_add(sizeMix.totalPages);
return true;
}

bool MallocAllocator::allocateContiguousWithoutRetry(
MachinePageCount numPages,
Allocation* collateral,
ContiguousAllocation& allocation,
ReservationCallback reservationCB,
MachinePageCount maxPages) {
bool result;
stats_.recordAllocate(AllocationTraits::pageBytes(numPages), 1, [&]() {
result = allocateContiguousImpl(
numPages, collateral, allocation, reservationCB, maxPages);
result = allocateContiguousImpl(numPages, collateral, allocation, maxPages);
});
return result;
}
Expand All @@ -168,7 +132,6 @@ bool MallocAllocator::allocateContiguousImpl(
MachinePageCount numPages,
Allocation* collateral,
ContiguousAllocation& allocation,
ReservationCallback reservationCB,
MachinePageCount maxPages) {
if (maxPages == 0) {
maxPages = numPages;
Expand All @@ -192,23 +155,13 @@ bool MallocAllocator::allocateContiguousImpl(
decrementUsage(AllocationTraits::pageBytes(numContiguousCollateralPages));
allocation.clear();
}
const auto totalCollateralPages =
numCollateralPages + numContiguousCollateralPages;
const auto totalCollateralBytes =
AllocationTraits::pageBytes(totalCollateralPages);
if (numPages == 0) {
if (totalCollateralBytes != 0 && reservationCB != nullptr) {
reservationCB(totalCollateralBytes, false);
}
return true;
}

const auto totalBytes = AllocationTraits::pageBytes(numPages);
if (testingHasInjectedFailure(InjectedFailure::kCap) ||
!incrementUsage(totalBytes)) {
if (totalCollateralBytes != 0 && reservationCB != nullptr) {
reservationCB(totalCollateralBytes, false);
}
const auto errorMsg = fmt::format(
"Exceeded memory allocator limit when allocating {} new pages, the "
"memory allocator capacity is {}",
Expand All @@ -218,23 +171,6 @@ bool MallocAllocator::allocateContiguousImpl(
VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) << errorMsg;
return false;
}
const int64_t numNeededPages = numPages - totalCollateralPages;
if (reservationCB != nullptr) {
try {
reservationCB(AllocationTraits::pageBytes(numNeededPages), true);
} catch (std::exception&) {
// If the new memory reservation fails, we need to release the memory
// reservation of the freed contiguous and non-contiguous memory.
VELOX_MEM_LOG(WARNING)
<< "Failed to reserve " << AllocationTraits::pageBytes(numNeededPages)
<< " bytes for contiguous allocation of " << numPages
<< " pages, then release " << succinctBytes(totalCollateralBytes)
<< " from the old allocations";
reservationCB(totalCollateralBytes, false);
decrementUsage(totalBytes);
std::rethrow_exception(std::current_exception());
}
}
numAllocated_.fetch_add(numPages);
numMapped_.fetch_add(numPages);
void* data = ::mmap(
Expand Down Expand Up @@ -300,15 +236,7 @@ void MallocAllocator::freeContiguousImpl(ContiguousAllocation& allocation) {

bool MallocAllocator::growContiguousWithoutRetry(
MachinePageCount increment,
ContiguousAllocation& allocation,
ReservationCallback reservationCB) {
VELOX_CHECK_LE(
allocation.size() + increment * AllocationTraits::kPageSize,
allocation.maxSize());
if (reservationCB != nullptr) {
// May throw. If does, there is nothing to revert.
reservationCB(AllocationTraits::pageBytes(increment), true);
}
ContiguousAllocation& allocation) {
if (!incrementUsage(AllocationTraits::pageBytes(increment))) {
const auto errorMsg = fmt::format(
"Exceeded memory allocator limit when allocating {} new pages for "
Expand All @@ -318,9 +246,6 @@ bool MallocAllocator::growContiguousWithoutRetry(
succinctBytes(capacity_));
setAllocatorFailureMessage(errorMsg);
VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) << errorMsg;
if (reservationCB != nullptr) {
reservationCB(AllocationTraits::pageBytes(increment), false);
}
return false;
}
numAllocated_ += increment;
Expand Down
11 changes: 3 additions & 8 deletions velox/common/memory/MallocAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ class MallocAllocator : public MemoryAllocator {

bool growContiguousWithoutRetry(
MachinePageCount increment,
ContiguousAllocation& allocation,
ReservationCallback reservationCB = nullptr) override;
ContiguousAllocation& allocation) override;

void freeBytes(void* p, uint64_t bytes) noexcept override;

Expand Down Expand Up @@ -84,23 +83,19 @@ class MallocAllocator : public MemoryAllocator {

private:
bool allocateNonContiguousWithoutRetry(
MachinePageCount numPages,
Allocation& out,
ReservationCallback reservationCB = nullptr,
MachinePageCount minSizeClass = 0) override;
const SizeMix& sizeMix,
Allocation& out) override;

bool allocateContiguousWithoutRetry(
MachinePageCount numPages,
Allocation* collateral,
ContiguousAllocation& allocation,
ReservationCallback reservationCB = nullptr,
MachinePageCount maxPages = 0) override;

bool allocateContiguousImpl(
MachinePageCount numPages,
Allocation* collateral,
ContiguousAllocation& allocation,
ReservationCallback reservationCB,
MachinePageCount maxPages);

void freeContiguousImpl(ContiguousAllocation& allocation);
Expand Down
Loading

0 comments on commit 5087e6e

Please sign in to comment.