diff --git a/velox/common/caching/CMakeLists.txt b/velox/common/caching/CMakeLists.txt index 4917957126ae..9a1d91f1f0f4 100644 --- a/velox/common/caching/CMakeLists.txt +++ b/velox/common/caching/CMakeLists.txt @@ -29,6 +29,7 @@ target_link_libraries( velox_file velox_memory velox_process + velox_time Folly::folly fmt::fmt gflags::gflags diff --git a/velox/common/caching/CachedFactory.h b/velox/common/caching/CachedFactory.h index 70dddc4895ff..e0587ef04aef 100644 --- a/velox/common/caching/CachedFactory.h +++ b/velox/common/caching/CachedFactory.h @@ -31,6 +31,7 @@ #include #include +#include #include #include "folly/container/F14Set.h" @@ -39,93 +40,219 @@ namespace facebook::velox { -// CachedFactory provides a thread-safe way of backing a keyed generator -// (e.g. the key is filename, and the value is the file data) by a cache. -// -// Generator should take a single Key argument and return a Value; -// The Value should be either a value type or should manage its own lifecycle -// (shared_ptr). If it is not thread-safe it must do its own internal locking. -template +/// A smart pointer that represents data that may be in a cache and is thus not +/// owned, or is owned like a unique_ptr. We could also implement this by a +/// unique_ptr with a custom deleter. +template < + typename Key, + typename Value, + typename Comparator = std::equal_to, + typename Hash = std::hash> +class CachedPtr { + public: + /// Nullptr case. + CachedPtr(); + + /// Data is not in cache, ownership taken by *this. + explicit CachedPtr(Value* value); + + /// Data is in the provided cache referenced by the given key. The cache is + /// not guarded by a mutex. + CachedPtr( + bool cached, + Value* value, + SimpleLRUCache* cache, + std::unique_ptr key); + + /// Same as above, but the cache is guarded by a mutex. + CachedPtr( + bool cached, + Value* value, + SimpleLRUCache* cache, + std::unique_ptr key, + std::mutex* cacheMu); + + /// The destructor handles the in-cache and non-in-cache cases appropriately. + ~CachedPtr(); + + /// Move allowed, copy disallowed. Moving a new value into a non-null + /// CachedPtr will clear the previous value. + CachedPtr(CachedPtr&&); + CachedPtr& operator=(CachedPtr&&); + CachedPtr(const CachedPtr&) = delete; + CachedPtr& operator=(const CachedPtr&) = delete; + + /// Whether this value is load from cache. If we had to wait for a generation + /// (whether the actual generation was done in this thread or another) then + /// this is false. Has no effect on this behavior, but may be useful for + /// monitoring cache hit rates/etc. + bool fromCache() const { + return fromCache_; + } + + /// Indicates if this value is cached or not. + bool cached() const { + return cache_ != nullptr; + } + + Value* operator->() const { + return value_; + } + Value& operator*() const { + return *value_; + } + Value* get() const { + return value_; + } + + void testingClear() { + clear(); + key_.reset(); + value_ = nullptr; + cache_ = nullptr; + cacheMu_ = nullptr; + } + + private: + // Delete or release owned value. + void clear(); + + bool fromCache_; + std::unique_ptr key_; + Value* value_; + std::mutex* cacheMu_; + // If 'value_' is in cache, 'cache_' and 'key_' will be non-null, and + // 'cacheMu_' may be non-null. If cacheMu_ is non-null, we use it to protect + // our operations to 'cache_'. + SimpleLRUCache* cache_; +}; + +template +struct DefaultSizer { + int64_t operator()(const Value& value) const { + return 1; + } +}; + +/// CachedFactory provides a thread-safe way of backing a keyed generator +/// (e.g. the key is filename, and the value is the file data) by a cache. +/// +/// Generator should take a single Key argument and return a unique_ptr; +/// If it is not thread-safe it must do its own internal locking. +/// Sizer takes a Value and returns how much cache space it will occupy. The +/// DefaultSizer says each value occupies 1 space. +template < + typename Key, + typename Value, + typename Generator, + typename Sizer = DefaultSizer, + typename Comparator = std::equal_to, + typename Hash = std::hash> class CachedFactory { public: - // It is generally expected that most inserts into the cache will succeed, - // i.e. the cache is large compared to the size of the elements and the number - // of elements that are pinned. Everything should still work if this is not - // true, but performance will suffer. - // If 'cache' is nullptr, this means the cache is disabled. 'generator' is - // invoked directly in 'generate' function. + /// It is generally expected that most inserts into the cache will succeed, + /// i.e. the cache is large compared to the size of the elements and the + /// number of elements that are pinned. Everything should still work if this + /// is not true, but performance will suffer. If 'cache' is nullptr, this + /// means the cache is disabled. 'generator' is invoked directly in 'generate' + /// function. CachedFactory( - std::unique_ptr> cache, + std::unique_ptr> cache, std::unique_ptr generator) - : cache_(std::move(cache)), generator_(std::move(generator)) {} - - // Returns the generator's output on the given key. If the output is - // in the cache, returns immediately. Otherwise, blocks until the output - // is ready. - // The function returns a pair. The boolean in the pair indicates whether a - // cache hit or miss. The Value is the generator output for the key if cache - // miss, or Value in the cache if cache hit. - std::pair generate(const Key& key); - - // Advanced function taking in a group of keys. Separates those keys into - // one's present in the cache (returning CachedPtrs for them) and those not - // in the cache. Does NOT call the Generator for any key. + : generator_(std::move(generator)), cache_(std::move(cache)) {} + + CachedFactory(std::unique_ptr generator) + : CachedFactory(nullptr, std::move(generator)) {} + + /// Returns the generator's output on the given key. If the output is in the + /// cache, returns immediately. Otherwise, blocks until the output is ready. + /// For a given key we will only ever be running the Generator function once. + /// E.g., if N threads ask for the same key at once, the generator will be + /// fired once and all N will receive a pointer from the cache. + /// + /// Actually the last sentence is not quite true in the edge case where + /// inserts into the cache fail; in that case we will re-run the generator + /// repeatedly, handing off the results to one thread at a time until the + /// all pending requests are satisfied or a cache insert succeeds. This + /// will probably mess with your memory model, so really try to avoid it. + CachedPtr generate(const Key& key); + + /// Advanced function taking in a group of keys. Separates those keys into + /// one's present in the cache (returning CachedPtrs for them) and those not + /// in the cache. Does NOT call the Generator for any key. void retrieveCached( const std::vector& keys, - std::vector>* cached, - std::vector* missing); + std::vector>>& + cached, + std::vector& missing); - // Total size of elements cached (NOT the maximum size/limit). + /// Total size of elements cached (NOT the maximum size/limit). int64_t currentSize() const { - if (cache_) { - return cache_->currentSize(); - } else { + if (cache_ == nullptr) { return 0; } + return cache_->currentSize(); } - // The maximum size of the underlying cache. + /// The maximum size of the underlying cache. int64_t maxSize() const { - if (cache_) { - return cache_->maxSize(); - } else { + if (cache_ == nullptr) { return 0; } + return cache_->maxSize(); } SimpleLRUCacheStats cacheStats() { - if (cache_) { - std::lock_guard l(cacheMu_); - return cache_->getStats(); - } else { - return {0, 0, 0, 0}; + if (cache_ == nullptr) { + return {}; } + std::lock_guard l(cacheMu_); + return cache_->stats(); } // Clear the cache and return the current cache status SimpleLRUCacheStats clearCache() { - if (cache_) { - std::lock_guard l(cacheMu_); - cache_->clear(); - return cache_->getStats(); - } else { - return {0, 0, 0, 0}; + if (cache_ == nullptr) { + return {}; } + std::lock_guard l(cacheMu_); + cache_->free(cache_->maxSize()); + return cache_->stats(); } - // Move allowed, copy disallowed. + /// Move allowed, copy disallowed. CachedFactory(CachedFactory&&) = default; CachedFactory& operator=(CachedFactory&&) = default; CachedFactory(const CachedFactory&) = delete; CachedFactory& operator=(const CachedFactory&) = delete; private: - std::unique_ptr> cache_; + void removePending(const Key& key) { + std::lock_guard pendingLock(pendingMu_); + pending_.erase(key); + } + + bool addCache(const Key& key, Value* value, int64_t size) { + std::lock_guard cacheLock(cacheMu_); + return cache_->addPinned(key, value, size); + } + + Value* getCache(const Key& key) { + std::lock_guard cacheLock(cacheMu_); + return getCacheLocked(key); + } + + Value* getCacheLocked(const Key& key) { + return cache_->get(key); + } + std::unique_ptr generator_; - folly::F14FastSet pending_; std::mutex cacheMu_; + std::unique_ptr> cache_; + std::mutex pendingMu_; + folly::F14FastSet pending_; std::condition_variable pendingCv_; }; @@ -133,84 +260,198 @@ class CachedFactory { // End of public API. Implementation follows. // -template -std::pair CachedFactory::generate( +template +CachedPtr::CachedPtr() + : fromCache_(false), + key_(nullptr), + value_(nullptr), + cacheMu_(nullptr), + cache_(nullptr) {} + +template +CachedPtr::CachedPtr(Value* value) + : fromCache_(false), + key_(nullptr), + value_(value), + cacheMu_(nullptr), + cache_(nullptr) {} + +template +CachedPtr::CachedPtr( + bool cached, + Value* value, + SimpleLRUCache* cache, + std::unique_ptr key) + : fromCache_(cached), + key_(std::move(key)), + value_(value), + cacheMu_(nullptr), + cache_(cache) {} + +template +CachedPtr::CachedPtr( + bool cached, + Value* value, + SimpleLRUCache* cache, + std::unique_ptr key, + std::mutex* cacheMu) + : fromCache_(cached), + key_(std::move(key)), + value_(value), + cacheMu_(cacheMu), + cache_(cache) {} + +template +CachedPtr::~CachedPtr() { + clear(); +} + +template +CachedPtr::CachedPtr(CachedPtr&& other) { + fromCache_ = other.fromCache_; + value_ = other.value_; + key_ = std::move(other.key_); + cache_ = other.cache_; + cacheMu_ = other.cacheMu_; + other.value_ = nullptr; +} + +template +CachedPtr& +CachedPtr::operator=(CachedPtr&& other) { + clear(); + fromCache_ = other.fromCache_; + value_ = other.value_; + key_ = std::move(other.key_); + cache_ = other.cache_; + cacheMu_ = other.cacheMu_; + other.value_ = nullptr; + return *this; +} + +template +void CachedPtr::clear() { + if (value_ == nullptr) { + return; + } + if (cache_ == nullptr) { + delete value_; + return; + } + if (cacheMu_ != nullptr) { + std::lock_guard l(*cacheMu_); + cache_->release(*key_); + } else { + cache_->release(*key_); + } +} + +template < + typename Key, + typename Value, + typename Generator, + typename Sizer, + typename Comparator, + typename Hash> +CachedPtr +CachedFactory::generate( const Key& key) { process::TraceContext trace("CachedFactory::generate"); - if (!cache_) { - return std::make_pair(false, (*generator_)(key)); + if (cache_ == nullptr) { + return CachedPtr{ + /*fromCache=*/false, + (*generator_)(key).release(), + nullptr, + std::make_unique(key)}; } - std::unique_lock pending_lock(pendingMu_); + std::unique_lock pendingLock(pendingMu_); { - std::lock_guard cache_lock(cacheMu_); - auto value = cache_->get(key); - if (value) { - return std::make_pair(true, value.value()); + if (Value* value = getCache(key)) { + return CachedPtr( + /*fromCache=*/true, + value, + cache_.get(), + std::make_unique(key), + &cacheMu_); } } - if (pending_.contains(key)) { - pendingCv_.wait(pending_lock, [&]() { return !pending_.contains(key); }); + pendingCv_.wait(pendingLock, [&]() { return !pending_.contains(key); }); // Will normally hit the cache now. - { - std::lock_guard cache_lock(cacheMu_); - auto value = cache_->get(key); - if (value) { - return std::make_pair(true, value.value()); - } - } - pending_lock.unlock(); - return generate(key); // Regenerate in the edge case. - } else { - pending_.insert(key); - pending_lock.unlock(); - Value generatedValue; - // TODO: consider using folly/ScopeGuard here. - try { - generatedValue = (*generator_)(key); - } catch (const std::exception&) { - { - std::lock_guard pending_lock_2(pendingMu_); - pending_.erase(key); - } - pendingCv_.notify_all(); - throw; - } - cacheMu_.lock(); - cache_->add(key, generatedValue); - cacheMu_.unlock(); - - // TODO: this code is exception unsafe and can leave pending_ in an - // inconsistent state. Eventually this code should move to - // folly:synchronized and rewritten with better primitives. - { - std::lock_guard pending_lock_2(pendingMu_); - pending_.erase(key); + if (Value* value = getCache(key)) { + return CachedPtr( + /*fromCache=*/false, + value, + cache_.get(), + std::make_unique(key), + &cacheMu_); } + pendingLock.unlock(); + // Regenerates in the edge case. + return generate(key); + } + + pending_.insert(key); + pendingLock.unlock(); + + SCOPE_EXIT { + removePending(key); pendingCv_.notify_all(); - return std::make_pair(false, generatedValue); + }; + + std::unique_ptr generatedValue = (*generator_)(key); + const uint64_t valueSize = Sizer()(*generatedValue); + Value* rawValue = generatedValue.release(); + const bool inserted = addCache(key, rawValue, valueSize); + + CachedPtr result; + if (inserted) { + result = CachedPtr( + /*fromCache=*/false, + rawValue, + cache_.get(), + std::make_unique(key), + &cacheMu_); + } else { + FB_LOG_EVERY_MS(WARNING, 60'000) << "Unable to insert into cache!"; + result = CachedPtr(rawValue); } + return result; } -template -void CachedFactory::retrieveCached( - const std::vector& keys, - std::vector>* cached, - std::vector* missing) { - if (cache_) { - std::lock_guard cache_lock(cacheMu_); - for (const Key& key : keys) { - auto value = cache_->get(key); - if (value) { - cached->emplace_back(key, value.value()); - } else { - missing->push_back(key); - } - } - } else { - for (const Key& key : keys) { - missing->push_back(key); +template < + typename Key, + typename Value, + typename Generator, + typename Sizer, + typename Comparator, + typename Hash> +void CachedFactory:: + retrieveCached( + const std::vector& keys, + std::vector>>& + cached, + std::vector& missing) { + if (cache_ == nullptr) { + missing.insert(missing.end(), keys.begin(), keys.end()); + return; + } + + std::lock_guard l(cacheMu_); + for (const Key& key : keys) { + Value* value = getCacheLocked(key); + if (value != nullptr) { + cached.emplace_back( + key, + CachedPtr( + /*fromCache=*/true, + value, + cache_.get(), + std::make_unique(key), + &cacheMu_)); + } else { + missing.push_back(key); } } } diff --git a/velox/common/caching/SimpleLRUCache.h b/velox/common/caching/SimpleLRUCache.h index cd420995f641..5812a30c99a9 100644 --- a/velox/common/caching/SimpleLRUCache.h +++ b/velox/common/caching/SimpleLRUCache.h @@ -18,147 +18,379 @@ #include #include #include -#include #include -#include "folly/container/EvictingCacheMap.h" +#include "folly/IntrusiveList.h" +#include "folly/container/F14Map.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/time/Timer.h" namespace facebook::velox { struct SimpleLRUCacheStats { SimpleLRUCacheStats( size_t _maxSize, + size_t _expireDurationMs, size_t _curSize, + size_t _pinnedSize, + size_t _numElements, size_t _numHits, size_t _numLookups) : maxSize{_maxSize}, + expireDurationMs(_expireDurationMs), curSize{_curSize}, + pinnedSize{_pinnedSize}, + numElements{_numElements}, numHits{_numHits}, - numLookups{_numLookups}, - numElements{curSize}, - pinnedSize{curSize} {} + numLookups{_numLookups} {} - // Capacity of the cache. - const size_t maxSize; + SimpleLRUCacheStats() = default; - // Current cache size used. - const size_t curSize; + /// Capacity of the cache. + size_t maxSize{0}; - // Total number of cache hits since server start. - const size_t numHits; + size_t expireDurationMs{0}; - // Total number of cache lookups since server start. - const size_t numLookups; + /// Current cache size used. + size_t curSize{0}; - // TODO: These 2 are unused, but open source Presto depends on them - // Remove the usage in open source presto and get rid of them. - const size_t numElements; - const size_t pinnedSize; + /// Current cache size used by pinned entries. + size_t pinnedSize{0}; + + /// Total number of elements in the cache. + size_t numElements{0}; + + /// Total number of cache hits since server start. + size_t numHits{0}; + + /// Total number of cache lookups since server start. + size_t numLookups{0}; std::string toString() const { return fmt::format( "{{\n" " maxSize: {}\n" + " expireDurationMs: {}\n" " curSize: {}\n" + " pinnedSize: {}\n" + " numElements: {}\n" " numHits: {}\n" " numLookups: {}\n" "}}\n", maxSize, + expireDurationMs, curSize, + pinnedSize, + numElements, numHits, numLookups); } - bool operator==(const SimpleLRUCacheStats& rhs) const { - return std::tie(curSize, maxSize, numHits, numLookups) == - std::tie(rhs.curSize, rhs.maxSize, rhs.numHits, rhs.numLookups); + bool operator==(const SimpleLRUCacheStats& other) const { + return std::tie( + curSize, + expireDurationMs, + maxSize, + pinnedSize, + numElements, + numHits, + numLookups) == + std::tie( + other.curSize, + other.expireDurationMs, + other.maxSize, + other.pinnedSize, + other.numElements, + other.numHits, + other.numLookups); } }; -/// A simple wrapper on top of the folly::EvictingCacheMap that tracks -/// hit/miss counters. Key/Value evicted are immediately destructed. -/// So the Key/Value should be a value type or self managing lifecycle -/// shared_ptr. +/// A simple LRU cache that allows each element to occupy an arbitrary amount of +/// space in the cache. Useful when the size of the cached elements can vary a +/// lot; if they are all roughly the same size something that only tracks the +/// number of elements in the cache like common/datastruct/LRUCacheMap.h may be +/// better. /// /// NOTE: -/// 1. NOT Thread-Safe: All the public calls modify internal structures -/// and hence require external write locks if used from multiple threads. -template +/// 1. NOT Thread-Safe: All the public calls modify internal structures and +/// hence require external write locks if used from multiple threads. +/// 2. 'Key' is required to be copyable and movable. +template < + typename Key, + typename Value, + typename Comparator = std::equal_to, + typename Hash = std::hash> class SimpleLRUCache { public: - /// Constructs a cache of the specified size. The maxSize represents the - /// number of entries in the cache. clearSize represents the number of entries - /// to evict in a given time, when the cache is full. - explicit SimpleLRUCache(size_t maxSize, size_t clearSize = 1); + /// Constructs a cache of the specified size. This size can represent whatever + /// you want -- slots, or bytes, or etc; you provide the size of each element + /// whenever you add a new value to the cache. If 'expireDurationMs' is not + /// zero, then a cache value will be evicted out of cache after + /// 'expireDurationMs' time passed since its insertion into the cache no + /// matter if it been accessed or not. + explicit SimpleLRUCache(size_t maxSize, size_t expireDurationMs = 0); + + /// Frees all owned data. Check-fails if any element remains pinned. + ~SimpleLRUCache(); + + /// Adds a key-value pair that will occupy the provided size, evicting + /// older elements repeatedly until enough room is avialable in the cache. + /// Returns whether insertion succeeded. If it did, the cache takes + /// ownership of |value|. Insertion will fail in two cases: + /// 1) There isn't enough room in the cache even after all unpinned + /// elements are freed. + /// 2) The key you are adding is already present in the cache. In + /// this case the element currently existing in the cache remains + /// totally unchanged. + /// + /// If you use size to represent in-memory size, keep in mind that the + /// total space used per entry is roughly 2 * key_size + value_size + 30 bytes + /// (nonexact because we use a hash map internally, so the ratio of reserved + /// slot to used slots will vary). + bool add(Key key, Value* value, size_t size); - /// Add an item to the cache. Returns true if the item is successfully - /// added, false otherwise. - bool add(const Key& key, const Value& value); + /// Same as add(), but the value starts pinned. Saves a map lookup if you + /// would otherwise do add() then get(). Keep in mind that if insertion + /// fails the key's pin count has NOT been incremented. + bool addPinned(Key key, Value* value, size_t size); - /// Gets value associated with key. - /// returns std::nullopt when the key is missing - /// returns the cached value, when the key is present. - std::optional get(const Key& key); + /// Gets an unowned pointer to the value associated with key. + /// Returns nullptr if the key is not present in the cache. + /// Once you are done using the returned non-null *value, you must call + /// release with the same key you passed to get. + /// + /// The returned pointer is guaranteed to remain valid until release + /// is called. + /// + /// Note that we return a non-const pointer, and multiple callers + /// can lease the same object, so if you're mutating it you need + /// to manage your own locking. + Value* get(const Key& key); - void clear(); + /// Unpins a key. You MUST call release on every key you have + /// get'd once are you done using the value or bad things will + /// happen (namely, memory leaks). + void release(const Key& key); /// Total size of elements in the cache (NOT the maximum size/limit). size_t currentSize() const { - return lru_.size(); + return curSize_; } /// The maximum size of the cache. size_t maxSize() const { - return lru_.getMaxSize(); + return maxSize_; } - SimpleLRUCacheStats getStats() const { + SimpleLRUCacheStats stats() const { return { - lru_.getMaxSize(), - lru_.size(), + maxSize_, + expireDurationMs_, + curSize_, + pinnedSize_, + lruList_.size(), numHits_, numLookups_, }; } + /// Removes unpinned elements until at least size space is freed. Returns + /// the size actually freed, which may be less than requested if the + /// remaining are all pinned. + size_t free(size_t size); + private: + struct Element { + Key key; + Value* value; + size_t size; + uint32_t numPins; + size_t expireTimeMs; + folly::IntrusiveListHook lruEntry; + folly::IntrusiveListHook expireEntry; + }; + using LruList = folly::IntrusiveList; + using ExpireList = folly::IntrusiveList; + + bool addInternal(Key key, Value* value, size_t size, bool pinned); + + // Removes the expired and unpinned cache entries from the cache. The function + // is invoked upon cache lookup, cache insertion and cache entry release. + void removeExpiredEntries(); + + // Removes entry 'e' from cache by unlinking it from 'lruList_' and + // 'expireList_', and destroy the object at the end. + size_t freeEntry(Element* e); + + const size_t maxSize_; + const size_t expireDurationMs_; + size_t curSize_{0}; + size_t pinnedSize_{0}; size_t numHits_{0}; size_t numLookups_{0}; - folly::EvictingCacheMap lru_; + // Elements get newer as we evict from lruList_.begin() to lruList_.end(). + LruList lruList_; + ExpireList expireList_; + folly::F14FastMap keys_; }; -// -// End of public API. Imlementation follows. -// - -template -inline SimpleLRUCache::SimpleLRUCache( +/// +/// End of public API. Implementation follows. +/// +template +inline SimpleLRUCache::SimpleLRUCache( size_t maxSize, - size_t clearSize) - : lru_(maxSize, clearSize) {} - -template -inline bool SimpleLRUCache::add( - const Key& key, - const Value& value) { - return lru_.insert(key, value).second; + size_t expireDurationMs) + : maxSize_(maxSize), expireDurationMs_(expireDurationMs) {} + +template +inline SimpleLRUCache::~SimpleLRUCache() { + VELOX_CHECK_EQ(pinnedSize_, 0); + // We could be more optimal than calling free here, but in + // general this destructor will never get called during normal + // usage so we don't bother. + free(maxSize_); + VELOX_CHECK(lruList_.empty()); + VELOX_CHECK(expireList_.empty()); + VELOX_CHECK(keys_.empty()); + VELOX_CHECK_EQ(curSize_, 0); } -template -inline std::optional SimpleLRUCache::get(const Key& key) { - ++numLookups_; - auto it = lru_.find(key); - if (it == lru_.end()) { - return std::nullopt; +template +inline bool SimpleLRUCache::add( + Key key, + Value* value, + size_t size) { + return addInternal(key, value, size, /*pinned=*/false); +} + +template +inline bool SimpleLRUCache::addPinned( + Key key, + Value* value, + size_t size) { + return addInternal(key, value, size, /*pinned=*/true); +} + +template +inline void +SimpleLRUCache::removeExpiredEntries() { + if (expireDurationMs_ == 0) { + return; + } + const auto currentTimeMs = getCurrentTimeMs(); + auto it = expireList_.begin(); + while (it != expireList_.end()) { + if (it->expireTimeMs > currentTimeMs) { + return; + } + if (it->numPins > 0) { + ++it; + continue; + } + Element* expiredEntry = &*it; + it = expireList_.erase(it); + freeEntry(expiredEntry); } +} + +template +inline bool SimpleLRUCache::addInternal( + Key key, + Value* value, + size_t size, + bool pinned) { + removeExpiredEntries(); + + if (keys_.find(key) != keys_.end()) { + return false; + } + if (pinnedSize_ + size > maxSize_) { + return false; + } + const int64_t spaceNeeded = curSize_ + size - maxSize_; + if (spaceNeeded > 0) { + free(spaceNeeded); + } + + Element* e = new Element; + e->key = std::move(key); + e->value = value; + e->size = size; + e->numPins = !!pinned; + if (pinned) { + pinnedSize_ += size; + } + keys_.emplace(e->key, e); + lruList_.push_back(*e); + if (expireDurationMs_ != 0) { + e->expireTimeMs = getCurrentTimeMs() + expireDurationMs_; + expireList_.push_back(*e); + } + curSize_ += size; + return true; +} +template +inline Value* SimpleLRUCache::get( + const Key& key) { + removeExpiredEntries(); + + ++numLookups_; + auto it = keys_.find(key); + if (it == keys_.end()) { + return nullptr; + } + Element* entry = it->second; + if (entry->numPins++ == 0) { + pinnedSize_ += entry->size; + } + VELOX_DCHECK(entry->lruEntry.is_linked()); + entry->lruEntry.unlink(); + lruList_.push_back(*entry); ++numHits_; - return it->second; + return it->second->value; +} + +template +inline void SimpleLRUCache::release( + const Key& key) { + Element* e = keys_[key]; + if (--e->numPins == 0) { + pinnedSize_ -= e->size; + } + removeExpiredEntries(); +} + +template +inline size_t SimpleLRUCache::free(size_t size) { + auto it = lruList_.begin(); + size_t freed = 0; + while (it != lruList_.end() && freed < size) { + if (it->numPins == 0) { + Element* evictedEntry = &*it; + it = lruList_.erase(it); + freed += freeEntry(evictedEntry); + } else { + ++it; + } + } + return freed; } -template -inline void SimpleLRUCache::clear() { - lru_.clear(); +template +inline size_t SimpleLRUCache::freeEntry( + Element* e) { + VELOX_CHECK_EQ(e->numPins, 0); + // NOTE: the list hook dtor will unlink the entry from list so we don't need + // to explicitly unlink here. + const auto freedSize = e->size; + curSize_ -= freedSize; + keys_.erase(e->key); + delete e->value; + delete e; + return freedSize; } } // namespace facebook::velox diff --git a/velox/common/caching/tests/CMakeLists.txt b/velox/common/caching/tests/CMakeLists.txt index f02c59b5f573..dac47af8efe8 100644 --- a/velox/common/caching/tests/CMakeLists.txt +++ b/velox/common/caching/tests/CMakeLists.txt @@ -14,8 +14,8 @@ add_executable(simple_lru_cache_test SimpleLRUCacheTest.cpp) add_test(simple_lru_cache_test simple_lru_cache_test) -target_link_libraries(simple_lru_cache_test PRIVATE Folly::folly glog::glog - gtest gtest_main) +target_link_libraries(simple_lru_cache_test PRIVATE Folly::folly velox_time + glog::glog gtest gtest_main) add_executable( velox_cache_test AsyncDataCacheTest.cpp CacheTTLControllerTest.cpp @@ -34,5 +34,6 @@ target_link_libraries( add_executable(cached_factory_test CachedFactoryTest.cpp) add_test(cached_factory_test cached_factory_test) -target_link_libraries(cached_factory_test PRIVATE velox_process Folly::folly - glog::glog gtest gtest_main) +target_link_libraries( + cached_factory_test PRIVATE velox_process Folly::folly velox_time glog::glog + gtest gtest_main) diff --git a/velox/common/caching/tests/CachedFactoryTest.cpp b/velox/common/caching/tests/CachedFactoryTest.cpp index 6ae2d62e7de8..1a9f924c6235 100644 --- a/velox/common/caching/tests/CachedFactoryTest.cpp +++ b/velox/common/caching/tests/CachedFactoryTest.cpp @@ -16,140 +16,160 @@ #include "velox/common/caching/CachedFactory.h" +#include "folly/Random.h" #include "folly/executors/EDFThreadPoolExecutor.h" #include "folly/executors/thread_factory/NamedThreadFactory.h" #include "folly/synchronization/Latch.h" #include "gtest/gtest.h" +#include "velox/common/base/tests/GTestUtils.h" using namespace facebook::velox; -namespace { +namespace { struct DoublerGenerator { - int operator()(const int& value) { - ++generated_; - return value * 2; + std::unique_ptr operator()(const int& value) { + ++generated; + return std::make_unique(value * 2); } - std::atomic generated_ = 0; + std::atomic generated = 0; }; -template -T getCachedValue(std::pair& value) { - return value.second; -} - -template -bool isCached(std::pair& value) { - return value.first; -} - -template -std::pair cacheHit(const T& value) { - return std::make_pair(true, value); -} - -template -std::pair cacheMiss(const T& value) { - return std::make_pair(false, value); -} - +struct IdentityGenerator { + std::unique_ptr operator()(const int& value) { + return std::make_unique(value); + } +}; } // namespace TEST(CachedFactoryTest, basicGeneration) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; + auto* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); - EXPECT_EQ(factory.maxSize(), 1000); + ASSERT_EQ(factory.maxSize(), 1000); + ASSERT_EQ(factory.currentSize(), 0); + { auto val1 = factory.generate(1); - EXPECT_EQ(val1, cacheMiss(2)); - EXPECT_EQ(*generated, 1); - + ASSERT_EQ(*val1, 2); + ASSERT_EQ(*generated, 1); + ASSERT_FALSE(val1.fromCache()); auto val2 = factory.generate(1); - EXPECT_EQ(val2, cacheHit(2)); - EXPECT_EQ(*generated, 1); - EXPECT_EQ(factory.currentSize(), 1); + ASSERT_EQ(*val2, 2); + ASSERT_EQ(*generated, 1); + ASSERT_TRUE(val2.fromCache()); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); } + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + { auto val3 = factory.generate(1); - EXPECT_EQ(val3, cacheHit(2)); - EXPECT_EQ(*generated, 1); - + ASSERT_EQ(*val3, 2); + ASSERT_EQ(*generated, 1); + ASSERT_TRUE(val3.fromCache()); auto val4 = factory.generate(2); - EXPECT_EQ(val4, cacheMiss(4)); - EXPECT_EQ(*generated, 2); - + ASSERT_EQ(*val4, 4); + ASSERT_EQ(*generated, 2); + ASSERT_FALSE(val4.fromCache()); auto val5 = factory.generate(3); - EXPECT_EQ(val5, cacheMiss(6)); - EXPECT_EQ(*generated, 3); - EXPECT_EQ(factory.currentSize(), 3); + ASSERT_EQ(*val5, 6); + ASSERT_EQ(*generated, 3); + ASSERT_FALSE(val5.fromCache()); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); } + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); - auto val6 = factory.generate(1); - EXPECT_EQ(val6, cacheHit(2)); - EXPECT_EQ(*generated, 3); - - auto val7 = factory.generate(4); - EXPECT_EQ(val7, cacheMiss(8)); - EXPECT_EQ(*generated, 4); + { + auto val6 = factory.generate(1); + ASSERT_EQ(*val6, 2); + ASSERT_EQ(*generated, 3); + ASSERT_TRUE(val6.fromCache()); + auto val7 = factory.generate(4); + ASSERT_EQ(*val7, 8); + ASSERT_EQ(*generated, 4); + ASSERT_FALSE(val7.fromCache()); + auto val8 = factory.generate(3); + ASSERT_EQ(*val8, 6); + ASSERT_EQ(*generated, 4); + ASSERT_TRUE(val8.fromCache()); + ASSERT_EQ(factory.currentSize(), 4); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + } + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); - auto val8 = factory.generate(3); - EXPECT_EQ(val8, cacheHit(6)); - EXPECT_EQ(*generated, 4); - EXPECT_EQ(factory.currentSize(), 4); + factory.clearCache(); + ASSERT_EQ(factory.currentSize(), 0); + ASSERT_EQ(factory.cacheStats().curSize, 0); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); } struct DoublerWithExceptionsGenerator { - int operator()(const int& value) { + std::unique_ptr operator()(const int& value) { if (value == 3) { - throw std::invalid_argument("3 is bad"); + VELOX_FAIL("3 is bad"); } - ++generated_; - return value * 2; + ++generated; + return std::make_unique(value * 2); } - int generated_ = 0; + int generated = 0; }; TEST(CachedFactoryTest, clearCache) { auto generator = std::make_unique(); CachedFactory factory( std::make_unique>(1000), std::move(generator)); - EXPECT_EQ(factory.maxSize(), 1000); + ASSERT_EQ(factory.maxSize(), 1000); { auto val1 = factory.generate(1); - EXPECT_EQ(val1, cacheMiss(2)); + ASSERT_FALSE(val1.fromCache()); } factory.clearCache(); - EXPECT_EQ(factory.currentSize(), 0); - EXPECT_EQ(factory.generate(1), cacheMiss(2)); + ASSERT_EQ(factory.currentSize(), 0); + + ASSERT_FALSE(factory.generate(1).fromCache()); + auto cachedValue = factory.generate(1); + ASSERT_TRUE(cachedValue.fromCache()); + ASSERT_FALSE(factory.generate(2).fromCache()); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); + ASSERT_EQ(factory.cacheStats().curSize, 2); + + factory.clearCache(); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); + + cachedValue.testingClear(); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + + factory.clearCache(); + ASSERT_EQ(factory.currentSize(), 0); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); } TEST(CachedFactoryTest, basicExceptionHandling) { auto generator = std::make_unique(); - int* generated = &generator->generated_; + int* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); auto val1 = factory.generate(1); - EXPECT_EQ(getCachedValue(val1), 2); - EXPECT_EQ(*generated, 1); - try { - auto val2 = factory.generate(3); - FAIL() << "Factory generation should have failed"; - } catch (const std::invalid_argument&) { - // Expected. - } + ASSERT_EQ(*val1, 2); + ASSERT_EQ(*generated, 1); + VELOX_ASSERT_THROW(factory.generate(3), "3 is bad"); + val1 = factory.generate(4); - EXPECT_EQ(getCachedValue(val1), 8); - EXPECT_EQ(*generated, 2); + ASSERT_EQ(*val1, 8); + ASSERT_EQ(*generated, 2); val1 = factory.generate(1); - EXPECT_EQ(getCachedValue(val1), 2); - EXPECT_EQ(*generated, 2); + ASSERT_EQ(*val1, 2); + ASSERT_EQ(*generated, 2); } TEST(CachedFactoryTest, multiThreadedGeneration) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; + auto* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); folly::EDFThreadPoolExecutor pool( @@ -157,31 +177,31 @@ TEST(CachedFactoryTest, multiThreadedGeneration) { const int numValues = 5; const int requestsPerValue = 10; folly::Latch latch(numValues * requestsPerValue); - for (int i = 0; i < requestsPerValue; i++) { - for (int j = 0; j < numValues; j++) { + for (int i = 0; i < requestsPerValue; ++i) { + for (int j = 0; j < numValues; ++j) { pool.add([&, j]() { auto value = factory.generate(j); - EXPECT_EQ(getCachedValue(value), 2 * j); + CHECK_EQ(*value, 2 * j); latch.count_down(); }); } } latch.wait(); - EXPECT_EQ(*generated, numValues); + ASSERT_EQ(*generated, numValues); } -// Same as above, but we keep the returned CachedPtrs till the end -// of the function. +// Same as above, but we keep the returned CachedPtrs till the end of the +// function. TEST(CachedFactoryTest, multiThreadedGenerationAgain) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; + auto* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); folly::EDFThreadPoolExecutor pool( 100, std::make_shared("test_pool")); const int numValues = 5; const int requestsPerValue = 10; - std::vector> cachedValues(numValues * requestsPerValue); + std::vector> cachedValues(numValues * requestsPerValue); folly::Latch latch(numValues * requestsPerValue); for (int i = 0; i < requestsPerValue; i++) { for (int j = 0; j < numValues; j++) { @@ -195,69 +215,284 @@ TEST(CachedFactoryTest, multiThreadedGenerationAgain) { ASSERT_EQ(*generated, numValues); for (int i = 0; i < requestsPerValue; i++) { for (int j = 0; j < numValues; j++) { - EXPECT_EQ(getCachedValue(cachedValues[i * numValues + j]), 2 * j); + ASSERT_EQ(*cachedValues[i * numValues + j], 2 * j); } } } +TEST(CachedFactoryTest, lruCacheEviction) { + auto generator = std::make_unique(); + CachedFactory factory( + std::make_unique>(3), std::move(generator)); + ASSERT_EQ(factory.maxSize(), 3); + ASSERT_EQ(factory.currentSize(), 0); + + auto val1 = factory.generate(1); + ASSERT_FALSE(val1.fromCache()); + ASSERT_TRUE(val1.cached()); + auto val2 = factory.generate(2); + ASSERT_FALSE(val2.fromCache()); + ASSERT_TRUE(val2.cached()); + auto val3 = factory.generate(3); + ASSERT_FALSE(val3.fromCache()); + ASSERT_TRUE(val3.cached()); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + auto val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_FALSE(val4.cached()); + + { + auto val = factory.generate(4); + ASSERT_FALSE(val.fromCache()); + ASSERT_FALSE(val.cached()); + val = factory.generate(1); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + val = factory.generate(2); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + val = factory.generate(3); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + } + { + auto val = factory.generate(1); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + } + val1.testingClear(); + val2.testingClear(); + val3.testingClear(); + + val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_TRUE(val4.cached()); + ASSERT_EQ(factory.cacheStats().curSize, 3); + { + auto val = factory.generate(4); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + val = factory.generate(1); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + // Cache entry 2 should be selected for eviction. + val = factory.generate(2); + ASSERT_FALSE(val.fromCache()); + ASSERT_TRUE(val.cached()); + // Cache entry 2 insertion caused cache entry 3 eviction. + val = factory.generate(3); + ASSERT_FALSE(val.fromCache()); + ASSERT_TRUE(val.cached()); + } + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); +} + +TEST(CachedFactoryTest, cacheExpiration) { + auto generator = std::make_unique(); + CachedFactory factory( + std::make_unique>(3, 1'000), + std::move(generator)); + ASSERT_EQ(factory.maxSize(), 3); + ASSERT_EQ(factory.currentSize(), 0); + + auto val1 = factory.generate(1); + ASSERT_FALSE(val1.fromCache()); + ASSERT_TRUE(val1.cached()); + auto val2 = factory.generate(2); + ASSERT_FALSE(val2.fromCache()); + ASSERT_TRUE(val2.cached()); + auto val3 = factory.generate(3); + ASSERT_FALSE(val3.fromCache()); + ASSERT_TRUE(val3.cached()); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + auto val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_FALSE(val4.cached()); + + std::this_thread::sleep_for(std::chrono::milliseconds{1'500}); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + + val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_FALSE(val4.cached()); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + + val1.testingClear(); + ASSERT_EQ(factory.currentSize(), 2); + ASSERT_EQ(factory.cacheStats().pinnedSize, 2); + + val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_TRUE(val4.cached()); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + + val2.testingClear(); + val3.testingClear(); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); + val4.testingClear(); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + + std::this_thread::sleep_for(std::chrono::milliseconds{1'500}); + + val1 = factory.generate(1); + ASSERT_FALSE(val1.fromCache()); + ASSERT_TRUE(val1.cached()); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); +} + TEST(CachedFactoryTest, retrievedCached) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; + auto* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); - for (int i = 0; i < 10; i += 2) + for (int i = 0; i < 10; i += 2) { factory.generate(i); - EXPECT_EQ(*generated, 5); + } + ASSERT_EQ(*generated, 5); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + ASSERT_EQ(factory.cacheStats().curSize, 5); + std::vector keys(10); - for (int i = 0; i < 10; i += 1) + for (int i = 0; i < 10; ++i) { keys[i] = i; - std::vector> cached; + } + std::vector>> cached; std::vector missing; - factory.retrieveCached(keys, &cached, &missing); - ASSERT_EQ(5, cached.size()); + factory.retrieveCached(keys, cached, missing); + ASSERT_EQ(cached.size(), 5); + ASSERT_EQ(factory.cacheStats().pinnedSize, 5); + ASSERT_EQ(factory.cacheStats().curSize, 5); + for (int i = 0; i < 5; ++i) { - EXPECT_EQ(cached[i].first, 2 * i); - EXPECT_EQ(cached[i].second, 4 * i); + ASSERT_EQ(cached[i].first, 2 * i); + ASSERT_EQ(*cached[i].second, 4 * i); + ASSERT_TRUE(cached[i].second.fromCache()); } - ASSERT_EQ(5, missing.size()); + ASSERT_EQ(missing.size(), 5); + for (int i = 0; i < 5; ++i) { - EXPECT_EQ(missing[i], 2 * i + 1); + ASSERT_EQ(missing[i], 2 * i + 1); } - EXPECT_EQ(*generated, 5); + ASSERT_EQ(*generated, 5); } -TEST(CachedFactoryTest, disableCache) { +TEST(CachedFactoryTest, clearCacheWithManyEntries) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; CachedFactory factory( - nullptr, std::move(generator)); + std::make_unique>(1000), std::move(generator)); + for (auto i = 0; i < 1000; ++i) { + factory.generate(i); + } + std::vector keys(500); + for (int i = 0; i < 500; ++i) { + keys[i] = i; + } + { + std::vector>> cached; + std::vector missing; + factory.retrieveCached(keys, cached, missing); + ASSERT_EQ(cached.size(), 500); + auto cacheStats = factory.clearCache(); + ASSERT_EQ(cacheStats.numElements, 500); + ASSERT_EQ(cacheStats.pinnedSize, 500); + } + auto cacheStats = factory.cacheStats(); + ASSERT_EQ(cacheStats.numElements, 500); + ASSERT_EQ(cacheStats.pinnedSize, 0); + + cacheStats = factory.clearCache(); + ASSERT_EQ(cacheStats.numElements, 0); + ASSERT_EQ(cacheStats.pinnedSize, 0); +} + +TEST(CachedFactoryTest, disableCache) { + auto generator = std::make_unique(); + auto* generated = &generator->generated; + CachedFactory factory(std::move(generator)); auto val1 = factory.generate(1); - EXPECT_EQ(val1, cacheMiss(2)); - EXPECT_EQ(*generated, 1); + ASSERT_FALSE(val1.fromCache()); + ASSERT_EQ(*generated, 1); + ASSERT_EQ(factory.currentSize(), 0); + ASSERT_EQ(factory.cacheStats().curSize, 0); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); auto val2 = factory.generate(1); - EXPECT_EQ(val2, cacheMiss(2)); + ASSERT_FALSE(val2.fromCache()); EXPECT_EQ(*generated, 2); + ASSERT_EQ(factory.currentSize(), 0); + ASSERT_EQ(factory.cacheStats().curSize, 0); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + ASSERT_EQ(factory.cacheStats().expireDurationMs, 0); - EXPECT_EQ(factory.currentSize(), 0); - - EXPECT_EQ(factory.maxSize(), 0); + ASSERT_EQ(factory.maxSize(), 0); - EXPECT_EQ(factory.cacheStats(), SimpleLRUCacheStats(0, 0, 0, 0)); + EXPECT_EQ(factory.cacheStats(), SimpleLRUCacheStats{}); - EXPECT_EQ(factory.clearCache(), SimpleLRUCacheStats(0, 0, 0, 0)); + EXPECT_EQ(factory.clearCache(), SimpleLRUCacheStats{}); std::vector keys(10); - for (int i = 0; i < 10; i += 1) { + for (int i = 0; i < 10; ++i) { keys[i] = i; } - std::vector> cached; + + std::vector>> cached; std::vector missing; - factory.retrieveCached(keys, &cached, &missing); - ASSERT_EQ(0, cached.size()); - ASSERT_EQ(10, missing.size()); + factory.retrieveCached(keys, cached, missing); + ASSERT_EQ(cached.size(), 0); + ASSERT_EQ(missing.size(), 10); for (int i = 0; i < 10; ++i) { - EXPECT_EQ(missing[i], i); + ASSERT_EQ(missing[i], i); + } +} + +TEST(CachedFactoryTest, fuzzer) { + const int numThreads = 32; + const int testDurationMs = 5'000; + const size_t expirationDurationMs = 1; + folly::Random::DefaultGenerator rng(23); + for (const bool expireCache : {false, true}) { + SCOPED_TRACE(fmt::format("expireCache: {}", expireCache)); + auto generator = std::make_unique(); + CachedFactory factory( + std::make_unique>( + 128, expireCache ? expirationDurationMs : 0), + std::move(generator)); + + std::vector threads; + threads.reserve(numThreads); + for (int i = 0; i < numThreads; ++i) { + threads.emplace_back([&]() { + const auto startTimeMs = getCurrentTimeMs(); + while (startTimeMs + testDurationMs > getCurrentTimeMs()) { + const auto key = folly::Random::rand32(rng) % 256; + const auto val = factory.generate(key); + if (val.fromCache()) { + ASSERT_TRUE(val.cached()); + ASSERT_EQ(*val, key); + } + if (folly::Random::oneIn(4)) { + std::this_thread::sleep_for(std::chrono::microseconds{100}); + } + } + }); + } + for (auto& thread : threads) { + thread.join(); + } + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + ASSERT_LE(factory.cacheStats().curSize, 128); + ASSERT_LE(factory.cacheStats().numElements, 128); + ASSERT_GT(factory.cacheStats().numHits, 0); + ASSERT_GT(factory.cacheStats().numLookups, 0); } } diff --git a/velox/common/caching/tests/SimpleLRUCacheTest.cpp b/velox/common/caching/tests/SimpleLRUCacheTest.cpp index c7e5b4a9a100..9193d34d0fc0 100644 --- a/velox/common/caching/tests/SimpleLRUCacheTest.cpp +++ b/velox/common/caching/tests/SimpleLRUCacheTest.cpp @@ -15,75 +15,238 @@ */ #include "velox/common/caching/SimpleLRUCache.h" -#include #include "gtest/gtest.h" using namespace facebook::velox; -namespace { -void verifyCacheStats( - const SimpleLRUCacheStats& actual, - size_t maxSize, - size_t curSize, - size_t numHits, - size_t numLookups) { - SimpleLRUCacheStats expectedStats{maxSize, curSize, numHits, numLookups}; - EXPECT_EQ(actual, expectedStats) << " Actual " << actual.toString() - << " Expected " << expectedStats.toString(); -} -} // namespace - TEST(SimpleLRUCache, basicCaching) { SimpleLRUCache cache(1000); - EXPECT_FALSE(cache.get(1).has_value()); - EXPECT_FALSE(cache.get(2).has_value()); - - verifyCacheStats(cache.getStats(), 1000, 0, 0, 2); + ASSERT_TRUE(cache.add(1, new int(11), 1)); + int* value = cache.get(1); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, 11); + cache.release(1); - int firstValue = 11; - ASSERT_TRUE(cache.add(1, firstValue)); - auto value = cache.get(1); - ASSERT_EQ(value, std::make_optional(11)); - - int secondValue = 22; - ASSERT_TRUE(cache.add(2, secondValue)); - - verifyCacheStats(cache.getStats(), 1000, 2, 1, 3); + int* secondValue = new int(22); + ASSERT_TRUE(cache.addPinned(2, secondValue, 1)); + *secondValue += 5; + cache.release(2); value = cache.get(1); - ASSERT_EQ(value, std::make_optional(11)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, 11); + cache.release(1); value = cache.get(2); - ASSERT_EQ(value, std::make_optional(22)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, 27); + cache.release(2); value = cache.get(1); - ASSERT_EQ(value, std::make_optional(11)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, 11); + secondValue = cache.get(1); + ASSERT_EQ(value, secondValue); + cache.release(1); + cache.release(1); + + ASSERT_EQ( + cache.stats().toString(), + "{\n maxSize: 1000\n expireDurationMs: 0\n curSize: 2\n pinnedSize: 0\n numElements: 2\n numHits: 5\n numLookups: 5\n}\n"); +} - value = cache.get(2); - ASSERT_EQ(value, std::make_optional(22)); - verifyCacheStats(cache.getStats(), 1000, 2, 5, 7); +TEST(SimpleLRUCache, lruEviction) { + SimpleLRUCache cache(3); - cache.clear(); - verifyCacheStats(cache.getStats(), 1000, 0, 5, 7); - EXPECT_FALSE(cache.get(1).has_value()); - EXPECT_FALSE(cache.get(2).has_value()); + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(cache.add(i, new int(i), 1)); + } + ASSERT_EQ(cache.stats().numElements, 3); + ASSERT_EQ(*cache.get(0), 0); + cache.release(0); + + ASSERT_TRUE(cache.add(3, new int(3), 1)); + ASSERT_EQ(*cache.get(0), 0); + cache.release(0); + ASSERT_EQ(cache.get(1), nullptr); + ASSERT_EQ(*cache.get(3), 3); + cache.release(3); + ASSERT_EQ(cache.stats().numElements, 3); } TEST(SimpleLRUCache, eviction) { SimpleLRUCache cache(1000); for (int i = 0; i < 1010; ++i) { - ASSERT_TRUE(cache.add(i, i)); + ASSERT_TRUE(cache.add(i, new int(i), 1)); } for (int i = 0; i < 10; ++i) { - ASSERT_FALSE(cache.get(i).has_value()); + ASSERT_EQ(cache.get(i), nullptr); } - for (int i = 10; i < 1010; ++i) { - auto value = cache.get(i); - ASSERT_EQ(value, std::make_optional(i)); + int* value = cache.get(i); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, i); + cache.release(i); + } +} + +TEST(SimpleLRUCache, pinnedEviction) { + SimpleLRUCache cache(100); + + for (int i = 0; i < 10; ++i) { + ASSERT_TRUE(cache.addPinned(i, new int(i), 1)); + } + for (int i = 10; i < 110; ++i) { + ASSERT_TRUE(cache.add(i, new int(i), 1)); + } + + for (int i = 0; i < 10; ++i) { + int* value = cache.get(i); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, i); + cache.release(i); + cache.release(i); // Release the original pin too. + } + for (int i = 10; i < 20; ++i) { + ASSERT_EQ(cache.get(i), nullptr); + } + for (int i = 20; i < 110; ++i) { + int* value = cache.get(i); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, i); + cache.release(i); + } +} + +TEST(SimpleLRUCache, fullyPinned) { + SimpleLRUCache cache(10); + + for (int i = 0; i < 10; ++i) { + ASSERT_TRUE(cache.addPinned(i, new int(i), 1)); + } + for (int i = 10; i < 20; ++i) { + int* value = new int(i); + ASSERT_FALSE(cache.add(i, value, 1)); + delete value; + } + for (int i = 20; i < 30; ++i) { + int* value = new int(i); + ASSERT_FALSE(cache.addPinned(i, value, 1)); + delete value; + } + + for (int i = 0; i < 10; ++i) { + int* value = cache.get(i); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, i); + cache.release(i); + cache.release(i); // Release the original pin too. + } + for (int i = 10; i < 30; ++i) { + ASSERT_EQ(cache.get(i), nullptr); + } +} + +TEST(SimpleLRUCache, size) { + SimpleLRUCache cache(10); + ASSERT_EQ(cache.maxSize(), 10); + + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(cache.addPinned(i, new int(i), 2)); + ASSERT_EQ(cache.currentSize(), 2 * (i + 1)); + } + int* value = new int(5); + ASSERT_FALSE(cache.addPinned(5, value, 1)); + + for (int i = 0; i < 5; ++i) { + cache.release(i); + } + ASSERT_TRUE(cache.addPinned(5, value, 10)); + ASSERT_EQ(cache.currentSize(), 10); + + for (int i = 0; i < 5; ++i) { + ASSERT_EQ(cache.get(i), nullptr); + } + cache.release(5); +} + +TEST(SimpleLRUCache, insertLargerThanCacheFails) { + SimpleLRUCache cache(10); + + int* value = new int(42); + ASSERT_FALSE(cache.add(123, value, 11)); + delete value; +} + +TEST(SimpleLRUCache, expiredCacheEntries) { + SimpleLRUCache cache(100, 1'000); + + // Expires on insert new entry. + int* value1 = new int(42); + ASSERT_TRUE(cache.add(123, value1, 11)); + ASSERT_EQ(cache.currentSize(), 11); + ASSERT_EQ(cache.get(123), value1); + cache.release(123); + + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), 11); + + int* value2 = new int(32); + ASSERT_TRUE(cache.add(122, value2, 22)); + ASSERT_EQ(cache.currentSize(), 22); + ASSERT_EQ(cache.get(123), nullptr); + ASSERT_EQ(cache.get(122), value2); + cache.release(122); + + // Expires when get cache entry. + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), 22); + ASSERT_EQ(cache.get(123), nullptr); + ASSERT_EQ(cache.currentSize(), 0); + ASSERT_EQ(cache.get(122), nullptr); + ASSERT_EQ(cache.currentSize(), 0); + + // Expires when get the same cache entry. + value2 = new int(33); + ASSERT_TRUE(cache.add(124, value2, 11)); + ASSERT_EQ(cache.currentSize(), 11); + ASSERT_EQ(cache.get(124), value2); + cache.release(124); + ASSERT_EQ(cache.currentSize(), 11); + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), 11); + ASSERT_EQ(cache.get(124), nullptr); + ASSERT_EQ(cache.currentSize(), 0); + + // Adds multiple entries. + int expectedCacheSize{0}; + for (int i = 0; i < 10; ++i) { + int* value = new int(i); + ASSERT_TRUE(cache.add(i, value, i)); + ASSERT_EQ(cache.get(i), value); + cache.release(i); + expectedCacheSize += i; + ASSERT_EQ(cache.currentSize(), expectedCacheSize); } + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), expectedCacheSize); + ASSERT_EQ(cache.get(0), nullptr); + ASSERT_EQ(cache.currentSize(), 0); + + // Expire on release. + value2 = new int(64); + ASSERT_TRUE(cache.addPinned(124, value2, 11)); + ASSERT_EQ(cache.currentSize(), 11); + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), 11); + ASSERT_EQ(*cache.get(124), 64); + cache.release(124); + ASSERT_EQ(cache.currentSize(), 11); + cache.release(124); + ASSERT_EQ(cache.currentSize(), 0); + ASSERT_EQ(cache.get(124), nullptr); } diff --git a/velox/connectors/hive/FileHandle.cpp b/velox/connectors/hive/FileHandle.cpp index 8deabd1f2d72..40ac778adc4a 100644 --- a/velox/connectors/hive/FileHandle.cpp +++ b/velox/connectors/hive/FileHandle.cpp @@ -24,6 +24,12 @@ namespace facebook::velox { +uint64_t FileHandleSizer::operator()(const FileHandle& fileHandle) { + // TODO: add to support variable file cache size support when the file system + // underneath supports. + return 1; +} + namespace { // The group tracking is at the level of the directory, i.e. Hive partition. std::string groupName(const std::string& filename) { @@ -33,16 +39,16 @@ std::string groupName(const std::string& filename) { } } // namespace -std::shared_ptr FileHandleGenerator::operator()( +std::unique_ptr FileHandleGenerator::operator()( const std::string& filename) { // We have seen cases where drivers are stuck when creating file handles. // Adding a trace here to spot this more easily in future. process::TraceContext trace("FileHandleGenerator::operator()"); uint64_t elapsedTimeUs{0}; - std::shared_ptr fileHandle; + std::unique_ptr fileHandle; { MicrosecondTimer timer(&elapsedTimeUs); - fileHandle = std::make_shared(); + fileHandle = std::make_unique(); fileHandle->file = filesystems::getFileSystem(filename, properties_) ->openFileForRead(filename); fileHandle->uuid = StringIdLease(fileIds(), filename); diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index 6fb6853d7544..e8a9a954094e 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -54,6 +54,11 @@ struct FileHandle { // first diff we'll not include the map. }; +/// Estimates the memory usage of a FileHandle object. +struct FileHandleSizer { + uint64_t operator()(const FileHandle& a); +}; + using FileHandleCache = SimpleLRUCache; // Creates FileHandles via the Generator interface the CachedFactory requires. @@ -62,7 +67,7 @@ class FileHandleGenerator { FileHandleGenerator() {} FileHandleGenerator(std::shared_ptr properties) : properties_(std::move(properties)) {} - std::shared_ptr operator()(const std::string& filename); + std::unique_ptr operator()(const std::string& filename); private: const std::shared_ptr properties_; @@ -70,8 +75,11 @@ class FileHandleGenerator { using FileHandleFactory = CachedFactory< std::string, - std::shared_ptr, - FileHandleGenerator>; + FileHandle, + FileHandleGenerator, + FileHandleSizer>; + +using FileHandleCachedPtr = CachedPtr; using FileHandleCacheStats = SimpleLRUCacheStats; diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index c693e8ee0922..00d463b25b91 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -59,8 +59,7 @@ HiveConnector::HiveConnector( hiveConfig_(std::make_shared(config)), fileHandleFactory_( hiveConfig_->isFileHandleCacheEnabled() - ? std::make_unique< - SimpleLRUCache>>( + ? std::make_unique>( hiveConfig_->numCacheFileHandles()) : nullptr, std::make_unique(config)), diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 9785c7a3b894..9e10a1bc397d 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -227,18 +227,18 @@ void SplitReader::createReader() { VELOX_CHECK_NE( baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN); - std::shared_ptr fileHandle; + FileHandleCachedPtr fileHandleCachePtr; try { - fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second; + fileHandleCachePtr = fileHandleFactory_->generate(hiveSplit_->filePath); + VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get()); } catch (const VeloxRuntimeError& e) { if (e.errorCode() == error_code::kFileNotFound && hiveConfig_->ignoreMissingFiles( connectorQueryCtx_->sessionProperties())) { emptySplit_ = true; return; - } else { - throw; } + throw; } // Here we keep adding new entries to CacheTTLController when new fileHandles @@ -246,10 +246,14 @@ void SplitReader::createReader() { // CacheTTLController needs to make sure a size control strategy was available // such as removing aged out entries. if (auto* cacheTTLController = cache::CacheTTLController::getInstance()) { - cacheTTLController->addOpenFileInfo(fileHandle->uuid.id()); + cacheTTLController->addOpenFileInfo(fileHandleCachePtr->uuid.id()); } auto baseFileInput = createBufferedInput( - *fileHandle, baseReaderOpts_, connectorQueryCtx_, ioStats_, executor_); + *fileHandleCachePtr, + baseReaderOpts_, + connectorQueryCtx_, + ioStats_, + executor_); baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat()) ->createReader(std::move(baseFileInput), baseReaderOpts_); diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp index 842c350a9239..9eb56c8e8016 100644 --- a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp +++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp @@ -92,10 +92,10 @@ PositionalDeleteFileReader::PositionalDeleteFileReader( deleteFileSchema, deleteSplit_); - auto deleteFileHandle = - fileHandleFactory_->generate(deleteFile_.filePath).second; + auto deleteFileHandleCachePtr = + fileHandleFactory_->generate(deleteFile_.filePath); auto deleteFileInput = createBufferedInput( - *deleteFileHandle, + *deleteFileHandleCachePtr, deleteReaderOpts, connectorQueryCtx, ioStats_, diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp index f2558cbc1d37..36edb9d631e9 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp @@ -67,8 +67,8 @@ TEST_F(S3FileSystemRegistrationTest, fileHandle) { std::make_unique< SimpleLRUCache>>(1000), std::make_unique(hiveConfig)); - auto fileHandle = factory.generate(s3File).second; - readData(fileHandle->file.get()); + auto fileHandleCachePtr = factory.generate(s3File); + readData(fileHandleCachePtr.get()); } TEST_F(S3FileSystemRegistrationTest, finalize) { diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 93de7e79e515..f306c36a380e 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -60,6 +60,40 @@ TEST_F(S3FileSystemTest, writeAndRead) { readData(readFile.get()); } +TEST_F(S3FileSystemTest, viaRegistry) { + const char* bucketName = "data2"; + const char* file = "test.txt"; + const std::string filename = localPath(bucketName) + "/" + file; + const std::string s3File = s3URI(bucketName, file); + addBucket(bucketName); + { + LocalWriteFile writeFile(filename); + writeData(&writeFile); + } + auto hiveConfig = minioServer_->hiveConfig(); + auto s3fs = filesystems::getFileSystem(s3File, hiveConfig); + auto readFile = s3fs->openFileForRead(s3File); + readData(readFile.get()); +} + +TEST_F(S3FileSystemTest, fileHandle) { + const char* bucketName = "data3"; + const char* file = "test.txt"; + const std::string filename = localPath(bucketName) + "/" + file; + const std::string s3File = s3URI(bucketName, file); + addBucket(bucketName); + { + LocalWriteFile writeFile(filename); + writeData(&writeFile); + } + auto hiveConfig = minioServer_->hiveConfig(); + FileHandleFactory factory( + std::make_unique>(1000), + std::make_unique(hiveConfig)); + auto fileHandle = factory.generate(s3File); + readData(fileHandle->file.get()); +} + TEST_F(S3FileSystemTest, invalidCredentialsConfig) { { const std::unordered_map config( diff --git a/velox/connectors/hive/tests/FileHandleTest.cpp b/velox/connectors/hive/tests/FileHandleTest.cpp index d00c01f8b5de..df045e4fd439 100644 --- a/velox/connectors/hive/tests/FileHandleTest.cpp +++ b/velox/connectors/hive/tests/FileHandleTest.cpp @@ -37,10 +37,9 @@ TEST(FileHandleTest, localFile) { } FileHandleFactory factory( - std::make_unique< - SimpleLRUCache>>(1000), + std::make_unique>(1000), std::make_unique()); - auto fileHandle = factory.generate(filename).second; + auto fileHandle = factory.generate(filename); ASSERT_EQ(fileHandle->file->size(), 3); char buffer[3]; ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer), "foo");