Skip to content

Commit

Permalink
Cache operation support for Prestissimo (#10401)
Browse files Browse the repository at this point in the history
Summary:
Add checkpoint and ssd write functions support for Prestissimo worker to operate the
cache for performance tuning and online debugging in a test environment
Add metrics about how many ssd cache entries are recovered and how many in-memory
cache entries are evicted without saving to SSD.
Other logs improvements.

Pull Request resolved: #10401

Reviewed By: Yuhta, zacw7, oerling

Differential Revision: D59401173

Pulled By: xiaoxmeng

fbshipit-source-id: 2cbd4b7603065454e1cbbcccf42441309f772bb7
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Jul 6, 2024
1 parent b9268a6 commit 07684ea
Show file tree
Hide file tree
Showing 21 changed files with 237 additions and 65 deletions.
4 changes: 2 additions & 2 deletions velox/benchmarks/tpch/TpchBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,14 @@ class TpchBenchmark {
#endif

if (cache_) {
cache_->testingClear();
cache_->clear();
}
}
if (FLAGS_clear_ssd_cache) {
if (cache_) {
auto ssdCache = cache_->ssdCache();
if (ssdCache) {
ssdCache->testingClear();
ssdCache->clear();
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ void registerVeloxMetrics() {
// last counter retrieval.
DEFINE_METRIC(kMetricMemoryCacheNumEvicts, facebook::velox::StatType::SUM);

// Number of times a valid entry was removed in order to make space but has
// not been saved to SSD yet, since last counter retrieval.
DEFINE_METRIC(
kMetricMemoryCacheNumSavableEvicts, facebook::velox::StatType::SUM);

// Number of entries considered for evicting, since last counter retrieval.
DEFINE_METRIC(
kMetricMemoryCacheNumEvictChecks, facebook::velox::StatType::SUM);
Expand Down Expand Up @@ -256,6 +261,10 @@ void registerVeloxMetrics() {
// Total number of cache regions evicted.
DEFINE_METRIC(kMetricSsdCacheRegionsEvicted, facebook::velox::StatType::SUM);

// Total number of cache entries recovered from checkpoint.
DEFINE_METRIC(
kMetricSsdCacheRecoveredEntries, facebook::velox::StatType::SUM);

/// ================== Memory Arbitration Counters =================

// The number of arbitration requests.
Expand Down
6 changes: 6 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ constexpr folly::StringPiece kMetricMemoryCacheNumNew{
constexpr folly::StringPiece kMetricMemoryCacheNumEvicts{
"velox.memory_cache_num_evicts"};

constexpr folly::StringPiece kMetricMemoryCacheNumSavableEvicts{
"velox.memory_cache_num_savable_evicts"};

constexpr folly::StringPiece kMetricMemoryCacheNumEvictChecks{
"velox.memory_cache_num_evict_checks"};

Expand Down Expand Up @@ -293,6 +296,9 @@ constexpr folly::StringPiece kMetricSsdCacheCheckpointsWritten{
constexpr folly::StringPiece kMetricSsdCacheRegionsEvicted{
"velox.ssd_cache_regions_evicted"};

constexpr folly::StringPiece kMetricSsdCacheRecoveredEntries{
"velox.ssd_cache_recovered_entries"};

constexpr folly::StringPiece kMetricExchangeDataTimeMs{
"velox.exchange_data_time_ms"};

Expand Down
4 changes: 4 additions & 0 deletions velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ void PeriodicStatsReporter::reportCacheStats() {
REPORT_IF_NOT_ZERO(kMetricMemoryCacheHitBytes, deltaCacheStats.hitBytes);
REPORT_IF_NOT_ZERO(kMetricMemoryCacheNumNew, deltaCacheStats.numNew);
REPORT_IF_NOT_ZERO(kMetricMemoryCacheNumEvicts, deltaCacheStats.numEvict);
REPORT_IF_NOT_ZERO(
kMetricMemoryCacheNumSavableEvicts, deltaCacheStats.numSavableEvict);
REPORT_IF_NOT_ZERO(
kMetricMemoryCacheNumEvictChecks, deltaCacheStats.numEvictChecks);
REPORT_IF_NOT_ZERO(
Expand Down Expand Up @@ -227,6 +229,8 @@ void PeriodicStatsReporter::reportCacheStats() {
REPORT_IF_NOT_ZERO(
kMetricSsdCacheReadWithoutChecksum,
deltaSsdStats.readWithoutChecksumChecks);
REPORT_IF_NOT_ZERO(
kMetricSsdCacheRecoveredEntries, deltaSsdStats.entriesRecovered);
}

// TTL controler snapshot stats.
Expand Down
8 changes: 7 additions & 1 deletion velox/common/base/tests/StatsReporterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricMemoryCacheHitBytes.str()), 0);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumNew.str()), 0);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumEvicts.str()), 0);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumSavableEvicts.str()), 0);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumEvictChecks.str()), 0);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumWaitExclusive.str()), 0);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumAllocClocks.str()), 0);
Expand All @@ -502,6 +503,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricSsdCacheRegionsEvicted.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutEntries.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutRegions.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheRecoveredEntries.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheReadWithoutChecksum.str()), 0);
ASSERT_EQ(counterMap.size(), 22);
}
Expand Down Expand Up @@ -530,11 +532,13 @@ TEST_F(PeriodicStatsReporterTest, basic) {
newSsdStats->readSsdCorruptions = 10;
newSsdStats->readCheckpointErrors = 10;
newSsdStats->readWithoutChecksumChecks = 10;
newSsdStats->entriesRecovered = 10;
cache.updateStats(
{.numHit = 10,
.hitBytes = 10,
.numNew = 10,
.numEvict = 10,
.numSavableEvict = 10,
.numEvictChecks = 10,
.numWaitExclusive = 10,
.numAgedOut = 10,
Expand All @@ -556,6 +560,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricMemoryCacheHitBytes.str()), 1);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumNew.str()), 1);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumEvicts.str()), 1);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumSavableEvicts.str()), 1);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumEvictChecks.str()), 1);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumWaitExclusive.str()), 1);
ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumAllocClocks.str()), 1);
Expand All @@ -581,8 +586,9 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricSsdCacheRegionsEvicted.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutEntries.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutRegions.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheRecoveredEntries.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheReadWithoutChecksum.str()), 1);
ASSERT_EQ(counterMap.size(), 52);
ASSERT_EQ(counterMap.size(), 54);
}
}

Expand Down
23 changes: 16 additions & 7 deletions velox/common/caching/AsyncDataCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ uint64_t CacheShard::evict(
MachinePageCount pagesToAcquire,
memory::Allocation& acquired) {
auto* ssdCache = cache_->ssdCache();
const bool skipSsdSaveable = ssdCache && ssdCache->writeInProgress();
const bool skipSsdSaveable =
(ssdCache != nullptr) && ssdCache->writeInProgress();
auto now = accessTime();
std::vector<memory::Allocation> toFree;
int64_t tinyEvicted = 0;
Expand Down Expand Up @@ -420,6 +421,9 @@ uint64_t CacheShard::evict(
++evictSaveableSkipped;
continue;
}
if (candidate->ssdSaveable()) {
++numSavableEvict_;
}
largeEvicted += candidate->data_.byteSize();
if (pagesToAcquire > 0) {
const auto candidatePages = candidate->data().numPages();
Expand Down Expand Up @@ -539,6 +543,7 @@ void CacheShard::updateStats(CacheStats& stats) {
stats.hitBytes += hitBytes_;
stats.numNew += numNew_;
stats.numEvict += numEvict_;
stats.numSavableEvict += numSavableEvict_;
stats.numEvictChecks += numEvictChecks_;
stats.numWaitExclusive += numWaitExclusive_;
stats.numAgedOut += numAgedOut_;
Expand All @@ -547,13 +552,15 @@ void CacheShard::updateStats(CacheStats& stats) {
stats.allocClocks += allocClocks_;
}
void CacheShard::appendSsdSaveable(std::vector<CachePin>& pins) {
void CacheShard::appendSsdSaveable(bool saveAll, std::vector<CachePin>& pins) {
std::lock_guard<std::mutex> l(mutex_);
// Do not add entries to a write batch more than maxWriteRatio_. If SSD save
// is slower than storage read, we must not have a situation where SSD save
// pins everything and stops reading.
const auto limit = static_cast<int32_t>(
static_cast<double>(entries_.size()) * maxWriteRatio_);
const int32_t limit = saveAll
? std::numeric_limits<int32_t>::max()
: static_cast<int32_t>(
static_cast<double>(entries_.size()) * maxWriteRatio_);
VELOX_CHECK(cache_->ssdCache()->writeInProgress());
for (auto& entry : entries_) {
if (entry && (entry->ssdFile_ == nullptr) && !entry->isExclusive() &&
Expand Down Expand Up @@ -625,6 +632,7 @@ CacheStats CacheStats::operator-(CacheStats& other) const {
result.hitBytes = hitBytes - other.hitBytes;
result.numNew = numNew - other.numNew;
result.numEvict = numEvict - other.numEvict;
result.numSavableEvict = numSavableEvict - other.numSavableEvict;
result.numEvictChecks = numEvictChecks - other.numEvictChecks;
result.numWaitExclusive = numWaitExclusive - other.numWaitExclusive;
result.numAgedOut = numAgedOut - other.numAgedOut;
Expand Down Expand Up @@ -905,12 +913,12 @@ void AsyncDataCache::possibleSsdSave(uint64_t bytes) {
}
}
void AsyncDataCache::saveToSsd() {
void AsyncDataCache::saveToSsd(bool saveAll) {
std::vector<CachePin> pins;
VELOX_CHECK(ssdCache_->writeInProgress());
ssdSaveable_ = 0;
for (auto& shard : shards_) {
shard->appendSsdSaveable(pins);
shard->appendSsdSaveable(saveAll, pins);
}
ssdCache_->write(std::move(pins));
}
Expand Down Expand Up @@ -947,7 +955,7 @@ CacheStats AsyncDataCache::refreshStats() const {
return stats;
}
void AsyncDataCache::testingClear() {
void AsyncDataCache::clear() {
for (auto& shard : shards_) {
memory::Allocation unused;
shard->evict(std::numeric_limits<uint64_t>::max(), true, 0, unused);
Expand Down Expand Up @@ -1002,6 +1010,7 @@ std::string CacheStats::toString() const {
// Cache access stats.
<< "Cache access miss: " << numNew << " hit: " << numHit
<< " hit bytes: " << succinctBytes(hitBytes) << " eviction: " << numEvict
<< " savable eviction: " << numSavableEvict
<< " eviction checks: " << numEvictChecks << " aged out: " << numAgedOut
<< " stales: " << numStales
<< "\n"
Expand Down
32 changes: 23 additions & 9 deletions velox/common/caching/AsyncDataCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ struct CacheStats {
int64_t numNew{0};
/// Number of times a valid entry was removed in order to make space.
int64_t numEvict{0};
/// Number of times a valid entry was removed in order to make space but has
/// not been saved to SSD yet.
int64_t numSavableEvict{0};
/// Number of entries considered for evicting.
int64_t numEvictChecks{0};
/// Number of times a user waited for an entry to transit from exclusive to
Expand Down Expand Up @@ -602,12 +605,14 @@ class CacheShard {
/// Adds the stats of 'this' to 'stats'.
void updateStats(CacheStats& stats);

/// Appends a batch of non-saved SSD savable entries in 'this' to
/// 'pins'. This may have to be called several times since this keeps
/// limits on the batch to write at one time. The savable entries
/// are pinned for read. 'pins' should be written or dropped before
/// calling this a second time.
void appendSsdSaveable(std::vector<CachePin>& pins);
/// Appends a batch of non-saved SSD savable entries in 'this' to 'pins'. This
/// may have to be called several times since this keeps limits on the batch
/// to write at one time. The savable entries are pinned for read. 'pins'
/// should be written or dropped before calling this a second time. If 'all'
/// is true, then appends all the non-savable SSD savable entries without
/// limitation check. 'saveAll' is set to true for Prestissimo worker
/// operation use case.
void appendSsdSaveable(bool saveAll, std::vector<CachePin>& pins);

/// Remove cache entries from this shard for files in the fileNum set
/// 'filesToRemove'. If successful, return true, and 'filesRetained' contains
Expand Down Expand Up @@ -672,6 +677,8 @@ class CacheShard {
uint64_t numNew_{0};
// Cumulative count of entries evicted.
uint64_t numEvict_{0};
// Cumulative count of evicted entries which has not been saved to SSD yet.
uint64_t numSavableEvict_{0};
// Cumulative count of entries considered for eviction. This divided by
// 'numEvict_' measured efficiency of eviction.
uint64_t numEvictChecks_{0};
Expand Down Expand Up @@ -841,8 +848,9 @@ class AsyncDataCache : public memory::Cache {
}
}

// Saves all entries with 'ssdSaveable_' to 'ssdCache_'.
void saveToSsd();
/// Saves entries in 'ssdSaveable_' to 'ssdCache_'. If 'saveAll' is true, then
/// write them all in 'ssdSaveable_'.
void saveToSsd(bool saveAll = false);

tsan_atomic<int32_t>& numSkippedSaves() {
return numSkippedSaves_;
Expand All @@ -857,14 +865,20 @@ class AsyncDataCache : public memory::Cache {
folly::F14FastSet<uint64_t>& filesRetained);

/// Drops all unpinned entries. Pins stay valid.
void testingClear();
///
/// NOTE: it is used by testing and Prestissimo server operation.
void clear();

std::vector<AsyncDataCacheEntry*> testingCacheEntries() const;

uint64_t testingSsdSavable() const {
return ssdSaveable_;
}

int32_t testingNumShards() const {
return shards_.size();
}

private:
static constexpr int32_t kNumShards = 4; // Must be power of 2.
static constexpr int32_t kShardMask = kNumShards - 1;
Expand Down
16 changes: 13 additions & 3 deletions velox/common/caching/SsdCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ void SsdCache::write(std::vector<CachePin> pins) {
writesInProgress_.fetch_sub(numNoStore);
}

void SsdCache::checkpoint() {
VELOX_CHECK_EQ(numShards_, writesInProgress_);
for (auto i = 0; i < numShards_; ++i) {
executor_->add([this, i]() {
files_[i]->checkpoint(/*force=*/true);
--writesInProgress_;
});
}
}

bool SsdCache::removeFileEntries(
const folly::F14FastSet<uint64_t>& filesToRemove,
folly::F14FastSet<uint64_t>& filesRetained) {
Expand Down Expand Up @@ -206,9 +216,9 @@ void SsdCache::shutdown() {
VELOX_SSD_CACHE_LOG(INFO) << "SSD cache has been shutdown";
}

void SsdCache::testingClear() {
void SsdCache::clear() {
for (auto& file : files_) {
file->testingClear();
file->clear();
}
}

Expand All @@ -233,7 +243,7 @@ uint64_t SsdCache::testingTotalLogEvictionFilesSize() {
return size;
}

void SsdCache::testingWaitForWriteToFinish() {
void SsdCache::waitForWriteToFinish() {
while (writesInProgress_ != 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
}
Expand Down
24 changes: 18 additions & 6 deletions velox/common/caching/SsdCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,19 @@ class SsdCache {

/// Stores the entries of 'pins' into the corresponding files. Sets the file
/// for the successfully stored entries. May evict existing entries from
/// unpinned regions. startWrite() must have been called first and it must
/// have returned true.
/// unpinned regions.
///
/// NOTE: startWrite() must have been called first and it must have returned
/// true.
void write(std::vector<CachePin> pins);

/// Invoked to write checkpoints to all ssd files. This is used by Prestissimo
/// worker operation.
///
/// NOTE: startWrite() must have been called first and it must have returned
/// true.
void checkpoint();

/// Removes cached entries from all SsdFiles for files in the fileNum set
/// 'filesToRemove'. If successful, return true, and 'filesRetained' contains
/// entries that should not be removed, ex., from pinned regions. Otherwise,
Expand Down Expand Up @@ -147,7 +156,13 @@ class SsdCache {
/// Drops all entries. Outstanding pins become invalid but reading them will
/// mostly succeed since the files will not be rewritten until new content is
/// stored.
void testingClear();
///
/// NOTE: it is used by test and Prestissimo worker operation.
void clear();

/// Waits until the pending ssd cache writes or checkpoints to finish. Used by
/// test and Prestissimo worker operation.
void waitForWriteToFinish();

/// Deletes backing files. Used in testing.
void testingDeleteFiles();
Expand All @@ -158,9 +173,6 @@ class SsdCache {
/// Returns the total size of eviction log files. Used by test only.
uint64_t testingTotalLogEvictionFilesSize();

/// Waits until the pending ssd cache writes finish. Used by test only.
void testingWaitForWriteToFinish();

private:
void checkNotShutdownLocked() {
VELOX_CHECK(
Expand Down
Loading

0 comments on commit 07684ea

Please sign in to comment.