Skip to content

Commit

Permalink
Improve allocation success when evicting from cache
Browse files Browse the repository at this point in the history
An allocation will evict cache to make space for data. If the the
evicted data is freed and then an allocation is attempted, it can be
that another thread has snatched the freed data. With high transient
memory allocation rate and many threads, it can happen that a large
allocation runs out of retries even if it could be satisfied.

Therefore, when evicting in order to make space for an allocation, we
keep hold of the non-contiguous Allocations we remove from cache. Many
allocations can in this way be concatenated into an allocation that
covers the needed number of pages. This can then atomically be
converted into a new non-contiguous or contiguous allocation. The
allocate*WithoutRetry methods accept a non-contiguous allocation to be
freed to provide pages for the new allocation. Memory mapping magic
can convert non-contiguous freed pages to contiguous ones.

Note that the allocate*WithoutRetry methods mishandle the atomicity of
the exchange: The allocated count is decremented by the size of the
freed collateral and then incremented by the size of the new
allocation. This should be a single atomic increment of allocatedSize
- collateralSize. This always succeeds if the collateral is >= the
allocated amount. Even so, the window of vulnerability to another
thread snatching the freed capacity before it is reacquired is
probably negligible, only a few instructions.

The AsyncDataCache::makeSpace loop first tries to allocate. If it fails, it evicts and keeps up to the required amount in a a grab bag allocation. It loops until this allocation is large enough to convert into the desired allocation. If nothing is evicted and the allocation repeatedly fails, it gives up.
  • Loading branch information
Orri Erling committed Oct 3, 2023
1 parent df1eb1b commit 3ffd470
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 41 deletions.
73 changes: 59 additions & 14 deletions velox/common/caching/AsyncDataCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,11 @@ void CacheShard::removeEntryLocked(AsyncDataCacheEntry* entry) {
}
}

void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) {
void CacheShard::evict(
uint64_t bytesToFree,
bool evictAllUnpinned,
int32_t acquirePages,
memory::Allocation* allocation) {
int64_t tinyFreed = 0;
int64_t largeFreed = 0;
int32_t evictSaveableSkipped = 0;
Expand Down Expand Up @@ -380,7 +384,14 @@ void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) {
continue;
}
largeFreed += candidate->data_.byteSize();
toFree.push_back(std::move(candidate->data()));
if (acquirePages) {
auto candidatePages = candidate->data().numPages();
acquirePages =
candidatePages > acquirePages ? 0 : acquirePages - candidatePages;
allocation->appendMove(candidate->data());
} else {
toFree.push_back(std::move(candidate->data()));
}
removeEntryLocked(candidate);
emptySlots_.push_back(entryIndex);
tinyFreed += candidate->tinyData_.size();
Expand Down Expand Up @@ -574,7 +585,8 @@ bool AsyncDataCache::exists(RawFileCacheKey key) const {

bool AsyncDataCache::makeSpace(
MachinePageCount numPages,
std::function<bool()> allocate) {
memory::Allocation& collateral,
std::function<bool(memory::Allocation& allocation)> allocate) {
// Try to allocate and if failed, evict the desired amount and
// retry. This is without synchronization, so that other threads may
// get what one thread evicted but this will usually work in a
Expand All @@ -588,15 +600,24 @@ bool AsyncDataCache::makeSpace(
// called from inside a global mutex.

constexpr int32_t kMaxAttempts = kNumShards * 4;
// Evict at least 1MB even for small allocations to avoid constantly hitting
// the mutex protected evict loop.
constexpr int32_t kMinEvictPages = 256;
// If requesting less than kSmallSizePages try up to 4x more if
// first try failed.
constexpr int32_t kSmallSizePages = 2048; // 8MB
int32_t sizeMultiplier = 1;
float sizeMultiplier = 1.2;
// True if this thread is counted in 'numThreadsInAllocate_'.
bool isCounted = false;
// If more than half the allowed retries are needed, this is the rank in
// arrival order of this.
int32_t rank = 0;
// Allocation into which evicted pages are moved.
memory::Allocation evicted;
// 'evicted' is not managed by a pool. Make sure it is freed on throw.
// Destruct without pool and non-empty kills the process.
auto guard =
folly::makeGuard([&]() { allocator_->freeNonContiguous(evicted); });
VELOX_CHECK(
numThreadsInAllocate_ >= 0 && numThreadsInAllocate_ < 10000,
"Leak in numThreadsInAllocate_: {}",
Expand All @@ -606,18 +627,20 @@ bool AsyncDataCache::makeSpace(
isCounted = true;
}
for (auto nthAttempt = 0; nthAttempt < kMaxAttempts; ++nthAttempt) {
try {
if (allocate()) {
if (canTryAllocate(numPages - collateral.numPages(), evicted)) {
try {
if (allocate(evicted)) {
if (isCounted) {
--numThreadsInAllocate_;
}
return true;
}
} catch (const std::exception& e) {
if (isCounted) {
--numThreadsInAllocate_;
}
return true;
throw;
}
} catch (const std::exception& e) {
if (isCounted) {
--numThreadsInAllocate_;
}
throw;
}
if (nthAttempt > 2 && ssdCache_ && ssdCache_->writeInProgress()) {
VELOX_SSD_CACHE_LOG(INFO)
Expand All @@ -631,15 +654,26 @@ bool AsyncDataCache::makeSpace(
}
}
if (rank) {
// Free the grabbed allocation before sleep so the contender can make
// progress. This is only on heavy contention, after 8 missed tries.
allocator_->freeNonContiguous(evicted);
backoff(nthAttempt + rank);
// If some of the competing threads are done, maybe give this thread a
// better rank.
rank = std::min<int32_t>(rank, numThreadsInAllocate_);
}
++shardCounter_;
int32_t toAcquire =
evicted.numPages() < numPages ? numPages - evicted.numPages() : 0;
// Evict from next shard. If we have gone through all shards once
// and still have not made the allocation, we go to desperate mode
// with 'evictAllUnpinned' set to true.
shards_[shardCounter_ & (kShardMask)]->evict(
numPages * sizeMultiplier * memory::AllocationTraits::kPageSize,
nthAttempt >= kNumShards);
std::max<int32_t>(kMinEvictPages, numPages) * sizeMultiplier *
memory::AllocationTraits::kPageSize,
nthAttempt >= kNumShards,
toAcquire,
&evicted);
if (numPages < kSmallSizePages && sizeMultiplier < 4) {
sizeMultiplier *= 2;
}
Expand All @@ -650,6 +684,17 @@ bool AsyncDataCache::makeSpace(
return false;
}

bool AsyncDataCache::canTryAllocate(
int32_t numPages,
const memory::Allocation& evicted) const {
if (numPages <= evicted.numPages()) {
return true;
}
return numPages - evicted.numPages() <=
(memory::AllocationTraits::numPages(allocator_->capacity())) -
allocator_->numAllocated();
}

void AsyncDataCache::backoff(int32_t counter) {
size_t seed = folly::hasher<uint16_t>()(++backoffCounter_);
const auto usec = (seed & 0xfff) * (counter & 0x1f);
Expand Down
32 changes: 22 additions & 10 deletions velox/common/caching/AsyncDataCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,14 @@ class CacheShard {
// not pinned. This favors first removing older and less frequently
// used entries. If 'evictAllUnpinned' is true, anything that is
// not pinned is evicted at first sight. This is for out of memory
// emergencies.
void evict(uint64_t bytesToFree, bool evictAllUnpinned);
// emergencies. If 'acquireBytes' is set, up to this amount is added to
// 'allocation'. A smaller amount can be added if not enough evictable data is
// found.
void evict(
uint64_t bytesToFree,
bool evictAllUnpinned,
int32_t acquirePages = 0,
memory::Allocation* allocation = nullptr);

// Removes 'entry' from 'this'. Removes a possible promise from the entry
// inside the shard mutex and returns it so that it can be realized outside of
Expand Down Expand Up @@ -664,7 +670,8 @@ class AsyncDataCache : public memory::Cache {
/// for memory arbitration to work.
bool makeSpace(
memory::MachinePageCount numPages,
std::function<bool()> allocate) override;
memory::Allocation& collateral,
std::function<bool(memory::Allocation& allocation)> allocate) override;

memory::MemoryAllocator* allocator() const override {
return allocator_;
Expand Down Expand Up @@ -761,6 +768,11 @@ class AsyncDataCache : public memory::Cache {
static constexpr int32_t kNumShards = 4; // Must be power of 2.
static constexpr int32_t kShardMask = kNumShards - 1;

// True if 'evicted' has more pages than 'numPages' or allocator has space for
// numPages - evicted pages of more allocation.
bool canTryAllocate(int32_t numPages, const memory::Allocation& evicted)
const;

static AsyncDataCache** getInstancePtr();

// Waits a pseudorandom delay times 'counter'.
Expand Down Expand Up @@ -825,15 +837,15 @@ T percentile(Next next, int32_t numSamples, int percent) {
//'offsetFunc' returns the starting offset of the data in the
// file given a pin and the pin's index in 'pins'. The pins are expected to be
// sorted by this offset. 'readFunc' reads from the appropriate media. It gets
// the 'pins' and the index of the first pin included in the read and the index
// of the first pin not included. It gets the starting offset of the read and a
// vector of memory ranges to fill by ReadFile::preadv or a similar
// the 'pins' and the index of the first pin included in the read and the
// index of the first pin not included. It gets the starting offset of the
// read and a vector of memory ranges to fill by ReadFile::preadv or a similar
// function.
// The caller is responsible for calling setValid on the pins after a successful
// read.
// The caller is responsible for calling setValid on the pins after a
// successful read.
//
// Returns the number of distinct IOs, the number of bytes loaded into pins and
// the number of extra bytes read.
// Returns the number of distinct IOs, the number of bytes loaded into pins
// and the number of extra bytes read.
CoalesceIoStats readPins(
const std::vector<CachePin>& pins,
int32_t maxGap,
Expand Down
85 changes: 82 additions & 3 deletions velox/common/caching/tests/AsyncDataCacheTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

Expand Down Expand Up @@ -144,11 +145,16 @@ class AsyncDataCacheTest : public testing::Test {
// loading. Stops after loading 'loadBytes' worth of entries. If
// 'errorEveryNBatches' is non-0, every nth load batch will have a
// bad read and wil be dropped. The entries of the failed batch read
// will still be accessed one by one.
// will still be accessed one by one. If 'largeEveryNBatches' is
// non-0, allocates and freees a single allocation of 'largeBytes'
// every so many batches. This creates extra memory pressure, as
// happens when allocating large hash tables in queries.
void loadLoop(
int64_t startOffset,
int64_t loadBytes,
int32_t errorEveryNBatches = 0);
int32_t errorEveryNBatches = 0,
int32_t largeEveryNBatches = 0,
int32_t largeBytes = 0);

// Calls func on 'numThreads' in parallel.
template <typename Func>
Expand Down Expand Up @@ -223,6 +229,7 @@ class AsyncDataCacheTest : public testing::Test {
std::shared_ptr<AsyncDataCache> cache_;
std::vector<StringIdLease> filenames_;
std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
int32_t numLargeRetries_{0};
};

class TestingCoalescedLoad : public CoalescedLoad {
Expand Down Expand Up @@ -445,11 +452,14 @@ void AsyncDataCacheTest::loadBatch(
void AsyncDataCacheTest::loadLoop(
int64_t startOffset,
int64_t loadBytes,
int32_t errorEveryNBatches) {
int32_t errorEveryNBatches,
int32_t largeEveryNBatches,
int32_t largeAllocSize) {
const int64_t maxOffset =
std::max<int64_t>(100'000, (startOffset + loadBytes) / filenames_.size());
int64_t skippedBytes = 0;
int32_t errorCounter = 0;
int32_t largeCounter = 0;
std::vector<Request> batch;
for (auto i = 0; i < filenames_.size(); ++i) {
const auto fileNum = filenames_[i].id();
Expand All @@ -464,6 +474,27 @@ void AsyncDataCacheTest::loadLoop(
batch.emplace_back(offset, size);
if (batch.size() >= 8) {
for (;;) {
if (largeEveryNBatches > 0 &&
largeCounter++ % largeEveryNBatches == 0) {
// Many threads will allocate a single large chunk at the
// same time. Some are expected to fail. All will
// eventually succeed because whoever gets the allocation
// frees it soon and without deadlocking with others..
memory::ContiguousAllocation large;
// Explicitly free 'large' on exit. Do not use MemoryPool for that
// because we test the allocator's limits, not the pool/memory
// manager limits.
auto guard =
folly::makeGuard([&]() { allocator_->freeContiguous(large); });
while (!allocator_->allocateContiguous(
memory::AllocationTraits::numPages(largeAllocSize),
nullptr,
large)) {
++numLargeRetries_;
}
std::this_thread::sleep_for(
std::chrono::microseconds(2000)); // NOLINT
}
const bool injectError = (errorEveryNBatches > 0) &&
(++errorCounter % errorEveryNBatches == 0);
loadBatch(fileNum, batch, injectError);
Expand Down Expand Up @@ -565,6 +596,54 @@ TEST_F(AsyncDataCacheTest, replace) {
cache_->incrementCachedPages(0));
}

TEST_F(AsyncDataCacheTest, evictAccounting) {
constexpr int64_t kMaxBytes = 64 << 20;
FLAGS_velox_exception_user_stacktrace_enabled = false;
initializeCache(kMaxBytes);
auto memoryManager =
std::make_unique<memory::MemoryManager>(memory::MemoryManagerOptions{
.capacity = (int64_t)allocator_->capacity(),
.trackDefaultUsage = true,
.allocator = allocator_.get()});
auto pool = memoryManager->addLeafPool("test");

// We make allocations that we exchange for larger ones later. This will evict
// cache. We check that the evictions are not counted on the pool even if they
// occur as a result of action on the pool.
memory::Allocation allocation;
memory::ContiguousAllocation large;
pool->allocateNonContiguous(1200, allocation);
pool->allocateContiguous(1200, large);
EXPECT_EQ(memory::AllocationTraits::kPageSize * 2400, pool->currentBytes());
loadLoop(0, kMaxBytes * 1.1);
pool->allocateNonContiguous(2400, allocation);
pool->allocateContiguous(2400, large);
EXPECT_EQ(memory::AllocationTraits::kPageSize * 4800, pool->currentBytes());
auto stats = cache_->refreshStats();
EXPECT_LT(0, stats.numEvict);
}

TEST_F(AsyncDataCacheTest, largeEvict) {
constexpr int64_t kMaxBytes = 256 << 20;
constexpr int32_t kNumThreads = 24;
FLAGS_velox_exception_user_stacktrace_enabled = false;
initializeCache(kMaxBytes);
// Load 10x the max size, inject an allocation of 1/8 the capacity every 4
// batches.
runThreads(kNumThreads, [&](int32_t /*i*/) {
loadLoop(0, kMaxBytes * 1.2, 0, 1, kMaxBytes / 4);
});
if (executor_) {
executor_->join();
}
auto stats = cache_->refreshStats();
EXPECT_LT(0, stats.numEvict);
EXPECT_GE(
kMaxBytes / memory::AllocationTraits::kPageSize,
cache_->incrementCachedPages(0));
LOG(INFO) << "Reties after failed evict: " << numLargeRetries_;
}

TEST_F(AsyncDataCacheTest, outOfCapacity) {
const int64_t kMaxBytes = 64
<< 20; // 64MB as MmapAllocator's min size is 64MB
Expand Down
8 changes: 8 additions & 0 deletions velox/common/memory/Allocation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ void Allocation::append(uint8_t* address, int32_t numPages) {
"Appending a duplicate address into a PageRun");
runs_.emplace_back(address, numPages);
}
void Allocation::appendMove(Allocation& other) {
for (auto& run : other.runs_) {
numPages_ += run.numPages();
runs_.push_back(std::move(run));
}
other.runs_.clear();
other.numPages_ = 0;
}

void Allocation::findRun(uint64_t offset, int32_t* index, int32_t* offsetInRun)
const {
Expand Down
3 changes: 3 additions & 0 deletions velox/common/memory/Allocation.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ class Allocation {
return numPages_ == 0;
}

/// Moves the runs in 'from' to 'this'. 'from' is empty on return.
void appendMove(Allocation& from);

std::string toString() const;

private:
Expand Down
Loading

0 comments on commit 3ffd470

Please sign in to comment.