From 3ffd47006783385341491b99363a34c8447ccaa6 Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Tue, 3 Oct 2023 08:46:35 -0700 Subject: [PATCH] Improve allocation success when evicting from cache An allocation will evict cache to make space for data. If the the evicted data is freed and then an allocation is attempted, it can be that another thread has snatched the freed data. With high transient memory allocation rate and many threads, it can happen that a large allocation runs out of retries even if it could be satisfied. Therefore, when evicting in order to make space for an allocation, we keep hold of the non-contiguous Allocations we remove from cache. Many allocations can in this way be concatenated into an allocation that covers the needed number of pages. This can then atomically be converted into a new non-contiguous or contiguous allocation. The allocate*WithoutRetry methods accept a non-contiguous allocation to be freed to provide pages for the new allocation. Memory mapping magic can convert non-contiguous freed pages to contiguous ones. Note that the allocate*WithoutRetry methods mishandle the atomicity of the exchange: The allocated count is decremented by the size of the freed collateral and then incremented by the size of the new allocation. This should be a single atomic increment of allocatedSize - collateralSize. This always succeeds if the collateral is >= the allocated amount. Even so, the window of vulnerability to another thread snatching the freed capacity before it is reacquired is probably negligible, only a few instructions. The AsyncDataCache::makeSpace loop first tries to allocate. If it fails, it evicts and keeps up to the required amount in a a grab bag allocation. It loops until this allocation is large enough to convert into the desired allocation. If nothing is evicted and the allocation repeatedly fails, it gives up. --- velox/common/caching/AsyncDataCache.cpp | 73 +++++++++++++--- velox/common/caching/AsyncDataCache.h | 32 ++++--- .../caching/tests/AsyncDataCacheTest.cpp | 85 ++++++++++++++++++- velox/common/memory/Allocation.cpp | 8 ++ velox/common/memory/Allocation.h | 3 + velox/common/memory/MemoryAllocator.cpp | 38 ++++++--- velox/common/memory/MemoryAllocator.h | 6 +- 7 files changed, 204 insertions(+), 41 deletions(-) diff --git a/velox/common/caching/AsyncDataCache.cpp b/velox/common/caching/AsyncDataCache.cpp index 465aceff9fc97..b90c876cc6ddb 100644 --- a/velox/common/caching/AsyncDataCache.cpp +++ b/velox/common/caching/AsyncDataCache.cpp @@ -331,7 +331,11 @@ void CacheShard::removeEntryLocked(AsyncDataCacheEntry* entry) { } } -void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) { +void CacheShard::evict( + uint64_t bytesToFree, + bool evictAllUnpinned, + int32_t acquirePages, + memory::Allocation* allocation) { int64_t tinyFreed = 0; int64_t largeFreed = 0; int32_t evictSaveableSkipped = 0; @@ -380,7 +384,14 @@ void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) { continue; } largeFreed += candidate->data_.byteSize(); - toFree.push_back(std::move(candidate->data())); + if (acquirePages) { + auto candidatePages = candidate->data().numPages(); + acquirePages = + candidatePages > acquirePages ? 0 : acquirePages - candidatePages; + allocation->appendMove(candidate->data()); + } else { + toFree.push_back(std::move(candidate->data())); + } removeEntryLocked(candidate); emptySlots_.push_back(entryIndex); tinyFreed += candidate->tinyData_.size(); @@ -574,7 +585,8 @@ bool AsyncDataCache::exists(RawFileCacheKey key) const { bool AsyncDataCache::makeSpace( MachinePageCount numPages, - std::function allocate) { + memory::Allocation& collateral, + std::function allocate) { // Try to allocate and if failed, evict the desired amount and // retry. This is without synchronization, so that other threads may // get what one thread evicted but this will usually work in a @@ -588,15 +600,24 @@ bool AsyncDataCache::makeSpace( // called from inside a global mutex. constexpr int32_t kMaxAttempts = kNumShards * 4; + // Evict at least 1MB even for small allocations to avoid constantly hitting + // the mutex protected evict loop. + constexpr int32_t kMinEvictPages = 256; // If requesting less than kSmallSizePages try up to 4x more if // first try failed. constexpr int32_t kSmallSizePages = 2048; // 8MB - int32_t sizeMultiplier = 1; + float sizeMultiplier = 1.2; // True if this thread is counted in 'numThreadsInAllocate_'. bool isCounted = false; // If more than half the allowed retries are needed, this is the rank in // arrival order of this. int32_t rank = 0; + // Allocation into which evicted pages are moved. + memory::Allocation evicted; + // 'evicted' is not managed by a pool. Make sure it is freed on throw. + // Destruct without pool and non-empty kills the process. + auto guard = + folly::makeGuard([&]() { allocator_->freeNonContiguous(evicted); }); VELOX_CHECK( numThreadsInAllocate_ >= 0 && numThreadsInAllocate_ < 10000, "Leak in numThreadsInAllocate_: {}", @@ -606,18 +627,20 @@ bool AsyncDataCache::makeSpace( isCounted = true; } for (auto nthAttempt = 0; nthAttempt < kMaxAttempts; ++nthAttempt) { - try { - if (allocate()) { + if (canTryAllocate(numPages - collateral.numPages(), evicted)) { + try { + if (allocate(evicted)) { + if (isCounted) { + --numThreadsInAllocate_; + } + return true; + } + } catch (const std::exception& e) { if (isCounted) { --numThreadsInAllocate_; } - return true; + throw; } - } catch (const std::exception& e) { - if (isCounted) { - --numThreadsInAllocate_; - } - throw; } if (nthAttempt > 2 && ssdCache_ && ssdCache_->writeInProgress()) { VELOX_SSD_CACHE_LOG(INFO) @@ -631,15 +654,26 @@ bool AsyncDataCache::makeSpace( } } if (rank) { + // Free the grabbed allocation before sleep so the contender can make + // progress. This is only on heavy contention, after 8 missed tries. + allocator_->freeNonContiguous(evicted); backoff(nthAttempt + rank); + // If some of the competing threads are done, maybe give this thread a + // better rank. + rank = std::min(rank, numThreadsInAllocate_); } ++shardCounter_; + int32_t toAcquire = + evicted.numPages() < numPages ? numPages - evicted.numPages() : 0; // Evict from next shard. If we have gone through all shards once // and still have not made the allocation, we go to desperate mode // with 'evictAllUnpinned' set to true. shards_[shardCounter_ & (kShardMask)]->evict( - numPages * sizeMultiplier * memory::AllocationTraits::kPageSize, - nthAttempt >= kNumShards); + std::max(kMinEvictPages, numPages) * sizeMultiplier * + memory::AllocationTraits::kPageSize, + nthAttempt >= kNumShards, + toAcquire, + &evicted); if (numPages < kSmallSizePages && sizeMultiplier < 4) { sizeMultiplier *= 2; } @@ -650,6 +684,17 @@ bool AsyncDataCache::makeSpace( return false; } +bool AsyncDataCache::canTryAllocate( + int32_t numPages, + const memory::Allocation& evicted) const { + if (numPages <= evicted.numPages()) { + return true; + } + return numPages - evicted.numPages() <= + (memory::AllocationTraits::numPages(allocator_->capacity())) - + allocator_->numAllocated(); +} + void AsyncDataCache::backoff(int32_t counter) { size_t seed = folly::hasher()(++backoffCounter_); const auto usec = (seed & 0xfff) * (counter & 0x1f); diff --git a/velox/common/caching/AsyncDataCache.h b/velox/common/caching/AsyncDataCache.h index 834552be291f8..a320352043f36 100644 --- a/velox/common/caching/AsyncDataCache.h +++ b/velox/common/caching/AsyncDataCache.h @@ -548,8 +548,14 @@ class CacheShard { // not pinned. This favors first removing older and less frequently // used entries. If 'evictAllUnpinned' is true, anything that is // not pinned is evicted at first sight. This is for out of memory - // emergencies. - void evict(uint64_t bytesToFree, bool evictAllUnpinned); + // emergencies. If 'acquireBytes' is set, up to this amount is added to + // 'allocation'. A smaller amount can be added if not enough evictable data is + // found. + void evict( + uint64_t bytesToFree, + bool evictAllUnpinned, + int32_t acquirePages = 0, + memory::Allocation* allocation = nullptr); // Removes 'entry' from 'this'. Removes a possible promise from the entry // inside the shard mutex and returns it so that it can be realized outside of @@ -664,7 +670,8 @@ class AsyncDataCache : public memory::Cache { /// for memory arbitration to work. bool makeSpace( memory::MachinePageCount numPages, - std::function allocate) override; + memory::Allocation& collateral, + std::function allocate) override; memory::MemoryAllocator* allocator() const override { return allocator_; @@ -761,6 +768,11 @@ class AsyncDataCache : public memory::Cache { static constexpr int32_t kNumShards = 4; // Must be power of 2. static constexpr int32_t kShardMask = kNumShards - 1; + // True if 'evicted' has more pages than 'numPages' or allocator has space for + // numPages - evicted pages of more allocation. + bool canTryAllocate(int32_t numPages, const memory::Allocation& evicted) + const; + static AsyncDataCache** getInstancePtr(); // Waits a pseudorandom delay times 'counter'. @@ -825,15 +837,15 @@ T percentile(Next next, int32_t numSamples, int percent) { //'offsetFunc' returns the starting offset of the data in the // file given a pin and the pin's index in 'pins'. The pins are expected to be // sorted by this offset. 'readFunc' reads from the appropriate media. It gets -// the 'pins' and the index of the first pin included in the read and the index -// of the first pin not included. It gets the starting offset of the read and a -// vector of memory ranges to fill by ReadFile::preadv or a similar +// the 'pins' and the index of the first pin included in the read and the +// index of the first pin not included. It gets the starting offset of the +// read and a vector of memory ranges to fill by ReadFile::preadv or a similar // function. -// The caller is responsible for calling setValid on the pins after a successful -// read. +// The caller is responsible for calling setValid on the pins after a +// successful read. // -// Returns the number of distinct IOs, the number of bytes loaded into pins and -// the number of extra bytes read. +// Returns the number of distinct IOs, the number of bytes loaded into pins +// and the number of extra bytes read. CoalesceIoStats readPins( const std::vector& pins, int32_t maxGap, diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp index 2c0c2a9e42b96..4c2714db04b19 100644 --- a/velox/common/caching/tests/AsyncDataCacheTest.cpp +++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp @@ -18,6 +18,7 @@ #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" #include "velox/common/file/FileSystems.h" +#include "velox/common/memory/Memory.h" #include "velox/common/memory/MmapAllocator.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -144,11 +145,16 @@ class AsyncDataCacheTest : public testing::Test { // loading. Stops after loading 'loadBytes' worth of entries. If // 'errorEveryNBatches' is non-0, every nth load batch will have a // bad read and wil be dropped. The entries of the failed batch read - // will still be accessed one by one. + // will still be accessed one by one. If 'largeEveryNBatches' is + // non-0, allocates and freees a single allocation of 'largeBytes' + // every so many batches. This creates extra memory pressure, as + // happens when allocating large hash tables in queries. void loadLoop( int64_t startOffset, int64_t loadBytes, - int32_t errorEveryNBatches = 0); + int32_t errorEveryNBatches = 0, + int32_t largeEveryNBatches = 0, + int32_t largeBytes = 0); // Calls func on 'numThreads' in parallel. template @@ -223,6 +229,7 @@ class AsyncDataCacheTest : public testing::Test { std::shared_ptr cache_; std::vector filenames_; std::unique_ptr executor_; + int32_t numLargeRetries_{0}; }; class TestingCoalescedLoad : public CoalescedLoad { @@ -445,11 +452,14 @@ void AsyncDataCacheTest::loadBatch( void AsyncDataCacheTest::loadLoop( int64_t startOffset, int64_t loadBytes, - int32_t errorEveryNBatches) { + int32_t errorEveryNBatches, + int32_t largeEveryNBatches, + int32_t largeAllocSize) { const int64_t maxOffset = std::max(100'000, (startOffset + loadBytes) / filenames_.size()); int64_t skippedBytes = 0; int32_t errorCounter = 0; + int32_t largeCounter = 0; std::vector batch; for (auto i = 0; i < filenames_.size(); ++i) { const auto fileNum = filenames_[i].id(); @@ -464,6 +474,27 @@ void AsyncDataCacheTest::loadLoop( batch.emplace_back(offset, size); if (batch.size() >= 8) { for (;;) { + if (largeEveryNBatches > 0 && + largeCounter++ % largeEveryNBatches == 0) { + // Many threads will allocate a single large chunk at the + // same time. Some are expected to fail. All will + // eventually succeed because whoever gets the allocation + // frees it soon and without deadlocking with others.. + memory::ContiguousAllocation large; + // Explicitly free 'large' on exit. Do not use MemoryPool for that + // because we test the allocator's limits, not the pool/memory + // manager limits. + auto guard = + folly::makeGuard([&]() { allocator_->freeContiguous(large); }); + while (!allocator_->allocateContiguous( + memory::AllocationTraits::numPages(largeAllocSize), + nullptr, + large)) { + ++numLargeRetries_; + } + std::this_thread::sleep_for( + std::chrono::microseconds(2000)); // NOLINT + } const bool injectError = (errorEveryNBatches > 0) && (++errorCounter % errorEveryNBatches == 0); loadBatch(fileNum, batch, injectError); @@ -565,6 +596,54 @@ TEST_F(AsyncDataCacheTest, replace) { cache_->incrementCachedPages(0)); } +TEST_F(AsyncDataCacheTest, evictAccounting) { + constexpr int64_t kMaxBytes = 64 << 20; + FLAGS_velox_exception_user_stacktrace_enabled = false; + initializeCache(kMaxBytes); + auto memoryManager = + std::make_unique(memory::MemoryManagerOptions{ + .capacity = (int64_t)allocator_->capacity(), + .trackDefaultUsage = true, + .allocator = allocator_.get()}); + auto pool = memoryManager->addLeafPool("test"); + + // We make allocations that we exchange for larger ones later. This will evict + // cache. We check that the evictions are not counted on the pool even if they + // occur as a result of action on the pool. + memory::Allocation allocation; + memory::ContiguousAllocation large; + pool->allocateNonContiguous(1200, allocation); + pool->allocateContiguous(1200, large); + EXPECT_EQ(memory::AllocationTraits::kPageSize * 2400, pool->currentBytes()); + loadLoop(0, kMaxBytes * 1.1); + pool->allocateNonContiguous(2400, allocation); + pool->allocateContiguous(2400, large); + EXPECT_EQ(memory::AllocationTraits::kPageSize * 4800, pool->currentBytes()); + auto stats = cache_->refreshStats(); + EXPECT_LT(0, stats.numEvict); +} + +TEST_F(AsyncDataCacheTest, largeEvict) { + constexpr int64_t kMaxBytes = 256 << 20; + constexpr int32_t kNumThreads = 24; + FLAGS_velox_exception_user_stacktrace_enabled = false; + initializeCache(kMaxBytes); + // Load 10x the max size, inject an allocation of 1/8 the capacity every 4 + // batches. + runThreads(kNumThreads, [&](int32_t /*i*/) { + loadLoop(0, kMaxBytes * 1.2, 0, 1, kMaxBytes / 4); + }); + if (executor_) { + executor_->join(); + } + auto stats = cache_->refreshStats(); + EXPECT_LT(0, stats.numEvict); + EXPECT_GE( + kMaxBytes / memory::AllocationTraits::kPageSize, + cache_->incrementCachedPages(0)); + LOG(INFO) << "Reties after failed evict: " << numLargeRetries_; +} + TEST_F(AsyncDataCacheTest, outOfCapacity) { const int64_t kMaxBytes = 64 << 20; // 64MB as MmapAllocator's min size is 64MB diff --git a/velox/common/memory/Allocation.cpp b/velox/common/memory/Allocation.cpp index b6497984234a9..f24ef76a359b9 100644 --- a/velox/common/memory/Allocation.cpp +++ b/velox/common/memory/Allocation.cpp @@ -37,6 +37,14 @@ void Allocation::append(uint8_t* address, int32_t numPages) { "Appending a duplicate address into a PageRun"); runs_.emplace_back(address, numPages); } +void Allocation::appendMove(Allocation& other) { + for (auto& run : other.runs_) { + numPages_ += run.numPages(); + runs_.push_back(std::move(run)); + } + other.runs_.clear(); + other.numPages_ = 0; +} void Allocation::findRun(uint64_t offset, int32_t* index, int32_t* offsetInRun) const { diff --git a/velox/common/memory/Allocation.h b/velox/common/memory/Allocation.h index 2669b38056369..f0631d4f418bb 100644 --- a/velox/common/memory/Allocation.h +++ b/velox/common/memory/Allocation.h @@ -160,6 +160,9 @@ class Allocation { return numPages_ == 0; } + /// Moves the runs in 'from' to 'this'. 'from' is empty on return. + void appendMove(Allocation& from); + std::string toString() const; private: diff --git a/velox/common/memory/MemoryAllocator.cpp b/velox/common/memory/MemoryAllocator.cpp index 747a72f813861..3d2b560598389 100644 --- a/velox/common/memory/MemoryAllocator.cpp +++ b/velox/common/memory/MemoryAllocator.cpp @@ -169,7 +169,8 @@ bool MemoryAllocator::allocateNonContiguous( return allocateNonContiguousWithoutRetry( numPages, out, reservationCB, minSizeClass); } - return cache()->makeSpace(numPages, [&]() { + return cache()->makeSpace(numPages, out, [&](Allocation& evicted) { + freeNonContiguous(evicted); return allocateNonContiguousWithoutRetry( numPages, out, reservationCB, minSizeClass); }); @@ -185,9 +186,14 @@ bool MemoryAllocator::allocateContiguous( return allocateContiguousWithoutRetry( numPages, collateral, allocation, reservationCB, maxPages); } - return cache()->makeSpace(numPages, [&]() { + Allocation toFree; + if (collateral) { + toFree.appendMove(*collateral); + } + return cache()->makeSpace(numPages, toFree, [&](Allocation& evicted) { + freeNonContiguous(evicted); return allocateContiguousWithoutRetry( - numPages, collateral, allocation, reservationCB, maxPages); + numPages, &toFree, allocation, reservationCB, maxPages); }); } @@ -198,7 +204,9 @@ bool MemoryAllocator::growContiguous( if (cache() == nullptr) { return growContiguousWithoutRetry(increment, allocation, reservationCB); } - return cache()->makeSpace(increment, [&]() { + Allocation empty; + return cache()->makeSpace(increment, empty, [&](Allocation& evicted) { + freeNonContiguous(evicted); return growContiguousWithoutRetry(increment, allocation, reservationCB); }); } @@ -208,10 +216,13 @@ void* MemoryAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) { return allocateBytesWithoutRetry(bytes, alignment); } void* result = nullptr; - cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() { - result = allocateBytesWithoutRetry(bytes, alignment); - return result != nullptr; - }); + Allocation empty; + cache()->makeSpace( + AllocationTraits::numPages(bytes), empty, [&](Allocation& evicted) { + freeNonContiguous(evicted); + result = allocateBytesWithoutRetry(bytes, alignment); + return result != nullptr; + }); return result; } @@ -220,10 +231,13 @@ void* MemoryAllocator::allocateZeroFilled(uint64_t bytes) { return allocateZeroFilledWithoutRetry(bytes); } void* result = nullptr; - cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() { - result = allocateZeroFilledWithoutRetry(bytes); - return result != nullptr; - }); + Allocation empty; + cache()->makeSpace( + AllocationTraits::numPages(bytes), empty, [&](Allocation& evicted) { + freeNonContiguous(evicted); + result = allocateZeroFilledWithoutRetry(bytes); + return result != nullptr; + }); return result; } diff --git a/velox/common/memory/MemoryAllocator.h b/velox/common/memory/MemoryAllocator.h index 820934c366661..b7f1b8a737663 100644 --- a/velox/common/memory/MemoryAllocator.h +++ b/velox/common/memory/MemoryAllocator.h @@ -149,10 +149,12 @@ class Cache { /// This method should be implemented so that it tries to accommodate the /// passed in 'allocate' by freeing up space from 'this' if needed. 'numPages' /// is the number of pages 'allocate' tries to allocate.It should return true - /// if 'allocate' succeeds, and false otherwise. + /// if 'allocate' succeeds, and false otherwise. 'collateral' is an allocation + /// that can be freed to make space for the new allocation. virtual bool makeSpace( memory::MachinePageCount numPages, - std::function allocate) = 0; + Allocation& collateral, + std::function allocate) = 0; virtual MemoryAllocator* allocator() const = 0; };