diff --git a/velox/common/caching/AsyncDataCache.cpp b/velox/common/caching/AsyncDataCache.cpp
index 19f0036f715a..2d8015816ac2 100644
--- a/velox/common/caching/AsyncDataCache.cpp
+++ b/velox/common/caching/AsyncDataCache.cpp
@@ -331,7 +331,11 @@ void CacheShard::removeEntryLocked(AsyncDataCacheEntry* entry) {
   }
 }
 
-void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) {
+void CacheShard::evict(
+    uint64_t bytesToFree,
+    bool evictAllUnpinned,
+    int32_t pagesToAcquire,
+    memory::Allocation& acquired) {
   int64_t tinyFreed = 0;
   int64_t largeFreed = 0;
   int32_t evictSaveableSkipped = 0;
@@ -380,7 +384,16 @@ void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) {
         continue;
       }
       largeFreed += candidate->data_.byteSize();
-      toFree.push_back(std::move(candidate->data()));
+      if (pagesToAcquire > 0) {
+        auto candidatePages = candidate->data().numPages();
+        pagesToAcquire = candidatePages > pagesToAcquire
+            ? 0
+            : pagesToAcquire - candidatePages;
+        acquired.appendMove(candidate->data());
+        VELOX_CHECK(candidate->data().empty());
+      } else {
+        toFree.push_back(std::move(candidate->data()));
+      }
       removeEntryLocked(candidate);
       emptySlots_.push_back(entryIndex);
       tinyFreed += candidate->tinyData_.size();
@@ -567,7 +580,7 @@ bool AsyncDataCache::exists(RawFileCacheKey key) const {
 
 bool AsyncDataCache::makeSpace(
     MachinePageCount numPages,
-    std::function<bool()> allocate) {
+    std::function<bool(memory::Allocation& allocation)> allocate) {
   // Try to allocate and if failed, evict the desired amount and
   // retry. This is without synchronization, so that other threads may
   // get what one thread evicted but this will usually work in a
@@ -581,15 +594,28 @@ bool AsyncDataCache::makeSpace(
   // called from inside a global mutex.
   constexpr int32_t kMaxAttempts = kNumShards * 4;
+  // Evict at least 1MB even for small allocations to avoid constantly hitting
+  // the mutex-protected evict loop.
+  constexpr int32_t kMinEvictPages = 256;
   // If requesting less than kSmallSizePages try up to 4x more if
   // first try failed.
   constexpr int32_t kSmallSizePages = 2048; // 8MB
-  int32_t sizeMultiplier = 1;
+  float sizeMultiplier = 1.2;
   // True if this thread is counted in 'numThreadsInAllocate_'.
   bool isCounted = false;
   // If more than half the allowed retries are needed, this is the rank in
   // arrival order of this.
   int32_t rank = 0;
+  // Allocation into which evicted pages are moved.
+  memory::Allocation acquired;
+  // 'acquired' is not managed by a pool. Make sure it is freed on throw.
+  // Destruct without pool and non-empty kills the process.
+  auto guard = folly::makeGuard([&]() {
+    allocator_->freeNonContiguous(acquired);
+    if (isCounted) {
+      --numThreadsInAllocate_;
+    }
+  });
   VELOX_CHECK(
       numThreadsInAllocate_ >= 0 && numThreadsInAllocate_ < 10000,
       "Leak in numThreadsInAllocate_: {}",
@@ -599,19 +625,12 @@ bool AsyncDataCache::makeSpace(
     isCounted = true;
   }
   for (auto nthAttempt = 0; nthAttempt < kMaxAttempts; ++nthAttempt) {
-    try {
-      if (allocate()) {
-        if (isCounted) {
-          --numThreadsInAllocate_;
-        }
+    if (canTryAllocate(numPages, acquired)) {
+      if (allocate(acquired)) {
         return true;
       }
-    } catch (const std::exception& e) {
-      if (isCounted) {
-        --numThreadsInAllocate_;
-      }
-      throw;
     }
+
     if (nthAttempt > 2 && ssdCache_ && ssdCache_->writeInProgress()) {
       VELOX_SSD_CACHE_LOG(INFO)
           << "Pause 0.5s after failed eviction waiting for SSD cache write to unpin memory";
@@ -624,25 +643,44 @@ bool AsyncDataCache::makeSpace(
       }
     }
     if (rank) {
+      // Free the grabbed allocation before sleep so the contender can make
+      // progress.
+      // This is only on heavy contention, after 8 missed tries.
+      allocator_->freeNonContiguous(acquired);
       backoff(nthAttempt + rank);
+      // If some of the competing threads are done, maybe give this thread a
+      // better rank.
+      rank = std::min(rank, numThreadsInAllocate_);
     }
     ++shardCounter_;
+    int32_t numPagesToAcquire =
+        acquired.numPages() < numPages ? numPages - acquired.numPages() : 0;
     // Evict from next shard. If we have gone through all shards once
     // and still have not made the allocation, we go to desperate mode
     // with 'evictAllUnpinned' set to true.
     shards_[shardCounter_ & (kShardMask)]->evict(
-        numPages * sizeMultiplier * memory::AllocationTraits::kPageSize,
-        nthAttempt >= kNumShards);
+        memory::AllocationTraits::pageBytes(
+            std::max(kMinEvictPages, numPages) * sizeMultiplier),
+        nthAttempt >= kNumShards,
+        numPagesToAcquire,
+        acquired);
     if (numPages < kSmallSizePages && sizeMultiplier < 4) {
       sizeMultiplier *= 2;
     }
   }
-  if (isCounted) {
-    --numThreadsInAllocate_;
-  }
   return false;
 }
 
+bool AsyncDataCache::canTryAllocate(
+    int32_t numPages,
+    const memory::Allocation& acquired) const {
+  if (numPages <= acquired.numPages()) {
+    return true;
+  }
+  return numPages - acquired.numPages() <=
+      (memory::AllocationTraits::numPages(allocator_->capacity())) -
+      allocator_->numAllocated();
+}
+
 void AsyncDataCache::backoff(int32_t counter) {
   size_t seed = folly::hasher()(++backoffCounter_);
   const auto usec = (seed & 0xfff) * (counter & 0x1f);
@@ -706,7 +744,9 @@ CacheStats AsyncDataCache::refreshStats() const {
 
 void AsyncDataCache::clear() {
   for (auto& shard : shards_) {
-    shard->evict(std::numeric_limits<uint64_t>::max(), true);
+    memory::Allocation acquired;
+    shard->evict(std::numeric_limits<uint64_t>::max(), true, 0, acquired);
+    VELOX_CHECK(acquired.empty());
   }
 }
diff --git a/velox/common/caching/AsyncDataCache.h b/velox/common/caching/AsyncDataCache.h
index 8c7400ce0b07..4f34306ae2ab 100644
--- a/velox/common/caching/AsyncDataCache.h
+++ b/velox/common/caching/AsyncDataCache.h
@@ -548,8 +548,14 @@ class CacheShard {
   // not pinned. This favors first removing older and less frequently
   // used entries. If 'evictAllUnpinned' is true, anything that is
   // not pinned is evicted at first sight. This is for out of memory
-  // emergencies.
-  void evict(uint64_t bytesToFree, bool evictAllUnpinned);
+  // emergencies. If 'pagesToAcquire' is set, up to this many pages are moved
+  // into 'acquiredAllocation'. Fewer pages can be added if not enough
+  // evictable data is found.
+  void evict(
+      uint64_t bytesToFree,
+      bool evictAllUnpinned,
+      int32_t pagesToAcquire,
+      memory::Allocation& acquiredAllocation);
 
   // Removes 'entry' from 'this'. Removes a possible promise from the entry
   // inside the shard mutex and returns it so that it can be realized outside of
@@ -659,7 +665,7 @@ class AsyncDataCache : public memory::Cache {
   /// for memory arbitration to work.
   bool makeSpace(
       memory::MachinePageCount numPages,
-      std::function<bool()> allocate) override;
+      std::function<bool(memory::Allocation& allocation)> allocate) override;
 
   memory::MemoryAllocator* allocator() const override {
     return allocator_;
@@ -756,6 +762,11 @@ class AsyncDataCache : public memory::Cache {
   static constexpr int32_t kNumShards = 4; // Must be power of 2.
   static constexpr int32_t kShardMask = kNumShards - 1;
 
+  // True if 'acquired' already covers 'numPages', or if the allocator still
+  // has free capacity for the remaining 'numPages' - 'acquired.numPages()'.
+  bool canTryAllocate(int32_t numPages, const memory::Allocation& acquired)
+      const;
+
   static AsyncDataCache** getInstancePtr();
 
   // Waits a pseudorandom delay times 'counter'.
@@ -820,15 +831,15 @@ T percentile(Next next, int32_t numSamples, int percent) {
 //'offsetFunc' returns the starting offset of the data in the
 // file given a pin and the pin's index in 'pins'. The pins are expected to be
 // sorted by this offset. 'readFunc' reads from the appropriate media. It gets
-// the 'pins' and the index of the first pin included in the read and the index
-// of the first pin not included. It gets the starting offset of the read and a
-// vector of memory ranges to fill by ReadFile::preadv or a similar
+// the 'pins' and the index of the first pin included in the read and the
+// index of the first pin not included. It gets the starting offset of the
+// read and a vector of memory ranges to fill by ReadFile::preadv or a similar
 // function.
-// The caller is responsible for calling setValid on the pins after a successful
-// read.
+// The caller is responsible for calling setValid on the pins after a
+// successful read.
 //
-// Returns the number of distinct IOs, the number of bytes loaded into pins and
-// the number of extra bytes read.
+// Returns the number of distinct IOs, the number of bytes loaded into pins
+// and the number of extra bytes read.
 CoalesceIoStats readPins(
     const std::vector<CachePin>& pins,
     int32_t maxGap,
diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp
index 2c0c2a9e42b9..4c2714db04b1 100644
--- a/velox/common/caching/tests/AsyncDataCacheTest.cpp
+++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp
@@ -18,6 +18,7 @@
 #include "velox/common/caching/FileIds.h"
 #include "velox/common/caching/SsdCache.h"
 #include "velox/common/file/FileSystems.h"
+#include "velox/common/memory/Memory.h"
 #include "velox/common/memory/MmapAllocator.h"
 #include "velox/exec/tests/utils/TempDirectoryPath.h"
 
@@ -144,11 +145,16 @@ class AsyncDataCacheTest : public testing::Test {
   // loading. Stops after loading 'loadBytes' worth of entries. If
   // 'errorEveryNBatches' is non-0, every nth load batch will have a
   // bad read and will be dropped. The entries of the failed batch read
-  // will still be accessed one by one.
+  // will still be accessed one by one. If 'largeEveryNBatches' is
+  // non-0, allocates and frees a single allocation of 'largeBytes'
+  // every so many batches. This creates extra memory pressure, as
+  // happens when allocating large hash tables in queries.
   void loadLoop(
       int64_t startOffset,
       int64_t loadBytes,
-      int32_t errorEveryNBatches = 0);
+      int32_t errorEveryNBatches = 0,
+      int32_t largeEveryNBatches = 0,
+      int32_t largeBytes = 0);
 
   // Calls func on 'numThreads' in parallel.
   template <typename Func>
@@ -223,6 +229,7 @@ class AsyncDataCacheTest : public testing::Test {
   std::shared_ptr<AsyncDataCache> cache_;
   std::vector<StringIdLease> filenames_;
   std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
+  int32_t numLargeRetries_{0};
 };
 
 class TestingCoalescedLoad : public CoalescedLoad {
@@ -445,11 +452,14 @@ void AsyncDataCacheTest::loadBatch(
 void AsyncDataCacheTest::loadLoop(
     int64_t startOffset,
     int64_t loadBytes,
-    int32_t errorEveryNBatches) {
+    int32_t errorEveryNBatches,
+    int32_t largeEveryNBatches,
+    int32_t largeAllocSize) {
   const int64_t maxOffset =
       std::max(100'000, (startOffset + loadBytes) / filenames_.size());
   int64_t skippedBytes = 0;
   int32_t errorCounter = 0;
+  int32_t largeCounter = 0;
   std::vector<Request> batch;
   for (auto i = 0; i < filenames_.size(); ++i) {
     const auto fileNum = filenames_[i].id();
@@ -464,6 +474,27 @@ void AsyncDataCacheTest::loadLoop(
       batch.emplace_back(offset, size);
       if (batch.size() >= 8) {
         for (;;) {
+          if (largeEveryNBatches > 0 &&
+              largeCounter++ % largeEveryNBatches == 0) {
+            // Many threads will allocate a single large chunk at the
+            // same time. Some are expected to fail. All will
+            // eventually succeed because whoever gets the allocation
+            // frees it soon and without deadlocking with others.
+            memory::ContiguousAllocation large;
+            // Explicitly free 'large' on exit. Do not use MemoryPool for that
+            // because we test the allocator's limits, not the pool/memory
+            // manager limits.
+            auto guard =
+                folly::makeGuard([&]() { allocator_->freeContiguous(large); });
+            while (!allocator_->allocateContiguous(
+                memory::AllocationTraits::numPages(largeAllocSize),
+                nullptr,
+                large)) {
+              ++numLargeRetries_;
+            }
+            std::this_thread::sleep_for(
+                std::chrono::microseconds(2000)); // NOLINT
+          }
           const bool injectError = (errorEveryNBatches > 0) &&
               (++errorCounter % errorEveryNBatches == 0);
           loadBatch(fileNum, batch, injectError);
@@ -565,6 +596,54 @@ TEST_F(AsyncDataCacheTest, replace) {
       cache_->incrementCachedPages(0));
 }
 
+TEST_F(AsyncDataCacheTest, evictAccounting) {
+  constexpr int64_t kMaxBytes = 64 << 20;
+  FLAGS_velox_exception_user_stacktrace_enabled = false;
+  initializeCache(kMaxBytes);
+  auto memoryManager =
+      std::make_unique<memory::MemoryManager>(memory::MemoryManagerOptions{
+          .capacity = (int64_t)allocator_->capacity(),
+          .trackDefaultUsage = true,
+          .allocator = allocator_.get()});
+  auto pool = memoryManager->addLeafPool("test");
+
+  // We make allocations that we exchange for larger ones later. This will
+  // evict cache. We check that the evictions are not counted on the pool even
+  // if they occur as a result of action on the pool.
+  memory::Allocation allocation;
+  memory::ContiguousAllocation large;
+  pool->allocateNonContiguous(1200, allocation);
+  pool->allocateContiguous(1200, large);
+  EXPECT_EQ(memory::AllocationTraits::kPageSize * 2400, pool->currentBytes());
+  loadLoop(0, kMaxBytes * 1.1);
+  pool->allocateNonContiguous(2400, allocation);
+  pool->allocateContiguous(2400, large);
+  EXPECT_EQ(memory::AllocationTraits::kPageSize * 4800, pool->currentBytes());
+  auto stats = cache_->refreshStats();
+  EXPECT_LT(0, stats.numEvict);
+}
+
+TEST_F(AsyncDataCacheTest, largeEvict) {
+  constexpr int64_t kMaxBytes = 256 << 20;
+  constexpr int32_t kNumThreads = 24;
+  FLAGS_velox_exception_user_stacktrace_enabled = false;
+  initializeCache(kMaxBytes);
+  // Each thread loads somewhat more than the cache capacity and injects a
+  // contiguous allocation of 1/4 of the capacity on every batch.
+  runThreads(kNumThreads, [&](int32_t /*i*/) {
+    loadLoop(0, kMaxBytes * 1.2, 0, 1, kMaxBytes / 4);
+  });
+  if (executor_) {
+    executor_->join();
+  }
+  auto stats = cache_->refreshStats();
+  EXPECT_LT(0, stats.numEvict);
+  EXPECT_GE(
+      kMaxBytes / memory::AllocationTraits::kPageSize,
+      cache_->incrementCachedPages(0));
+  LOG(INFO) << "Retries after failed evict: " << numLargeRetries_;
+}
+
 TEST_F(AsyncDataCacheTest, outOfCapacity) {
   const int64_t kMaxBytes =
       64 << 20; // 64MB as MmapAllocator's min size is 64MB
diff --git a/velox/common/memory/Allocation.cpp b/velox/common/memory/Allocation.cpp
index b6497984234a..f24ef76a359b 100644
--- a/velox/common/memory/Allocation.cpp
+++ b/velox/common/memory/Allocation.cpp
@@ -37,6 +37,14 @@ void Allocation::append(uint8_t* address, int32_t numPages) {
       "Appending a duplicate address into a PageRun");
   runs_.emplace_back(address, numPages);
 }
+void Allocation::appendMove(Allocation& other) {
+  for (auto& run : other.runs_) {
+    numPages_ += run.numPages();
+    runs_.push_back(std::move(run));
+  }
+  other.runs_.clear();
+  other.numPages_ = 0;
+}
 
 void Allocation::findRun(uint64_t offset, int32_t* index, int32_t* offsetInRun)
     const {
diff --git a/velox/common/memory/Allocation.h b/velox/common/memory/Allocation.h
index 2669b3805636..6378378ab0a5 100644
--- a/velox/common/memory/Allocation.h
+++ b/velox/common/memory/Allocation.h
@@ -160,6 +160,9 @@ class Allocation {
     return numPages_ == 0;
   }
 
+  /// Moves the runs in 'from' to 'this'. 'from' is empty on return.
+  void appendMove(Allocation& from);
+
   std::string toString() const;
 
  private:
@@ -189,6 +192,7 @@ class Allocation {
   VELOX_FRIEND_TEST(MemoryAllocatorTest, allocationClass1);
   VELOX_FRIEND_TEST(MemoryAllocatorTest, allocationClass2);
   VELOX_FRIEND_TEST(AllocationTest, append);
+  VELOX_FRIEND_TEST(AllocationTest, appendMove);
 };
 
 /// Represents a run of contiguous pages that do not belong to any size class.
diff --git a/velox/common/memory/MemoryAllocator.cpp b/velox/common/memory/MemoryAllocator.cpp
index 747a72f81386..f2da6e71f081 100644
--- a/velox/common/memory/MemoryAllocator.cpp
+++ b/velox/common/memory/MemoryAllocator.cpp
@@ -160,6 +160,14 @@ MachinePageCount MemoryAllocator::roundUpToSizeClassSize(
   return *std::lower_bound(sizes.begin(), sizes.end(), pages);
 }
 
+namespace {
+MachinePageCount pagesToAcquire(
+    MachinePageCount numPages,
+    MachinePageCount collateralPages) {
+  return numPages <= collateralPages ? 0 : numPages - collateralPages;
+}
+} // namespace
+
 bool MemoryAllocator::allocateNonContiguous(
     MachinePageCount numPages,
     Allocation& out,
@@ -169,10 +177,24 @@ bool MemoryAllocator::allocateNonContiguous(
     return allocateNonContiguousWithoutRetry(
         numPages, out, reservationCB, minSizeClass);
   }
-  return cache()->makeSpace(numPages, [&]() {
-    return allocateNonContiguousWithoutRetry(
-        numPages, out, reservationCB, minSizeClass);
-  });
+  bool success = cache()->makeSpace(
+      pagesToAcquire(numPages, out.numPages()), [&](Allocation& acquired) {
+        freeNonContiguous(acquired);
+        return allocateNonContiguousWithoutRetry(
+            numPages, out, reservationCB, minSizeClass);
+      });
+  if (!success) {
+    // There can be a failure where allocation was never called because there
+    // never was a chance based on numAllocated() and capacity(). Make sure old
+    // data is still freed.
+    if (!out.empty()) {
+      if (reservationCB) {
+        reservationCB(AllocationTraits::pageBytes(out.numPages()), false);
+      }
+      freeNonContiguous(out);
+    }
+  }
+  return success;
 }
 
 bool MemoryAllocator::allocateContiguous(
@@ -185,10 +207,36 @@ bool MemoryAllocator::allocateContiguous(
     return allocateContiguousWithoutRetry(
         numPages, collateral, allocation, reservationCB, maxPages);
   }
-  return cache()->makeSpace(numPages, [&]() {
-    return allocateContiguousWithoutRetry(
-        numPages, collateral, allocation, reservationCB, maxPages);
-  });
+  auto numCollateralPages =
+      allocation.numPages() + (collateral ? collateral->numPages() : 0);
+  bool success = cache()->makeSpace(
+      pagesToAcquire(numPages, numCollateralPages), [&](Allocation& acquired) {
+        freeNonContiguous(acquired);
+        return allocateContiguousWithoutRetry(
+            numPages, collateral, allocation, reservationCB, maxPages);
+      });
+  if (!success) {
+    // There can be a failure where allocation was never called because there
+    // never was a chance based on numAllocated() and capacity(). Make sure old
+    // data is still freed.
+    int64_t bytes = 0;
+    if (collateral && !collateral->empty()) {
+      if (reservationCB) {
+        bytes += AllocationTraits::pageBytes(collateral->numPages());
+      }
+      freeNonContiguous(*collateral);
+    }
+    if (!allocation.empty()) {
+      if (reservationCB) {
+        bytes += allocation.size();
+      }
+      freeContiguous(allocation);
+    }
+    if (bytes) {
+      reservationCB(bytes, false);
+    }
+  }
+  return success;
 }
 
 bool MemoryAllocator::growContiguous(
@@ -198,7 +246,8 @@ bool MemoryAllocator::growContiguous(
   if (cache() == nullptr) {
    return growContiguousWithoutRetry(increment, allocation, reservationCB);
   }
-  return cache()->makeSpace(increment, [&]() {
+  return cache()->makeSpace(increment, [&](Allocation& acquired) {
+    freeNonContiguous(acquired);
     return growContiguousWithoutRetry(increment, allocation, reservationCB);
   });
 }
@@ -208,10 +257,12 @@ void* MemoryAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) {
     return allocateBytesWithoutRetry(bytes, alignment);
   }
   void* result = nullptr;
-  cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() {
-    result = allocateBytesWithoutRetry(bytes, alignment);
-    return result != nullptr;
-  });
+  cache()->makeSpace(
+      AllocationTraits::numPages(bytes), [&](Allocation& acquired) {
+        freeNonContiguous(acquired);
+        result = allocateBytesWithoutRetry(bytes, alignment);
+        return result != nullptr;
+      });
   return result;
 }
 
@@ -220,10 +271,12 @@ void* MemoryAllocator::allocateZeroFilled(uint64_t bytes) {
     return allocateZeroFilledWithoutRetry(bytes);
   }
   void* result = nullptr;
-  cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() {
-    result = allocateZeroFilledWithoutRetry(bytes);
-    return result != nullptr;
-  });
+  cache()->makeSpace(
+      AllocationTraits::numPages(bytes), [&](Allocation& acquired) {
+        freeNonContiguous(acquired);
+        result = allocateZeroFilledWithoutRetry(bytes);
+        return result != nullptr;
+      });
   return result;
 }
 
diff --git a/velox/common/memory/MemoryAllocator.h b/velox/common/memory/MemoryAllocator.h
index 820934c36666..bba5db0bc128 100644
--- a/velox/common/memory/MemoryAllocator.h
+++ b/velox/common/memory/MemoryAllocator.h
@@ -146,13 +146,17 @@ class MemoryAllocator;
 class Cache {
  public:
   virtual ~Cache() = default;
-  /// This method should be implemented so that it tries to accommodate the
-  /// passed in 'allocate' by freeing up space from 'this' if needed. 'numPages'
-  /// is the number of pages 'allocate' tries to allocate.It should return true
-  /// if 'allocate' succeeds, and false otherwise.
+  /// This method should be implemented so that it tries to
+  /// accommodate the passed in 'allocate' by freeing up space from
+  /// 'this' if needed. 'numPages' is the number of pages that must be
+  /// free for 'allocate' to succeed. This should return true if
+  /// 'allocate' succeeds, and false otherwise. 'numPages' can be less
+  /// than the planned allocation, even 0 but not negative. This is
+  /// possible if 'allocate' brings its own memory that is exchanged
+  /// for the new allocation.
   virtual bool makeSpace(
       memory::MachinePageCount numPages,
-      std::function<bool()> allocate) = 0;
+      std::function<bool(memory::Allocation& allocation)> allocate) = 0;
 
   virtual MemoryAllocator* allocator() const = 0;
 };
diff --git a/velox/common/memory/tests/AllocationTest.cpp b/velox/common/memory/tests/AllocationTest.cpp
index 44b7e6fef270..f27618303fa1 100644
--- a/velox/common/memory/tests/AllocationTest.cpp
+++ b/velox/common/memory/tests/AllocationTest.cpp
@@ -59,4 +59,30 @@ TEST_F(AllocationTest, append) {
   VELOX_ASSERT_THROW(allocation.append(thirdBufAddr, kNumPages), "");
   allocation.clear();
 }
+
+TEST_F(AllocationTest, appendMove) {
+  const uint64_t startBufAddrValue = 4096;
+  uint8_t* const firstBufAddr = reinterpret_cast<uint8_t*>(startBufAddrValue);
+  const int32_t kNumPages = 10;
+  Allocation allocation;
+  allocation.append(firstBufAddr, kNumPages);
+  ASSERT_EQ(allocation.numPages(), kNumPages);
+  ASSERT_EQ(allocation.numRuns(), 1);
+
+  Allocation otherAllocation;
+  uint8_t* const secondBufAddr = reinterpret_cast<uint8_t*>(
+      startBufAddrValue + kNumPages * AllocationTraits::kPageSize);
+  otherAllocation.append(secondBufAddr, kNumPages);
+  ASSERT_EQ(otherAllocation.numPages(), kNumPages);
+
+  // 'allocation' gets all the runs of 'otherAllocation' and 'otherAllocation'
+  // is left empty.
+  allocation.appendMove(otherAllocation);
+  ASSERT_EQ(kNumPages * 2, allocation.numPages());
+  ASSERT_EQ(0, otherAllocation.numPages());
+  ASSERT_EQ(2, allocation.numRuns());
+  ASSERT_EQ(0, otherAllocation.numRuns());
+  allocation.clear();
+}
+
 } // namespace facebook::velox::memory
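
Reviewer note, not part of the patch: the headroom check added in AsyncDataCache::canTryAllocate() is what lets makeSpace() skip callback invocations that cannot possibly succeed, because the pages already moved into the caller's 'acquired' Allocation count toward the request. The following is a minimal standalone sketch of that same arithmetic with plain integers; the function name mirrors the patch and the page counts are made up for illustration.

#include <cstdint>
#include <iostream>

// Mirrors the logic of AsyncDataCache::canTryAllocate(): a request may proceed
// if the pages already evicted into the caller's Allocation ('acquiredPages')
// cover it, or if the allocator still has capacity for the remainder.
bool canTryAllocate(
    int64_t numPages,
    int64_t acquiredPages,
    int64_t capacityPages,
    int64_t allocatedPages) {
  if (numPages <= acquiredPages) {
    return true;
  }
  return numPages - acquiredPages <= capacityPages - allocatedPages;
}

int main() {
  // A capacity of 16'384 pages with 15'384 pages allocated leaves 1'000 free.
  // A 1'500 page request proceeds when 600 pages were already evicted into
  // the caller's Allocation (1'500 - 600 = 900 <= 1'000) ...
  std::cout << canTryAllocate(1'500, 600, 16'384, 15'384) << "\n"; // prints 1
  // ... but not when nothing was acquired yet (1'500 > 1'000).
  std::cout << canTryAllocate(1'500, 0, 16'384, 15'384) << "\n"; // prints 0
  return 0;
}

This is also why the failure paths added to MemoryAllocator must free any pages left in the caller-owned Allocations: a request can be rejected before 'allocate' ever runs, yet the collateral it brought still has to be returned.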