From c828bc8d8e5a48198db03664c0c33cf213afa3e5 Mon Sep 17 00:00:00 2001
From: Orri Erling
Date: Sat, 7 Oct 2023 21:18:21 -0700
Subject: [PATCH] Improve allocation success when evicting from cache

An allocation will evict cache to make space for data. If the evicted
data is freed and then an allocation is attempted, it can be that
another thread has snatched the freed data. With a high transient
memory allocation rate and many threads, it can happen that a large
allocation runs out of retries even if it could be satisfied.

Therefore, when evicting in order to make space for an allocation, we
keep hold of the non-contiguous Allocations we remove from cache. Many
allocations can in this way be concatenated into an allocation that
covers the needed number of pages. This can then be atomically
converted into a new non-contiguous or contiguous allocation.

The allocate*WithoutRetry methods accept a non-contiguous allocation to
be freed to provide pages for the new allocation. Memory mapping magic
can convert non-contiguous freed pages to contiguous ones.

Note that the allocate*WithoutRetry methods mishandle the atomicity of
the exchange: the allocated count is decremented by the size of the
freed collateral and then incremented by the size of the new
allocation. This should be a single atomic increment of allocatedSize -
collateralSize, which always succeeds if the collateral is >= the
allocated amount. Even so, the window of vulnerability to another
thread snatching the freed capacity before it is reacquired is probably
negligible, only a few instructions.

The AsyncDataCache::makeSpace loop first tries to allocate. If it
fails, it evicts and keeps up to the required amount in a grab bag
allocation. It loops until this allocation is large enough to convert
into the desired allocation. If nothing is evicted and the allocation
repeatedly fails, it gives up.
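In short, callers of Cache::makeSpace() now pass a lambda that receives
the pages the cache has evicted so far and recycles them before
retrying. A simplified sketch of the intended use on the allocator side
(condensed from the MemoryAllocator::allocateNonContiguous change in
the diff below, not the verbatim code):

    bool success = cache()->makeSpace(
        pagesToAcquire(numPages, out.numPages()), [&](Allocation& acquired) {
          // 'acquired' holds pages already evicted from the cache for this
          // request. Free them first so the retried allocation can reuse them.
          freeNonContiguous(acquired);
          return allocateNonContiguousWithoutRetry(
              numPages, out, reservationCB, minSizeClass);
        });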
---
 velox/common/caching/AsyncDataCache.cpp      | 80 ++++++++++++-----
 velox/common/caching/AsyncDataCache.h        | 31 ++++---
 .../caching/tests/AsyncDataCacheTest.cpp     | 85 +++++++++++++++++-
 velox/common/memory/Allocation.cpp           |  8 ++
 velox/common/memory/Allocation.h             |  4 +
 velox/common/memory/MemoryAllocator.cpp      | 87 +++++++++++++++----
 velox/common/memory/MemoryAllocator.h        | 14 +--
 velox/common/memory/MemoryPool.cpp           |  5 ++
 velox/common/memory/MemoryPool.h             |  7 ++
 velox/common/memory/tests/AllocationTest.cpp | 26 ++++++
 10 files changed, 292 insertions(+), 55 deletions(-)

diff --git a/velox/common/caching/AsyncDataCache.cpp b/velox/common/caching/AsyncDataCache.cpp
index 19f0036f715a7..2d8015816ac2a 100644
--- a/velox/common/caching/AsyncDataCache.cpp
+++ b/velox/common/caching/AsyncDataCache.cpp
@@ -331,7 +331,11 @@ void CacheShard::removeEntryLocked(AsyncDataCacheEntry* entry) {
   }
 }
 
-void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) {
+void CacheShard::evict(
+    uint64_t bytesToFree,
+    bool evictAllUnpinned,
+    int32_t pagesToAcquire,
+    memory::Allocation& acquired) {
   int64_t tinyFreed = 0;
   int64_t largeFreed = 0;
   int32_t evictSaveableSkipped = 0;
@@ -380,7 +384,16 @@
       continue;
     }
     largeFreed += candidate->data_.byteSize();
-    toFree.push_back(std::move(candidate->data()));
+    if (pagesToAcquire > 0) {
+      auto candidatePages = candidate->data().numPages();
+      pagesToAcquire = candidatePages > pagesToAcquire
+          ? 0
+          : pagesToAcquire - candidatePages;
+      acquired.appendMove(candidate->data());
+      VELOX_CHECK(candidate->data().empty());
+    } else {
+      toFree.push_back(std::move(candidate->data()));
+    }
     removeEntryLocked(candidate);
     emptySlots_.push_back(entryIndex);
     tinyFreed += candidate->tinyData_.size();
@@ -567,7 +580,7 @@ bool AsyncDataCache::exists(RawFileCacheKey key) const {
 
 bool AsyncDataCache::makeSpace(
     MachinePageCount numPages,
-    std::function<bool()> allocate) {
+    std::function<bool(memory::Allocation&)> allocate) {
   // Try to allocate and if failed, evict the desired amount and
   // retry. This is without synchronization, so that other threads may
   // get what one thread evicted but this will usually work in a
@@ -581,15 +594,28 @@ bool AsyncDataCache::makeSpace(
   // called from inside a global mutex.
   constexpr int32_t kMaxAttempts = kNumShards * 4;
+  // Evict at least 1MB even for small allocations to avoid constantly hitting
+  // the mutex-protected evict loop.
+  constexpr int32_t kMinEvictPages = 256;
   // If requesting less than kSmallSizePages try up to 4x more if
   // first try failed.
   constexpr int32_t kSmallSizePages = 2048; // 8MB
-  int32_t sizeMultiplier = 1;
+  float sizeMultiplier = 1.2;
   // True if this thread is counted in 'numThreadsInAllocate_'.
   bool isCounted = false;
   // If more than half the allowed retries are needed, this is the rank in
   // arrival order of this.
   int32_t rank = 0;
+  // Allocation into which evicted pages are moved.
+  memory::Allocation acquired;
+  // 'acquired' is not managed by a pool. Make sure it is freed on throw.
+  // Destructing it non-empty and without a pool kills the process.
+  auto guard = folly::makeGuard([&]() {
+    allocator_->freeNonContiguous(acquired);
+    if (isCounted) {
+      --numThreadsInAllocate_;
+    }
+  });
   VELOX_CHECK(
       numThreadsInAllocate_ >= 0 && numThreadsInAllocate_ < 10000,
       "Leak in numThreadsInAllocate_: {}",
@@ -599,19 +625,12 @@ bool AsyncDataCache::makeSpace(
     isCounted = true;
   }
   for (auto nthAttempt = 0; nthAttempt < kMaxAttempts; ++nthAttempt) {
-    try {
-      if (allocate()) {
-        if (isCounted) {
-          --numThreadsInAllocate_;
-        }
+    if (canTryAllocate(numPages, acquired)) {
+      if (allocate(acquired)) {
         return true;
       }
-    } catch (const std::exception& e) {
-      if (isCounted) {
-        --numThreadsInAllocate_;
-      }
-      throw;
     }
+
     if (nthAttempt > 2 && ssdCache_ && ssdCache_->writeInProgress()) {
       VELOX_SSD_CACHE_LOG(INFO)
           << "Pause 0.5s after failed eviction waiting for SSD cache write to unpin memory";
@@ -624,25 +643,44 @@ bool AsyncDataCache::makeSpace(
       }
     }
     if (rank) {
+      // Free the grabbed allocation before sleeping so the contender can make
+      // progress. This is only on heavy contention, after 8 missed tries.
+      allocator_->freeNonContiguous(acquired);
       backoff(nthAttempt + rank);
+      // If some of the competing threads are done, maybe give this thread a
+      // better rank.
+      rank = std::min(rank, numThreadsInAllocate_);
     }
     ++shardCounter_;
+    int32_t numPagesToAcquire =
+        acquired.numPages() < numPages ? numPages - acquired.numPages() : 0;
     // Evict from next shard. If we have gone through all shards once
     // and still have not made the allocation, we go to desperate mode
     // with 'evictAllUnpinned' set to true.
     shards_[shardCounter_ & (kShardMask)]->evict(
-        numPages * sizeMultiplier * memory::AllocationTraits::kPageSize,
-        nthAttempt >= kNumShards);
+        memory::AllocationTraits::pageBytes(
+            std::max<int32_t>(kMinEvictPages, numPages) * sizeMultiplier),
+        nthAttempt >= kNumShards,
+        numPagesToAcquire,
+        acquired);
     if (numPages < kSmallSizePages && sizeMultiplier < 4) {
       sizeMultiplier *= 2;
     }
   }
-  if (isCounted) {
-    --numThreadsInAllocate_;
-  }
   return false;
 }
 
+bool AsyncDataCache::canTryAllocate(
+    int32_t numPages,
+    const memory::Allocation& acquired) const {
+  if (numPages <= acquired.numPages()) {
+    return true;
+  }
+  return numPages - acquired.numPages() <=
+      (memory::AllocationTraits::numPages(allocator_->capacity())) -
+      allocator_->numAllocated();
+}
+
 void AsyncDataCache::backoff(int32_t counter) {
   size_t seed = folly::hasher()(++backoffCounter_);
   const auto usec = (seed & 0xfff) * (counter & 0x1f);
@@ -706,7 +744,9 @@ CacheStats AsyncDataCache::refreshStats() const {
 
 void AsyncDataCache::clear() {
   for (auto& shard : shards_) {
-    shard->evict(std::numeric_limits<uint64_t>::max(), true);
+    memory::Allocation acquired;
+    shard->evict(std::numeric_limits<uint64_t>::max(), true, 0, acquired);
+    VELOX_CHECK(acquired.empty());
   }
 }
 
diff --git a/velox/common/caching/AsyncDataCache.h b/velox/common/caching/AsyncDataCache.h
index 8c7400ce0b073..4f34306ae2ab0 100644
--- a/velox/common/caching/AsyncDataCache.h
+++ b/velox/common/caching/AsyncDataCache.h
@@ -548,8 +548,14 @@ class CacheShard {
   // not pinned. This favors first removing older and less frequently
   // used entries. If 'evictAllUnpinned' is true, anything that is
   // not pinned is evicted at first sight. This is for out of memory
-  // emergencies.
-  void evict(uint64_t bytesToFree, bool evictAllUnpinned);
+  // emergencies. If 'pagesToAcquire' is set, up to this many pages are added
+  // to 'acquiredAllocation'. A smaller amount can be added if not enough
+  // evictable data is found.
+  void evict(
+      uint64_t bytesToFree,
+      bool evictAllUnpinned,
+      int32_t pagesToAcquire,
+      memory::Allocation& acquiredAllocation);
 
   // Removes 'entry' from 'this'. Removes a possible promise from the entry
   // inside the shard mutex and returns it so that it can be realized outside of
@@ -659,7 +665,7 @@ class AsyncDataCache : public memory::Cache {
   /// for memory arbitration to work.
   bool makeSpace(
       memory::MachinePageCount numPages,
-      std::function<bool()> allocate) override;
+      std::function<bool(memory::Allocation&)> allocate) override;
 
   memory::MemoryAllocator* allocator() const override {
     return allocator_;
@@ -756,6 +762,11 @@ class AsyncDataCache : public memory::Cache {
   static constexpr int32_t kNumShards = 4; // Must be power of 2.
   static constexpr int32_t kShardMask = kNumShards - 1;
 
+  // True if 'acquired' has more pages than 'numPages' or if the allocator has
+  // space for the remaining numPages - acquired pages.
+  bool canTryAllocate(int32_t numPages, const memory::Allocation& acquired)
+      const;
+
   static AsyncDataCache** getInstancePtr();
 
   // Waits a pseudorandom delay times 'counter'.
@@ -820,15 +831,15 @@ T percentile(Next next, int32_t numSamples, int percent) {
 //'offsetFunc' returns the starting offset of the data in the
 // file given a pin and the pin's index in 'pins'. The pins are expected to be
 // sorted by this offset. 'readFunc' reads from the appropriate media. It gets
-// the 'pins' and the index of the first pin included in the read and the index
-// of the first pin not included. It gets the starting offset of the read and a
-// vector of memory ranges to fill by ReadFile::preadv or a similar
+// the 'pins' and the index of the first pin included in the read and the
+// index of the first pin not included. It gets the starting offset of the
+// read and a vector of memory ranges to fill by ReadFile::preadv or a similar
 // function.
-// The caller is responsible for calling setValid on the pins after a successful
-// read.
+// The caller is responsible for calling setValid on the pins after a
+// successful read.
 //
-// Returns the number of distinct IOs, the number of bytes loaded into pins and
-// the number of extra bytes read.
+// Returns the number of distinct IOs, the number of bytes loaded into pins
+// and the number of extra bytes read.
 CoalesceIoStats readPins(
     const std::vector& pins,
     int32_t maxGap,
diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp
index 2c0c2a9e42b96..4c2714db04b19 100644
--- a/velox/common/caching/tests/AsyncDataCacheTest.cpp
+++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp
@@ -18,6 +18,7 @@
 #include "velox/common/caching/FileIds.h"
 #include "velox/common/caching/SsdCache.h"
 #include "velox/common/file/FileSystems.h"
+#include "velox/common/memory/Memory.h"
 #include "velox/common/memory/MmapAllocator.h"
 #include "velox/exec/tests/utils/TempDirectoryPath.h"
 
@@ -144,11 +145,16 @@ class AsyncDataCacheTest : public testing::Test {
   // loading. Stops after loading 'loadBytes' worth of entries. If
   // 'errorEveryNBatches' is non-0, every nth load batch will have a
   // bad read and wil be dropped. The entries of the failed batch read
-  // will still be accessed one by one.
+  // will still be accessed one by one. If 'largeEveryNBatches' is
+  // non-0, allocates and frees a single allocation of 'largeBytes'
+  // every so many batches. This creates extra memory pressure, as
+  // happens when allocating large hash tables in queries.
   void loadLoop(
       int64_t startOffset,
       int64_t loadBytes,
-      int32_t errorEveryNBatches = 0);
+      int32_t errorEveryNBatches = 0,
+      int32_t largeEveryNBatches = 0,
+      int32_t largeBytes = 0);
 
   // Calls func on 'numThreads' in parallel.
   template
@@ -223,6 +229,7 @@ class AsyncDataCacheTest : public testing::Test {
   std::shared_ptr cache_;
   std::vector filenames_;
   std::unique_ptr executor_;
+  int32_t numLargeRetries_{0};
 };
 
 class TestingCoalescedLoad : public CoalescedLoad {
@@ -445,11 +452,14 @@ void AsyncDataCacheTest::loadBatch(
 void AsyncDataCacheTest::loadLoop(
     int64_t startOffset,
     int64_t loadBytes,
-    int32_t errorEveryNBatches) {
+    int32_t errorEveryNBatches,
+    int32_t largeEveryNBatches,
+    int32_t largeAllocSize) {
   const int64_t maxOffset =
       std::max(100'000, (startOffset + loadBytes) / filenames_.size());
   int64_t skippedBytes = 0;
   int32_t errorCounter = 0;
+  int32_t largeCounter = 0;
   std::vector batch;
   for (auto i = 0; i < filenames_.size(); ++i) {
     const auto fileNum = filenames_[i].id();
@@ -464,6 +474,27 @@ void AsyncDataCacheTest::loadLoop(
       batch.emplace_back(offset, size);
       if (batch.size() >= 8) {
         for (;;) {
+          if (largeEveryNBatches > 0 &&
+              largeCounter++ % largeEveryNBatches == 0) {
+            // Many threads will allocate a single large chunk at the
+            // same time. Some are expected to fail. All will
+            // eventually succeed because whoever gets the allocation
+            // frees it soon and without deadlocking with others.
+            memory::ContiguousAllocation large;
+            // Explicitly free 'large' on exit. Do not use MemoryPool for that
+            // because we test the allocator's limits, not the pool/memory
+            // manager limits.
+            auto guard =
+                folly::makeGuard([&]() { allocator_->freeContiguous(large); });
+            while (!allocator_->allocateContiguous(
+                memory::AllocationTraits::numPages(largeAllocSize),
+                nullptr,
+                large)) {
+              ++numLargeRetries_;
+            }
+            std::this_thread::sleep_for(
+                std::chrono::microseconds(2000)); // NOLINT
+          }
           const bool injectError = (errorEveryNBatches > 0) &&
               (++errorCounter % errorEveryNBatches == 0);
           loadBatch(fileNum, batch, injectError);
@@ -565,6 +596,54 @@ TEST_F(AsyncDataCacheTest, replace) {
       cache_->incrementCachedPages(0));
 }
 
+TEST_F(AsyncDataCacheTest, evictAccounting) {
+  constexpr int64_t kMaxBytes = 64 << 20;
+  FLAGS_velox_exception_user_stacktrace_enabled = false;
+  initializeCache(kMaxBytes);
+  auto memoryManager =
+      std::make_unique<memory::MemoryManager>(memory::MemoryManagerOptions{
+          .capacity = (int64_t)allocator_->capacity(),
+          .trackDefaultUsage = true,
+          .allocator = allocator_.get()});
+  auto pool = memoryManager->addLeafPool("test");
+
+  // We make allocations that we exchange for larger ones later. This will
+  // evict cache. We check that the evictions are not counted on the pool even
+  // if they occur as a result of action on the pool.
+  memory::Allocation allocation;
+  memory::ContiguousAllocation large;
+  pool->allocateNonContiguous(1200, allocation);
+  pool->allocateContiguous(1200, large);
+  EXPECT_EQ(memory::AllocationTraits::kPageSize * 2400, pool->currentBytes());
+  loadLoop(0, kMaxBytes * 1.1);
+  pool->allocateNonContiguous(2400, allocation);
+  pool->allocateContiguous(2400, large);
+  EXPECT_EQ(memory::AllocationTraits::kPageSize * 4800, pool->currentBytes());
+  auto stats = cache_->refreshStats();
+  EXPECT_LT(0, stats.numEvict);
+}
+
+TEST_F(AsyncDataCacheTest, largeEvict) {
+  constexpr int64_t kMaxBytes = 256 << 20;
+  constexpr int32_t kNumThreads = 24;
+  FLAGS_velox_exception_user_stacktrace_enabled = false;
+  initializeCache(kMaxBytes);
+  // Each thread loads 1.2x the cache size and injects an allocation of 1/4
+  // the capacity on every batch.
+  runThreads(kNumThreads, [&](int32_t /*i*/) {
+    loadLoop(0, kMaxBytes * 1.2, 0, 1, kMaxBytes / 4);
+  });
+  if (executor_) {
+    executor_->join();
+  }
+  auto stats = cache_->refreshStats();
+  EXPECT_LT(0, stats.numEvict);
+  EXPECT_GE(
+      kMaxBytes / memory::AllocationTraits::kPageSize,
+      cache_->incrementCachedPages(0));
+  LOG(INFO) << "Retries after failed evict: " << numLargeRetries_;
+}
+
 TEST_F(AsyncDataCacheTest, outOfCapacity) {
   const int64_t kMaxBytes = 64 << 20; // 64MB as MmapAllocator's min size is 64MB
diff --git a/velox/common/memory/Allocation.cpp b/velox/common/memory/Allocation.cpp
index b6497984234a9..f24ef76a359b9 100644
--- a/velox/common/memory/Allocation.cpp
+++ b/velox/common/memory/Allocation.cpp
@@ -37,6 +37,14 @@ void Allocation::append(uint8_t* address, int32_t numPages) {
       "Appending a duplicate address into a PageRun");
   runs_.emplace_back(address, numPages);
 }
+void Allocation::appendMove(Allocation& other) {
+  for (auto& run : other.runs_) {
+    numPages_ += run.numPages();
+    runs_.push_back(std::move(run));
+  }
+  other.runs_.clear();
+  other.numPages_ = 0;
+}
 
 void Allocation::findRun(uint64_t offset, int32_t* index, int32_t* offsetInRun)
     const {
diff --git a/velox/common/memory/Allocation.h b/velox/common/memory/Allocation.h
index 2669b38056369..6378378ab0a59 100644
--- a/velox/common/memory/Allocation.h
+++ b/velox/common/memory/Allocation.h
@@ -160,6 +160,9 @@ class Allocation {
     return numPages_ == 0;
   }
 
+  /// Moves the runs in 'from' to 'this'. 'from' is empty on return.
+  void appendMove(Allocation& from);
+
   std::string toString() const;
 
  private:
@@ -189,6 +192,7 @@ class Allocation {
   VELOX_FRIEND_TEST(MemoryAllocatorTest, allocationClass1);
   VELOX_FRIEND_TEST(MemoryAllocatorTest, allocationClass2);
   VELOX_FRIEND_TEST(AllocationTest, append);
+  VELOX_FRIEND_TEST(AllocationTest, appendMove);
 };
 
 /// Represents a run of contiguous pages that do not belong to any size class.
diff --git a/velox/common/memory/MemoryAllocator.cpp b/velox/common/memory/MemoryAllocator.cpp
index 747a72f813861..3e292e959e4ed 100644
--- a/velox/common/memory/MemoryAllocator.cpp
+++ b/velox/common/memory/MemoryAllocator.cpp
@@ -160,6 +160,14 @@ MachinePageCount MemoryAllocator::roundUpToSizeClassSize(
   return *std::lower_bound(sizes.begin(), sizes.end(), pages);
 }
 
+namespace {
+MachinePageCount pagesToAcquire(
+    MachinePageCount numPages,
+    MachinePageCount collateralPages) {
+  return numPages <= collateralPages ? 0 : numPages - collateralPages;
+}
+} // namespace
+
 bool MemoryAllocator::allocateNonContiguous(
     MachinePageCount numPages,
     Allocation& out,
@@ -169,10 +177,26 @@ bool MemoryAllocator::allocateNonContiguous(
     return allocateNonContiguousWithoutRetry(
         numPages, out, reservationCB, minSizeClass);
   }
-  return cache()->makeSpace(numPages, [&]() {
-    return allocateNonContiguousWithoutRetry(
-        numPages, out, reservationCB, minSizeClass);
-  });
+  bool success = cache()->makeSpace(
+      pagesToAcquire(numPages, out.numPages()), [&](Allocation& acquired) {
+        freeNonContiguous(acquired);
+        return allocateNonContiguousWithoutRetry(
+            numPages, out, reservationCB, minSizeClass);
+      });
+  if (!success) {
+    // There can be a failure where allocation was never called because there
+    // never was a chance based on numAllocated() and capacity(). Make sure old
+    // data is still freed.
+    if (!out.empty()) {
+      if (out.pool()) {
+        out.pool()->decrementNumFrees(); // Don't count double.
+        auto toFree = std::move(out);
+      } else {
+        freeNonContiguous(out);
+      }
+    }
+  }
+  return success;
 }
 
 bool MemoryAllocator::allocateContiguous(
@@ -185,10 +209,34 @@ bool MemoryAllocator::allocateContiguous(
     return allocateContiguousWithoutRetry(
         numPages, collateral, allocation, reservationCB, maxPages);
   }
-  return cache()->makeSpace(numPages, [&]() {
-    return allocateContiguousWithoutRetry(
-        numPages, collateral, allocation, reservationCB, maxPages);
-  });
+  auto numCollateralPages =
+      allocation.numPages() + (collateral ? collateral->numPages() : 0);
+  bool success = cache()->makeSpace(
+      pagesToAcquire(numPages, numCollateralPages), [&](Allocation& acquired) {
+        freeNonContiguous(acquired);
+        return allocateContiguousWithoutRetry(
+            numPages, collateral, allocation, reservationCB, maxPages);
+      });
+  if (!success) {
+    // As above, allocation may never have been tried if there never was a
+    // chance based on numAllocated() and capacity(). Make sure old data is
+    // still freed.
+    if (collateral && !collateral->empty()) {
+      if (collateral->pool()) {
+        auto toFree = std::move(*collateral);
+      } else {
+        freeNonContiguous(*collateral);
+      }
+    }
+    if (!allocation.empty()) {
+      if (allocation.pool()) {
+        allocation.pool()->decrementNumFrees(); // Don't count double.
+        auto toFree = std::move(allocation);
+      } else {
+        freeContiguous(allocation);
+      }
+    }
+  }
+  return success;
 }
 
 bool MemoryAllocator::growContiguous(
@@ -198,7 +246,8 @@ bool MemoryAllocator::growContiguous(
   if (cache() == nullptr) {
     return growContiguousWithoutRetry(increment, allocation, reservationCB);
   }
-  return cache()->makeSpace(increment, [&]() {
+  return cache()->makeSpace(increment, [&](Allocation& acquired) {
+    freeNonContiguous(acquired);
     return growContiguousWithoutRetry(increment, allocation, reservationCB);
   });
 }
@@ -208,10 +257,12 @@ void* MemoryAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) {
     return allocateBytesWithoutRetry(bytes, alignment);
   }
   void* result = nullptr;
-  cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() {
-    result = allocateBytesWithoutRetry(bytes, alignment);
-    return result != nullptr;
-  });
+  cache()->makeSpace(
+      AllocationTraits::numPages(bytes), [&](Allocation& acquired) {
+        freeNonContiguous(acquired);
+        result = allocateBytesWithoutRetry(bytes, alignment);
+        return result != nullptr;
+      });
   return result;
 }
 
@@ -220,10 +271,12 @@ void* MemoryAllocator::allocateZeroFilled(uint64_t bytes) {
     return allocateZeroFilledWithoutRetry(bytes);
   }
   void* result = nullptr;
-  cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() {
-    result = allocateZeroFilledWithoutRetry(bytes);
-    return result != nullptr;
-  });
+  cache()->makeSpace(
+      AllocationTraits::numPages(bytes), [&](Allocation& acquired) {
+        freeNonContiguous(acquired);
+        result = allocateZeroFilledWithoutRetry(bytes);
+        return result != nullptr;
+      });
   return result;
 }
 
diff --git a/velox/common/memory/MemoryAllocator.h b/velox/common/memory/MemoryAllocator.h
index 820934c366661..bba5db0bc1280 100644
--- a/velox/common/memory/MemoryAllocator.h
+++ b/velox/common/memory/MemoryAllocator.h
@@ -146,13 +146,17 @@ class MemoryAllocator;
 class Cache {
  public:
   virtual ~Cache() = default;
-  /// This method should be implemented so that it tries to accommodate the
-  /// passed in 'allocate' by freeing up space from 'this' if needed. 'numPages'
-  /// is the number of pages 'allocate' tries to allocate.It should return true
-  /// if 'allocate' succeeds, and false otherwise.
+  /// This method should be implemented so that it tries to
+  /// accommodate the passed in 'allocate' by freeing up space from
+  /// 'this' if needed. 'numPages' is the number of pages that need to
+  /// be free for 'allocate' to succeed. This should return true if
+  /// 'allocate' succeeds, and false otherwise. 'numPages' can be less
+  /// than the planned allocation, even 0 but not negative. This is
+  /// possible if 'allocate' brings its own memory that is exchanged
+  /// for the new allocation.
   virtual bool makeSpace(
       memory::MachinePageCount numPages,
-      std::function<bool()> allocate) = 0;
+      std::function<bool(memory::Allocation&)> allocate) = 0;
 
   virtual MemoryAllocator* allocator() const = 0;
 };
diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp
index ab2c3b026f471..55d4c6cf7e956 100644
--- a/velox/common/memory/MemoryPool.cpp
+++ b/velox/common/memory/MemoryPool.cpp
@@ -489,6 +489,11 @@ void MemoryPoolImpl::free(void* p, int64_t size) {
   release(alignedSize);
 }
 
+void MemoryPoolImpl::decrementNumFrees() {
+  VELOX_CHECK_EQ(kind_, Kind::kLeaf);
+  --numFrees_;
+}
+
 void MemoryPoolImpl::allocateNonContiguous(
     MachinePageCount numPages,
     Allocation& out,
diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h
index b12e0799a29c2..a0e1e80d6fa53 100644
--- a/velox/common/memory/MemoryPool.h
+++ b/velox/common/memory/MemoryPool.h
@@ -499,6 +499,11 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
     return bits::roundUp(size, 8 * kMB);
   }
 
+  /// Decrements the count of free calls. Needed to adjust cases where a free
+  /// would be counted twice. This may happen in out-of-capacity situations
+  /// with the cache.
+  virtual void decrementNumFrees() {}
+
  protected:
   static constexpr uint64_t kMB = 1 << 20;
 
@@ -692,6 +697,8 @@ class MemoryPoolImpl : public MemoryPool {
     debugPoolNameRegex() = regex;
   }
 
+  void decrementNumFrees() override;
+
  private:
  FOLLY_ALWAYS_INLINE static MemoryPoolImpl* toImpl(MemoryPool* pool) {
    return static_cast<MemoryPoolImpl*>(pool);
diff --git a/velox/common/memory/tests/AllocationTest.cpp b/velox/common/memory/tests/AllocationTest.cpp
index 44b7e6fef2709..f27618303fa1c 100644
--- a/velox/common/memory/tests/AllocationTest.cpp
+++ b/velox/common/memory/tests/AllocationTest.cpp
@@ -59,4 +59,30 @@ TEST_F(AllocationTest, append) {
   VELOX_ASSERT_THROW(allocation.append(thirdBufAddr, kNumPages), "");
   allocation.clear();
 }
+
+TEST_F(AllocationTest, appendMove) {
+  const uint64_t startBufAddrValue = 4096;
+  uint8_t* const firstBufAddr = reinterpret_cast<uint8_t*>(startBufAddrValue);
+  const int32_t kNumPages = 10;
+  Allocation allocation;
+  allocation.append(firstBufAddr, kNumPages);
+  ASSERT_EQ(allocation.numPages(), kNumPages);
+  ASSERT_EQ(allocation.numRuns(), 1);
+
+  Allocation otherAllocation;
+  uint8_t* const secondBufAddr = reinterpret_cast<uint8_t*>(
+      startBufAddrValue + kNumPages * AllocationTraits::kPageSize);
+  otherAllocation.append(secondBufAddr, kNumPages);
+  ASSERT_EQ(otherAllocation.numPages(), kNumPages);
+
+  // 'allocation' gets all the runs of 'otherAllocation' and 'otherAllocation'
+  // is left empty.
+  allocation.appendMove(otherAllocation);
+  ASSERT_EQ(kNumPages * 2, allocation.numPages());
+  ASSERT_EQ(0, otherAllocation.numPages());
+  ASSERT_EQ(2, allocation.numRuns());
+  ASSERT_EQ(0, otherAllocation.numRuns());
+  allocation.clear();
+}
+
 } // namespace facebook::velox::memory