From 596476ae0988d6954efd1e43dc09efdd5184a63d Mon Sep 17 00:00:00 2001
From: Bikramjeet Vig
Date: Thu, 21 Mar 2024 09:36:29 -0700
Subject: [PATCH] Ensure memory reservation before shrinking cache and
 allocating (#9136)

Summary: Currently, the allocation APIs in the memory allocator, namely
allocateNonContiguous, allocateContiguous, and growContiguous, attempt to
shrink the data cache (if not enough memory is available) before increasing
the reservation from the caller's memory pool and finally performing the
actual allocation.

Motivation: The cache-shrinking process used during allocation attempts is
currently lock-free and opportunistic, which means that a memory chunk cleared
by one thread can be seized by another thread attempting to allocate. We aim
to add a mechanism that serializes the release and capture of such a chunk for
a single thread if multiple previous attempts have been unsuccessful, making
the process more deterministic for that thread. However, the current
implementation presents a challenge: if a thread holds a lock inside the cache
and then attempts a reservation, the reservation can trigger an arbitration.
If the arbitration selects a query to terminate but one of that query's
drivers is waiting on the cache lock, the driver will never go off thread,
resulting in a deadlock. To avoid this, we propose attempting the reservation
before trying to shrink the cache, keeping the two locking mechanisms
independent (a simplified sketch of the reordered flow is appended after the
patch). In addition, with the old ordering a failed reservation meant that
cached memory had already been cleared unnecessarily.

Reviewed By: xiaoxmeng

Differential Revision: D55034349
---
 velox/common/memory/MallocAllocator.cpp | 117 ++++---------------
 velox/common/memory/MallocAllocator.h   |  11 +-
 velox/common/memory/MemoryAllocator.cpp | 148 +++++++++++++++++-------
 velox/common/memory/MemoryAllocator.h   |  48 ++++----
 velox/common/memory/MmapAllocator.cpp   | 121 ++++---------------
 velox/common/memory/MmapAllocator.h     |  11 +-
 6 files changed, 181 insertions(+), 275 deletions(-)

diff --git a/velox/common/memory/MallocAllocator.cpp b/velox/common/memory/MallocAllocator.cpp
index 707d5a4d533c3..1440b8d5245ae 100644
--- a/velox/common/memory/MallocAllocator.cpp
+++ b/velox/common/memory/MallocAllocator.cpp
@@ -47,65 +47,36 @@ MallocAllocator::~MallocAllocator() { } bool MallocAllocator::allocateNonContiguousWithoutRetry( - MachinePageCount numPages, - Allocation& out, - ReservationCallback reservationCB, - MachinePageCount minSizeClass) { - const uint64_t freedBytes = freeNonContiguous(out); - if (numPages == 0) { - if (freedBytes != 0 && reservationCB != nullptr) { - reservationCB(freedBytes, false); - } + const SizeMix& sizeMix, + Allocation& out) { + freeNonContiguous(out); + if (sizeMix.totalPages == 0) { return true; } - const SizeMix mix = allocationSize(numPages, minSizeClass); - const auto totalBytes = AllocationTraits::pageBytes(mix.totalPages); + const auto totalBytes = AllocationTraits::pageBytes(sizeMix.totalPages); if (testingHasInjectedFailure(InjectedFailure::kCap) || !incrementUsage(totalBytes)) { - if (freedBytes != 0 && reservationCB != nullptr) { - reservationCB(freedBytes, false); - } const auto errorMsg = fmt::format( - "Exceeded memory allocator limit when allocating {} new pages for " - "total allocation of {} pages, the memory allocator capacity is {}", - mix.totalPages, - numPages, + "Exceeded memory allocator limit when allocating {} new pages" ", the memory allocator capacity is {}", + sizeMix.totalPages, succinctBytes(capacity_));
VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) << errorMsg; setAllocatorFailureMessage(errorMsg); return false; } - uint64_t bytesToAllocate = 0; - if (reservationCB != nullptr) { - bytesToAllocate = AllocationTraits::pageBytes(mix.totalPages) - freedBytes; - try { - reservationCB(bytesToAllocate, true); - } catch (std::exception&) { - VELOX_MEM_LOG(WARNING) - << "Failed to reserve " << succinctBytes(bytesToAllocate) - << " for non-contiguous allocation of " << numPages - << " pages, then release " << succinctBytes(freedBytes) - << " from the old allocation"; - // If the new memory reservation fails, we need to release the memory - // reservation of the freed memory of previously allocation. - reservationCB(freedBytes, false); - decrementUsage(totalBytes); - std::rethrow_exception(std::current_exception()); - } - } - std::vector buffers; - buffers.reserve(mix.numSizes); - for (int32_t i = 0; i < mix.numSizes; ++i) { + buffers.reserve(sizeMix.numSizes); + for (int32_t i = 0; i < sizeMix.numSizes; ++i) { MachinePageCount numSizeClassPages = - mix.sizeCounts[i] * sizeClassSizes_[mix.sizeIndices[i]]; + sizeMix.sizeCounts[i] * sizeClassSizes_[sizeMix.sizeIndices[i]]; void* ptr = nullptr; // Trigger allocation failure by skipping malloc if (!testingHasInjectedFailure(InjectedFailure::kAllocate)) { stats_.recordAllocate( - AllocationTraits::pageBytes(sizeClassSizes_[mix.sizeIndices[i]]), - mix.sizeCounts[i], + AllocationTraits::pageBytes(sizeClassSizes_[sizeMix.sizeIndices[i]]), + sizeMix.sizeCounts[i], [&]() { ptr = ::malloc( AllocationTraits::pageBytes(numSizeClassPages)); // NOLINT @@ -117,7 +88,7 @@ bool MallocAllocator::allocateNonContiguousWithoutRetry( "Malloc failed to allocate {} of memory while allocating for " "non-contiguous allocation of {} pages", succinctBytes(AllocationTraits::pageBytes(numSizeClassPages)), - numPages); + sizeMix.totalPages); VELOX_MEM_LOG(WARNING) << errorMsg; setAllocatorFailureMessage(errorMsg); break; @@ -126,27 +97,22 @@ bool MallocAllocator::allocateNonContiguousWithoutRetry( out.append(reinterpret_cast(ptr), numSizeClassPages); // NOLINT } - if (buffers.size() != mix.numSizes) { + if (buffers.size() != sizeMix.numSizes) { // Failed to allocate memory using malloc. Free any malloced pages and // return false. for (auto* buffer : buffers) { ::free(buffer); } out.clear(); - if (reservationCB != nullptr) { - VELOX_MEM_LOG(WARNING) - << "Failed to allocate memory for non-contiguous allocation of " - << numPages << " pages, then release " - << succinctBytes(bytesToAllocate + freedBytes) - << " of memory reservation including the old allocation"; - reservationCB(bytesToAllocate + freedBytes, false); - } + VELOX_MEM_LOG(WARNING) + << "Failed to allocate memory for non-contiguous allocation of " + << sizeMix.totalPages << " pages"; decrementUsage(totalBytes); return false; } // Successfully allocated all pages. 
- numAllocated_.fetch_add(mix.totalPages); + numAllocated_.fetch_add(sizeMix.totalPages); return true; } @@ -154,12 +120,10 @@ bool MallocAllocator::allocateContiguousWithoutRetry( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, - ReservationCallback reservationCB, MachinePageCount maxPages) { bool result; stats_.recordAllocate(AllocationTraits::pageBytes(numPages), 1, [&]() { - result = allocateContiguousImpl( - numPages, collateral, allocation, reservationCB, maxPages); + result = allocateContiguousImpl(numPages, collateral, allocation, maxPages); }); return result; } @@ -168,7 +132,6 @@ bool MallocAllocator::allocateContiguousImpl( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, - ReservationCallback reservationCB, MachinePageCount maxPages) { if (maxPages == 0) { maxPages = numPages; @@ -192,23 +155,13 @@ bool MallocAllocator::allocateContiguousImpl( decrementUsage(AllocationTraits::pageBytes(numContiguousCollateralPages)); allocation.clear(); } - const auto totalCollateralPages = - numCollateralPages + numContiguousCollateralPages; - const auto totalCollateralBytes = - AllocationTraits::pageBytes(totalCollateralPages); if (numPages == 0) { - if (totalCollateralBytes != 0 && reservationCB != nullptr) { - reservationCB(totalCollateralBytes, false); - } return true; } const auto totalBytes = AllocationTraits::pageBytes(numPages); if (testingHasInjectedFailure(InjectedFailure::kCap) || !incrementUsage(totalBytes)) { - if (totalCollateralBytes != 0 && reservationCB != nullptr) { - reservationCB(totalCollateralBytes, false); - } const auto errorMsg = fmt::format( "Exceeded memory allocator limit when allocating {} new pages, the " "memory allocator capacity is {}", @@ -218,23 +171,6 @@ bool MallocAllocator::allocateContiguousImpl( VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) << errorMsg; return false; } - const int64_t numNeededPages = numPages - totalCollateralPages; - if (reservationCB != nullptr) { - try { - reservationCB(AllocationTraits::pageBytes(numNeededPages), true); - } catch (std::exception&) { - // If the new memory reservation fails, we need to release the memory - // reservation of the freed contiguous and non-contiguous memory. - VELOX_MEM_LOG(WARNING) - << "Failed to reserve " << AllocationTraits::pageBytes(numNeededPages) - << " bytes for contiguous allocation of " << numPages - << " pages, then release " << succinctBytes(totalCollateralBytes) - << " from the old allocations"; - reservationCB(totalCollateralBytes, false); - decrementUsage(totalBytes); - std::rethrow_exception(std::current_exception()); - } - } numAllocated_.fetch_add(numPages); numMapped_.fetch_add(numPages); void* data = ::mmap( @@ -300,15 +236,7 @@ void MallocAllocator::freeContiguousImpl(ContiguousAllocation& allocation) { bool MallocAllocator::growContiguousWithoutRetry( MachinePageCount increment, - ContiguousAllocation& allocation, - ReservationCallback reservationCB) { - VELOX_CHECK_LE( - allocation.size() + increment * AllocationTraits::kPageSize, - allocation.maxSize()); - if (reservationCB != nullptr) { - // May throw. If does, there is nothing to revert. 
- reservationCB(AllocationTraits::pageBytes(increment), true); - } + ContiguousAllocation& allocation) { if (!incrementUsage(AllocationTraits::pageBytes(increment))) { const auto errorMsg = fmt::format( "Exceeded memory allocator limit when allocating {} new pages for " @@ -318,9 +246,6 @@ bool MallocAllocator::growContiguousWithoutRetry( succinctBytes(capacity_)); setAllocatorFailureMessage(errorMsg); VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) << errorMsg; - if (reservationCB != nullptr) { - reservationCB(AllocationTraits::pageBytes(increment), false); - } return false; } numAllocated_ += increment; diff --git a/velox/common/memory/MallocAllocator.h b/velox/common/memory/MallocAllocator.h index 489d3d842e2c4..96774431e7934 100644 --- a/velox/common/memory/MallocAllocator.h +++ b/velox/common/memory/MallocAllocator.h @@ -55,8 +55,7 @@ class MallocAllocator : public MemoryAllocator { bool growContiguousWithoutRetry( MachinePageCount increment, - ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr) override; + ContiguousAllocation& allocation) override; void freeBytes(void* p, uint64_t bytes) noexcept override; @@ -84,23 +83,19 @@ class MallocAllocator : public MemoryAllocator { private: bool allocateNonContiguousWithoutRetry( - MachinePageCount numPages, - Allocation& out, - ReservationCallback reservationCB = nullptr, - MachinePageCount minSizeClass = 0) override; + const SizeMix& sizeMix, + Allocation& out) override; bool allocateContiguousWithoutRetry( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr, MachinePageCount maxPages = 0) override; bool allocateContiguousImpl( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, - ReservationCallback reservationCB, MachinePageCount maxPages); void freeContiguousImpl(ContiguousAllocation& allocation); diff --git a/velox/common/memory/MemoryAllocator.cpp b/velox/common/memory/MemoryAllocator.cpp index 1bca6219ceda5..5d5741f9f566d 100644 --- a/velox/common/memory/MemoryAllocator.cpp +++ b/velox/common/memory/MemoryAllocator.cpp @@ -161,26 +161,53 @@ bool MemoryAllocator::allocateNonContiguous( Allocation& out, ReservationCallback reservationCB, MachinePageCount minSizeClass) { + const MachinePageCount numPagesToFree = out.numPages(); + const uint64_t bytesToFree = AllocationTraits::pageBytes(numPagesToFree); + auto cleanupAllocAndReleaseReservation = [&](uint64_t reservationBytes) { + if (!out.empty()) { + freeNonContiguous(out); + } + if (reservationCB != nullptr && reservationBytes > 0) { + reservationCB(reservationBytes, false); + } + }; + if (numPages == 0) { + cleanupAllocAndReleaseReservation(bytesToFree); + return true; + } + + const SizeMix mix = allocationSize(numPages, minSizeClass); + const int64_t numNeededPages = mix.totalPages - numPagesToFree; + // TODO: handle negative 'numNeededPages' in a follow-up. 
+ + if (reservationCB != nullptr) { + try { + reservationCB(AllocationTraits::pageBytes(numNeededPages), true); + } catch (const std::exception&) { + VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) + << "Exceeded memory reservation limit when reserve " << numNeededPages + << " new pages when allocate " << mix.totalPages << " pages"; + cleanupAllocAndReleaseReservation(bytesToFree); + std::rethrow_exception(std::current_exception()); + } + } + + const auto totalBytesReserved = AllocationTraits::pageBytes(mix.totalPages); + bool success = false; if (cache() == nullptr) { - return allocateNonContiguousWithoutRetry( - numPages, out, reservationCB, minSizeClass); + success = allocateNonContiguousWithoutRetry(mix, out); + } else { + success = cache()->makeSpace( + pagesToAcquire(numPages, out.numPages()), [&](Allocation& acquired) { + freeNonContiguous(acquired); + return allocateNonContiguousWithoutRetry(mix, out); + }); } - const bool success = cache()->makeSpace( - pagesToAcquire(numPages, out.numPages()), [&](Allocation& acquired) { - freeNonContiguous(acquired); - return allocateNonContiguousWithoutRetry( - numPages, out, reservationCB, minSizeClass); - }); if (!success) { // There can be a failure where allocation was never called because there // never was a chance based on numAllocated() and capacity(). Make sure old // data is still freed. - if (!out.empty()) { - if (reservationCB) { - reservationCB(AllocationTraits::pageBytes(out.numPages()), false); - } - freeNonContiguous(out); - } + cleanupAllocAndReleaseReservation(totalBytesReserved); } return success; } @@ -191,35 +218,62 @@ bool MemoryAllocator::allocateContiguous( ContiguousAllocation& allocation, ReservationCallback reservationCB, MachinePageCount maxPages) { - if (cache() == nullptr) { - return allocateContiguousWithoutRetry( - numPages, collateral, allocation, reservationCB, maxPages); - } - auto numCollateralPages = + const MachinePageCount numCollateralPages = allocation.numPages() + (collateral ? collateral->numPages() : 0); - const bool success = cache()->makeSpace( - pagesToAcquire(numPages, numCollateralPages), [&](Allocation& acquired) { - freeNonContiguous(acquired); - return allocateContiguousWithoutRetry( - numPages, collateral, allocation, reservationCB, maxPages); - }); - if (!success) { - // There can be a failure where allocation was never called because there - // never was a chance based on numAllocated() and capacity(). Make sure old - // data is still freed. - int64_t freedBytes{0}; + const uint64_t totalCollateralBytes = + AllocationTraits::pageBytes(numCollateralPages); + const int64_t newPages = numPages - numCollateralPages; + auto cleanupCollateralAndReleaseReservation = [&](uint64_t reservationBytes) { if ((collateral != nullptr) && !collateral->empty()) { - freedBytes += AllocationTraits::pageBytes(collateral->numPages()); freeNonContiguous(*collateral); } if (!allocation.empty()) { - freedBytes += allocation.size(); freeContiguous(allocation); } - if ((reservationCB) != nullptr && (freedBytes > 0)) { - reservationCB(freedBytes, false); + if ((reservationCB) != nullptr && (reservationBytes > 0)) { + reservationCB(reservationBytes, false); + } + }; + + if (numPages == 0) { + cleanupCollateralAndReleaseReservation(totalCollateralBytes); + return true; + } + // TODO: handle negative 'newPages'. 
+ if (reservationCB != nullptr) { + try { + reservationCB(AllocationTraits::pageBytes(newPages), true); + } catch (const std::exception& e) { + VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) + << "Exceeded memory reservation limit when reserve " << newPages + << " new pages when allocate " << numPages + << " pages, error: " << e.what(); + cleanupCollateralAndReleaseReservation(totalCollateralBytes); + std::rethrow_exception(std::current_exception()); } } + + const uint64_t totalBytesReserved = AllocationTraits::pageBytes(numPages); + bool success = false; + if (cache() == nullptr) { + success = allocateContiguousWithoutRetry( + numPages, collateral, allocation, maxPages); + } else { + success = cache()->makeSpace( + pagesToAcquire(numPages, numCollateralPages), + [&](Allocation& acquired) { + freeNonContiguous(acquired); + return allocateContiguousWithoutRetry( + numPages, collateral, allocation, maxPages); + }); + } + + if (!success) { + // There can be a failure where allocation was never called because there + // never was a chance based on numAllocated() and capacity(). Make sure old + // data is still freed. + cleanupCollateralAndReleaseReservation(totalBytesReserved); + } return success; } @@ -227,13 +281,29 @@ bool MemoryAllocator::growContiguous( MachinePageCount increment, ContiguousAllocation& allocation, ReservationCallback reservationCB) { + VELOX_CHECK_LE( + allocation.size() + increment * AllocationTraits::kPageSize, + allocation.maxSize()); + if (increment == 0) { + return true; + } + if (reservationCB != nullptr) { + // May throw. If does, there is nothing to revert. + reservationCB(AllocationTraits::pageBytes(increment), true); + } + bool success = false; if (cache() == nullptr) { - return growContiguousWithoutRetry(increment, allocation, reservationCB); + success = growContiguousWithoutRetry(increment, allocation); + } else { + success = cache()->makeSpace(increment, [&](Allocation& acquired) { + freeNonContiguous(acquired); + return growContiguousWithoutRetry(increment, allocation); + }); } - return cache()->makeSpace(increment, [&](Allocation& acquired) { - freeNonContiguous(acquired); - return growContiguousWithoutRetry(increment, allocation, reservationCB); - }); + if (!success && reservationCB != nullptr) { + reservationCB(AllocationTraits::pageBytes(increment), false); + } + return success; } void* MemoryAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) { diff --git a/velox/common/memory/MemoryAllocator.h b/velox/common/memory/MemoryAllocator.h index 8fb71ce72b549..23a5d48a643eb 100644 --- a/velox/common/memory/MemoryAllocator.h +++ b/velox/common/memory/MemoryAllocator.h @@ -404,20 +404,36 @@ class MemoryAllocator : public std::enable_shared_from_this { protected: explicit MemoryAllocator() = default; + /// Represents a mix of blocks of different sizes for covering a single + /// allocation. + struct SizeMix { + // Index into 'sizeClassSizes_' + std::vector sizeIndices; + // Number of items of the class of the corresponding element in + // '"sizeIndices'. + std::vector sizeCounts; + // Number of valid elements in 'sizeCounts' and 'sizeIndices'. + int32_t numSizes{0}; + // Total number of pages. + int32_t totalPages{0}; + + SizeMix() { + sizeIndices.reserve(kMaxSizeClasses); + sizeCounts.reserve(kMaxSizeClasses); + } + }; + /// The actual memory allocation function implementation without retry /// attempts by making space from cache. 
virtual bool allocateContiguousWithoutRetry( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr, MachinePageCount maxPages = 0) = 0; virtual bool allocateNonContiguousWithoutRetry( - MachinePageCount numPages, - Allocation& out, - ReservationCallback reservationCB, - MachinePageCount minSizeClass) = 0; + const SizeMix& sizeMix, + Allocation& out) = 0; virtual void* allocateBytesWithoutRetry( uint64_t bytes, @@ -427,8 +443,7 @@ class MemoryAllocator : public std::enable_shared_from_this { virtual bool growContiguousWithoutRetry( MachinePageCount increment, - ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr) = 0; + ContiguousAllocation& allocation) = 0; // 'Cache' getter. The cache is only responsible for freeing up memory space // by shrinking itself when there is not enough space upon allocating. The @@ -440,25 +455,6 @@ class MemoryAllocator : public std::enable_shared_from_this { size_t bytes, const std::vector& sizes); - // Represents a mix of blocks of different sizes for covering a single - // allocation. - struct SizeMix { - // Index into 'sizeClassSizes_' - std::vector sizeIndices; - // Number of items of the class of the corresponding element in - // '"sizeIndices'. - std::vector sizeCounts; - // Number of valid elements in 'sizeCounts' and 'sizeIndices'. - int32_t numSizes{0}; - // Total number of pages. - int32_t totalPages{0}; - - SizeMix() { - sizeIndices.reserve(kMaxSizeClasses); - sizeCounts.reserve(kMaxSizeClasses); - } - }; - // Returns a mix of standard sizes and allocation counts for covering // 'numPages' worth of memory. 'minSizeClass' is the size of the // smallest usable size class. diff --git a/velox/common/memory/MmapAllocator.cpp b/velox/common/memory/MmapAllocator.cpp index 8beaff9504457..11f6c86fa0d4c 100644 --- a/velox/common/memory/MmapAllocator.cpp +++ b/velox/common/memory/MmapAllocator.cpp @@ -52,78 +52,53 @@ MmapAllocator::~MmapAllocator() { } bool MmapAllocator::allocateNonContiguousWithoutRetry( - MachinePageCount numPages, - Allocation& out, - ReservationCallback reservationCB, - MachinePageCount minSizeClass) { + const SizeMix& sizeMix, + Allocation& out) { const MachinePageCount numFreed = freeNonContiguousInternal(out); - const auto bytesFreed = AllocationTraits::pageBytes(numFreed); if (numFreed != 0) { numAllocated_.fetch_sub(numFreed); } - if (numPages == 0) { - if ((bytesFreed != 0) && (reservationCB != nullptr)) { - reservationCB(bytesFreed, false); - } + if (sizeMix.totalPages == 0) { return true; } - const SizeMix mix = allocationSize(numPages, minSizeClass); - if (numAllocated_ + mix.totalPages > capacity_ || + if (numAllocated_ + sizeMix.totalPages > capacity_ || testingHasInjectedFailure(InjectedFailure::kCap)) { const std::string errorMsg = fmt::format( "Exceeded memory allocator limit when allocating {} pages with " "capacity of {} pages", - mix.totalPages, + sizeMix.totalPages, capacity_); VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) << errorMsg; setAllocatorFailureMessage(errorMsg); - if ((bytesFreed != 0) && (reservationCB != nullptr)) { - reservationCB(bytesFreed, false); - } return false; } - if (numAllocated_.fetch_add(mix.totalPages) + mix.totalPages > capacity_) { + if (numAllocated_.fetch_add(sizeMix.totalPages) + sizeMix.totalPages > + capacity_) { const std::string errorMsg = fmt::format( "Exceeding memory allocator limit when allocating {} pages with " "capacity of {} pages", - mix.totalPages, + sizeMix.totalPages, 
capacity_); VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) << errorMsg; setAllocatorFailureMessage(errorMsg); - numAllocated_.fetch_sub(mix.totalPages); - if ((bytesFreed != 0) && (reservationCB != nullptr)) { - reservationCB(bytesFreed, false); - } + numAllocated_.fetch_sub(sizeMix.totalPages); return false; } ++numAllocations_; - numAllocatedPages_ += mix.totalPages; - const int64_t numNeededPages = mix.totalPages - numFreed; - if (reservationCB != nullptr) { - try { - reservationCB(AllocationTraits::pageBytes(numNeededPages), true); - } catch (const std::exception&) { - VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) - << "Exceeded memory reservation limit when reserve " << numNeededPages - << " new pages when allocate " << mix.totalPages << " pages"; - numAllocated_.fetch_sub(mix.totalPages); - reservationCB(bytesFreed, false); - std::rethrow_exception(std::current_exception()); - } - } + numAllocatedPages_ += sizeMix.totalPages; MachinePageCount newMapsNeeded = 0; - for (int i = 0; i < mix.numSizes; ++i) { + for (int i = 0; i < sizeMix.numSizes; ++i) { bool success; stats_.recordAllocate( - AllocationTraits::pageBytes(sizeClassSizes_[mix.sizeIndices[i]]), - mix.sizeCounts[i], + AllocationTraits::pageBytes(sizeClassSizes_[sizeMix.sizeIndices[i]]), + sizeMix.sizeCounts[i], [&]() { - success = sizeClasses_[mix.sizeIndices[i]]->allocate( - mix.sizeCounts[i], newMapsNeeded, out); + success = sizeClasses_[sizeMix.sizeIndices[i]]->allocate( + sizeMix.sizeCounts[i], newMapsNeeded, out); }); - if (success && ((i > 0) || (mix.numSizes == 1)) && + if (success && ((i > 0) || (sizeMix.numSizes == 1)) && testingHasInjectedFailure(InjectedFailure::kAllocate)) { // Trigger memory allocation failure in the middle of the size class // allocation series. @@ -135,15 +110,12 @@ bool MmapAllocator::allocateNonContiguousWithoutRetry( const std::string errorMsg = fmt::format( "Failed allocation in size class {} for {} pages", i, - mix.sizeCounts[i]); + sizeMix.sizeCounts[i]); VELOX_MEM_LOG(WARNING) << errorMsg; setAllocatorFailureMessage(errorMsg); - const auto failedPages = mix.totalPages - out.numPages(); + const auto failedPages = sizeMix.totalPages - out.numPages(); freeNonContiguous(out); numAllocated_.fetch_sub(failedPages); - if (reservationCB != nullptr) { - reservationCB(AllocationTraits::pageBytes(mix.totalPages), false); - } return false; } } @@ -159,13 +131,10 @@ bool MmapAllocator::allocateNonContiguousWithoutRetry( "Could not advise away enough for {} pages for total allocation " "of {} pages", newMapsNeeded, - mix.totalPages); + sizeMix.totalPages); VELOX_MEM_LOG(WARNING) << errorMsg; setAllocatorFailureMessage(errorMsg); freeNonContiguous(out); - if (reservationCB != nullptr) { - reservationCB(AllocationTraits::pageBytes(mix.totalPages), false); - } return false; } @@ -237,12 +206,10 @@ bool MmapAllocator::allocateContiguousWithoutRetry( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, - ReservationCallback reservationCB, MachinePageCount maxPages) { bool result; stats_.recordAllocate(AllocationTraits::pageBytes(numPages), 1, [&]() { - result = allocateContiguousImpl( - numPages, collateral, allocation, reservationCB, maxPages); + result = allocateContiguousImpl(numPages, collateral, allocation, maxPages); }); return result; } @@ -251,7 +218,6 @@ bool MmapAllocator::allocateContiguousImpl( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, - ReservationCallback reservationCB, MachinePageCount maxPages) { if (maxPages == 0) { maxPages = 
numPages; @@ -293,35 +259,12 @@ bool MmapAllocator::allocateContiguousImpl( } const auto totalCollateralPages = numCollateralPages + numLargeCollateralPages; - const auto totalCollateralBytes = - AllocationTraits::pageBytes(totalCollateralPages); if (numPages == 0) { - if (totalCollateralBytes != 0 && reservationCB != nullptr) { - reservationCB(totalCollateralBytes, false); - } return true; } const auto numCollateralUnmap = numLargeCollateralPages; const int64_t newPages = numPages - totalCollateralPages; - if (reservationCB != nullptr) { - try { - reservationCB(AllocationTraits::pageBytes(newPages), true); - } catch (const std::exception& e) { - VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) - << "Exceeded memory reservation limit when reserve " << newPages - << " new pages when allocate " << numPages - << " pages, error: " << e.what(); - numAllocated_ -= totalCollateralPages; - numMapped_ -= numCollateralUnmap; - numExternalMapped_ -= numCollateralUnmap; - - // We failed to grow by 'newPages. So we record the freeing off the whole - // collateral and the unmap of former 'allocation'. - reservationCB(totalCollateralBytes, false); - std::rethrow_exception(std::current_exception()); - } - } // Rolls back the counters on failure. 'mappedDecrement' is subtracted from // 'numMapped_' on top of other adjustment. @@ -334,10 +277,6 @@ bool MmapAllocator::allocateContiguousImpl( // were never allocated. numExternalMapped_ -= numPages; numMapped_ -= numCollateralUnmap + mappedDecrement; - - if (reservationCB != nullptr) { - reservationCB(AllocationTraits::pageBytes(numPages), false); - } }; numExternalMapped_ += numPages - numCollateralUnmap; @@ -445,15 +384,7 @@ void MmapAllocator::freeContiguousImpl(ContiguousAllocation& allocation) { bool MmapAllocator::growContiguousWithoutRetry( MachinePageCount increment, - ContiguousAllocation& allocation, - ReservationCallback reservationCB) { - VELOX_CHECK_LE( - allocation.size() + increment * AllocationTraits::kPageSize, - allocation.maxSize()); - if (reservationCB != nullptr) { - // May throw. If does, there is nothing to revert. 
- reservationCB(AllocationTraits::pageBytes(increment), true); - } + ContiguousAllocation& allocation) { auto numAllocated = numAllocated_.fetch_add(increment) + increment; if (numAllocated > capacity_ || testingHasInjectedFailure(InjectedFailure::kCap)) { @@ -467,9 +398,6 @@ bool MmapAllocator::growContiguousWithoutRetry( VELOX_MEM_LOG_EVERY_MS(WARNING, 1000) << errorMsg; setAllocatorFailureMessage(errorMsg); numAllocated_ -= increment; - if (reservationCB != nullptr) { - reservationCB(AllocationTraits::pageBytes(increment), false); - } return false; } @@ -483,9 +411,6 @@ bool MmapAllocator::growContiguousWithoutRetry( allocation.numPages()); VELOX_MEM_LOG(WARNING) << errorMsg; setAllocatorFailureMessage(errorMsg); - if (reservationCB != nullptr) { - reservationCB(AllocationTraits::pageBytes(increment), false); - } numAllocated_.fetch_sub(increment); return false; } @@ -518,8 +443,8 @@ void* MmapAllocator::allocateBytesWithoutRetry( if (bytes <= AllocationTraits::pageBytes(sizeClassSizes_.back())) { Allocation allocation; const auto numPages = roundUpToSizeClassSize(bytes, sizeClassSizes_); - if (!allocateNonContiguousWithoutRetry( - numPages, allocation, nullptr, numPages)) { + const SizeMix mix = allocationSize(numPages, numPages); + if (!allocateNonContiguousWithoutRetry(mix, allocation)) { return nullptr; } auto run = allocation.runAt(0); diff --git a/velox/common/memory/MmapAllocator.h b/velox/common/memory/MmapAllocator.h index a383b90f41aee..5678a91f3008a 100644 --- a/velox/common/memory/MmapAllocator.h +++ b/velox/common/memory/MmapAllocator.h @@ -103,8 +103,7 @@ class MmapAllocator : public MemoryAllocator { bool growContiguousWithoutRetry( MachinePageCount increment, - ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr) override; + ContiguousAllocation& allocation) override; void freeContiguous(ContiguousAllocation& allocation) override; @@ -316,23 +315,19 @@ class MmapAllocator : public MemoryAllocator { }; bool allocateNonContiguousWithoutRetry( - MachinePageCount numPages, - Allocation& out, - ReservationCallback reservationCB = nullptr, - MachinePageCount minSizeClass = 0) override; + const SizeMix& sizeMix, + Allocation& out) override; bool allocateContiguousWithoutRetry( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr, MachinePageCount maxPages = 0) override; bool allocateContiguousImpl( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, - ReservationCallback reservationCB, MachinePageCount maxPages); void freeContiguousImpl(ContiguousAllocation& allocation);
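
Illustrative sketch (not part of the patch): the reordered flow described in
the summary is to reserve from the caller's memory pool first, then let the
cache make space, then allocate, and to release the reservation only if the
allocation ultimately fails. The code below is a minimal, self-contained
illustration of that ordering; the Cache, Allocation, and ReservationCallback
types, the kPageBytes constant, and the allocateNonContiguous signature are
simplified stand-ins assumed for this example, not the actual Velox classes
or APIs.

#include <cstdint>
#include <functional>
#include <iostream>

// Simplified stand-ins; the names mirror the diff but the bodies are reduced
// to what is needed to show the ordering.
using ReservationCallback = std::function<void(uint64_t bytes, bool reserve)>;

struct Allocation {
  uint64_t numPages{0};
};

// A cache whose makeSpace() may evict entries (taking internal locks) between
// allocation attempts.
struct Cache {
  bool makeSpace(
      uint64_t /*numPages*/,
      const std::function<bool(Allocation&)>& allocate) {
    Allocation acquired;  // Pages surrendered by evicted cache entries.
    return allocate(acquired);
  }
};

constexpr uint64_t kPageBytes = 4096;  // Assumed page size.

// Reserve from the caller's pool *before* entering the cache, so a
// reservation-triggered arbitration never has to wait on a thread that is
// blocked inside the cache. Release the reservation only if allocation fails.
bool allocateNonContiguous(
    Cache* cache,
    uint64_t numPages,
    const ReservationCallback& reservationCB,
    const std::function<bool(Allocation&)>& allocateWithoutRetry) {
  const uint64_t bytes = numPages * kPageBytes;
  if (reservationCB != nullptr) {
    // May throw; nothing needs to be undone because the cache is untouched.
    reservationCB(bytes, /*reserve=*/true);
  }
  bool success;
  if (cache == nullptr) {
    Allocation noneAcquired;
    success = allocateWithoutRetry(noneAcquired);
  } else {
    success = cache->makeSpace(numPages, allocateWithoutRetry);
  }
  if (!success && reservationCB != nullptr) {
    reservationCB(bytes, /*reserve=*/false);  // Roll back the reservation.
  }
  return success;
}

int main() {
  Cache cache;
  Allocation out;
  uint64_t reservedBytes = 0;
  auto reservationCB = [&](uint64_t bytes, bool reserve) {
    if (reserve) {
      reservedBytes += bytes;
    } else {
      reservedBytes -= bytes;
    }
  };
  auto allocate = [&](Allocation& /*acquired*/) {
    out.numPages = 8;  // Pretend the underlying allocation succeeded.
    return true;
  };
  const bool ok = allocateNonContiguous(&cache, 8, reservationCB, allocate);
  std::cout << "success=" << ok << ", reservedBytes=" << reservedBytes << "\n";
  return 0;
}

The real allocateNonContiguous in the patch additionally frees any collateral
allocation up front and handles the page accounting shown in the diff; the
sketch captures only the reservation-before-cache ordering and the rollback on
failure.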