diff --git a/velox/benchmarks/tpch/TpchBenchmark.cpp b/velox/benchmarks/tpch/TpchBenchmark.cpp index fcf8c5c9a9d7..62d2a6a5d991 100644 --- a/velox/benchmarks/tpch/TpchBenchmark.cpp +++ b/velox/benchmarks/tpch/TpchBenchmark.cpp @@ -404,14 +404,14 @@ class TpchBenchmark { #endif if (cache_) { - cache_->testingClear(); + cache_->clear(); } } if (FLAGS_clear_ssd_cache) { if (cache_) { auto ssdCache = cache_->ssdCache(); if (ssdCache) { - ssdCache->testingClear(); + ssdCache->clear(); } } } diff --git a/velox/common/base/Counters.cpp b/velox/common/base/Counters.cpp index 6909958b84fa..67421ced6be9 100644 --- a/velox/common/base/Counters.cpp +++ b/velox/common/base/Counters.cpp @@ -149,6 +149,11 @@ void registerVeloxMetrics() { // last counter retrieval. DEFINE_METRIC(kMetricMemoryCacheNumEvicts, facebook::velox::StatType::SUM); + // Number of times a valid entry was removed in order to make space but has + // not been saved to SSD yet, since last counter retrieval. + DEFINE_METRIC( + kMetricMemoryCacheNumSavableEvicts, facebook::velox::StatType::SUM); + // Number of entries considered for evicting, since last counter retrieval. DEFINE_METRIC( kMetricMemoryCacheNumEvictChecks, facebook::velox::StatType::SUM); @@ -256,6 +261,10 @@ void registerVeloxMetrics() { // Total number of cache regions evicted. DEFINE_METRIC(kMetricSsdCacheRegionsEvicted, facebook::velox::StatType::SUM); + // Total number of cache entries recovered from checkpoint. + DEFINE_METRIC( + kMetricSsdCacheRecoveredEntries, facebook::velox::StatType::SUM); + /// ================== Memory Arbitration Counters ================= // The number of arbitration requests. diff --git a/velox/common/base/Counters.h b/velox/common/base/Counters.h index 76d8300d96d1..eb1e46d971a5 100644 --- a/velox/common/base/Counters.h +++ b/velox/common/base/Counters.h @@ -206,6 +206,9 @@ constexpr folly::StringPiece kMetricMemoryCacheNumNew{ constexpr folly::StringPiece kMetricMemoryCacheNumEvicts{ "velox.memory_cache_num_evicts"}; +constexpr folly::StringPiece kMetricMemoryCacheNumSavableEvicts{ + "velox.memory_cache_num_savable_evicts"}; + constexpr folly::StringPiece kMetricMemoryCacheNumEvictChecks{ "velox.memory_cache_num_evict_checks"}; @@ -293,6 +296,9 @@ constexpr folly::StringPiece kMetricSsdCacheCheckpointsWritten{ constexpr folly::StringPiece kMetricSsdCacheRegionsEvicted{ "velox.ssd_cache_regions_evicted"}; +constexpr folly::StringPiece kMetricSsdCacheRecoveredEntries{ + "velox.ssd_cache_recovered_entries"}; + constexpr folly::StringPiece kMetricExchangeDataTimeMs{ "velox.exchange_data_time_ms"}; diff --git a/velox/common/base/PeriodicStatsReporter.cpp b/velox/common/base/PeriodicStatsReporter.cpp index 43f37ef436ad..5bd8781a6b08 100644 --- a/velox/common/base/PeriodicStatsReporter.cpp +++ b/velox/common/base/PeriodicStatsReporter.cpp @@ -159,6 +159,8 @@ void PeriodicStatsReporter::reportCacheStats() { REPORT_IF_NOT_ZERO(kMetricMemoryCacheHitBytes, deltaCacheStats.hitBytes); REPORT_IF_NOT_ZERO(kMetricMemoryCacheNumNew, deltaCacheStats.numNew); REPORT_IF_NOT_ZERO(kMetricMemoryCacheNumEvicts, deltaCacheStats.numEvict); + REPORT_IF_NOT_ZERO( + kMetricMemoryCacheNumSavableEvicts, deltaCacheStats.numSavableEvict); REPORT_IF_NOT_ZERO( kMetricMemoryCacheNumEvictChecks, deltaCacheStats.numEvictChecks); REPORT_IF_NOT_ZERO( @@ -227,6 +229,8 @@ void PeriodicStatsReporter::reportCacheStats() { REPORT_IF_NOT_ZERO( kMetricSsdCacheReadWithoutChecksum, deltaSsdStats.readWithoutChecksumChecks); + REPORT_IF_NOT_ZERO( + kMetricSsdCacheRecoveredEntries, 
deltaSsdStats.entriesRecovered); } // TTL controler snapshot stats. diff --git a/velox/common/base/tests/StatsReporterTest.cpp b/velox/common/base/tests/StatsReporterTest.cpp index 8ed3857d3124..3b199e64cb3e 100644 --- a/velox/common/base/tests/StatsReporterTest.cpp +++ b/velox/common/base/tests/StatsReporterTest.cpp @@ -477,6 +477,7 @@ TEST_F(PeriodicStatsReporterTest, basic) { ASSERT_EQ(counterMap.count(kMetricMemoryCacheHitBytes.str()), 0); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumNew.str()), 0); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumEvicts.str()), 0); + ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumSavableEvicts.str()), 0); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumEvictChecks.str()), 0); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumWaitExclusive.str()), 0); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumAllocClocks.str()), 0); @@ -502,6 +503,7 @@ TEST_F(PeriodicStatsReporterTest, basic) { ASSERT_EQ(counterMap.count(kMetricSsdCacheRegionsEvicted.str()), 0); ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutEntries.str()), 0); ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutRegions.str()), 0); + ASSERT_EQ(counterMap.count(kMetricSsdCacheRecoveredEntries.str()), 0); ASSERT_EQ(counterMap.count(kMetricSsdCacheReadWithoutChecksum.str()), 0); ASSERT_EQ(counterMap.size(), 22); } @@ -530,11 +532,13 @@ TEST_F(PeriodicStatsReporterTest, basic) { newSsdStats->readSsdCorruptions = 10; newSsdStats->readCheckpointErrors = 10; newSsdStats->readWithoutChecksumChecks = 10; + newSsdStats->entriesRecovered = 10; cache.updateStats( {.numHit = 10, .hitBytes = 10, .numNew = 10, .numEvict = 10, + .numSavableEvict = 10, .numEvictChecks = 10, .numWaitExclusive = 10, .numAgedOut = 10, @@ -556,6 +560,7 @@ TEST_F(PeriodicStatsReporterTest, basic) { ASSERT_EQ(counterMap.count(kMetricMemoryCacheHitBytes.str()), 1); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumNew.str()), 1); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumEvicts.str()), 1); + ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumSavableEvicts.str()), 1); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumEvictChecks.str()), 1); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumWaitExclusive.str()), 1); ASSERT_EQ(counterMap.count(kMetricMemoryCacheNumAllocClocks.str()), 1); @@ -581,8 +586,9 @@ TEST_F(PeriodicStatsReporterTest, basic) { ASSERT_EQ(counterMap.count(kMetricSsdCacheRegionsEvicted.str()), 1); ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutEntries.str()), 1); ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutRegions.str()), 1); + ASSERT_EQ(counterMap.count(kMetricSsdCacheRecoveredEntries.str()), 1); ASSERT_EQ(counterMap.count(kMetricSsdCacheReadWithoutChecksum.str()), 1); - ASSERT_EQ(counterMap.size(), 52); + ASSERT_EQ(counterMap.size(), 54); } } diff --git a/velox/common/caching/AsyncDataCache.cpp b/velox/common/caching/AsyncDataCache.cpp index ce6e97b0aa27..e4de373d04e9 100644 --- a/velox/common/caching/AsyncDataCache.cpp +++ b/velox/common/caching/AsyncDataCache.cpp @@ -371,7 +371,8 @@ uint64_t CacheShard::evict( MachinePageCount pagesToAcquire, memory::Allocation& acquired) { auto* ssdCache = cache_->ssdCache(); - const bool skipSsdSaveable = ssdCache && ssdCache->writeInProgress(); + const bool skipSsdSaveable = + (ssdCache != nullptr) && ssdCache->writeInProgress(); auto now = accessTime(); std::vector toFree; int64_t tinyEvicted = 0; @@ -420,6 +421,9 @@ uint64_t CacheShard::evict( ++evictSaveableSkipped; continue; } + if (candidate->ssdSaveable()) { + ++numSavableEvict_; + } largeEvicted += 
candidate->data_.byteSize(); if (pagesToAcquire > 0) { const auto candidatePages = candidate->data().numPages(); @@ -539,6 +543,7 @@ void CacheShard::updateStats(CacheStats& stats) { stats.hitBytes += hitBytes_; stats.numNew += numNew_; stats.numEvict += numEvict_; + stats.numSavableEvict += numSavableEvict_; stats.numEvictChecks += numEvictChecks_; stats.numWaitExclusive += numWaitExclusive_; stats.numAgedOut += numAgedOut_; @@ -547,13 +552,15 @@ void CacheShard::updateStats(CacheStats& stats) { stats.allocClocks += allocClocks_; } -void CacheShard::appendSsdSaveable(std::vector& pins) { +void CacheShard::appendSsdSaveable(bool saveAll, std::vector& pins) { std::lock_guard l(mutex_); // Do not add entries to a write batch more than maxWriteRatio_. If SSD save // is slower than storage read, we must not have a situation where SSD save // pins everything and stops reading. - const auto limit = static_cast( - static_cast(entries_.size()) * maxWriteRatio_); + const int32_t limit = saveAll + ? std::numeric_limits::max() + : static_cast( + static_cast(entries_.size()) * maxWriteRatio_); VELOX_CHECK(cache_->ssdCache()->writeInProgress()); for (auto& entry : entries_) { if (entry && (entry->ssdFile_ == nullptr) && !entry->isExclusive() && @@ -625,6 +632,7 @@ CacheStats CacheStats::operator-(CacheStats& other) const { result.hitBytes = hitBytes - other.hitBytes; result.numNew = numNew - other.numNew; result.numEvict = numEvict - other.numEvict; + result.numSavableEvict = numSavableEvict - other.numSavableEvict; result.numEvictChecks = numEvictChecks - other.numEvictChecks; result.numWaitExclusive = numWaitExclusive - other.numWaitExclusive; result.numAgedOut = numAgedOut - other.numAgedOut; @@ -905,12 +913,12 @@ void AsyncDataCache::possibleSsdSave(uint64_t bytes) { } } -void AsyncDataCache::saveToSsd() { +void AsyncDataCache::saveToSsd(bool saveAll) { std::vector pins; VELOX_CHECK(ssdCache_->writeInProgress()); ssdSaveable_ = 0; for (auto& shard : shards_) { - shard->appendSsdSaveable(pins); + shard->appendSsdSaveable(saveAll, pins); } ssdCache_->write(std::move(pins)); } @@ -947,7 +955,7 @@ CacheStats AsyncDataCache::refreshStats() const { return stats; } -void AsyncDataCache::testingClear() { +void AsyncDataCache::clear() { for (auto& shard : shards_) { memory::Allocation unused; shard->evict(std::numeric_limits::max(), true, 0, unused); @@ -1002,6 +1010,7 @@ std::string CacheStats::toString() const { // Cache access stats. << "Cache access miss: " << numNew << " hit: " << numHit << " hit bytes: " << succinctBytes(hitBytes) << " eviction: " << numEvict + << " savable eviction: " << numSavableEvict << " eviction checks: " << numEvictChecks << " aged out: " << numAgedOut << " stales: " << numStales << "\n" diff --git a/velox/common/caching/AsyncDataCache.h b/velox/common/caching/AsyncDataCache.h index 0d672a94ba5f..120b11095448 100644 --- a/velox/common/caching/AsyncDataCache.h +++ b/velox/common/caching/AsyncDataCache.h @@ -522,6 +522,9 @@ struct CacheStats { int64_t numNew{0}; /// Number of times a valid entry was removed in order to make space. int64_t numEvict{0}; + /// Number of times a valid entry was removed in order to make space but has + /// not been saved to SSD yet. + int64_t numSavableEvict{0}; /// Number of entries considered for evicting. int64_t numEvictChecks{0}; /// Number of times a user waited for an entry to transit from exclusive to @@ -602,12 +605,14 @@ class CacheShard { /// Adds the stats of 'this' to 'stats'. 
void updateStats(CacheStats& stats); - /// Appends a batch of non-saved SSD savable entries in 'this' to - /// 'pins'. This may have to be called several times since this keeps - /// limits on the batch to write at one time. The savable entries - /// are pinned for read. 'pins' should be written or dropped before - /// calling this a second time. - void appendSsdSaveable(std::vector& pins); + /// Appends a batch of non-saved SSD savable entries in 'this' to 'pins'. This + /// may have to be called several times since this keeps limits on the batch + /// to write at one time. The savable entries are pinned for read. 'pins' + /// should be written or dropped before calling this a second time. If + /// 'saveAll' is true, appends all the non-saved SSD savable entries without + /// applying the batch limit. 'saveAll' is set to true for the Prestissimo + /// worker operation use case. + void appendSsdSaveable(bool saveAll, std::vector& pins); /// Remove cache entries from this shard for files in the fileNum set /// 'filesToRemove'. If successful, return true, and 'filesRetained' contains @@ -672,6 +677,8 @@ class CacheShard { uint64_t numNew_{0}; // Cumulative count of entries evicted. uint64_t numEvict_{0}; + // Cumulative count of evicted entries which have not been saved to SSD yet. + uint64_t numSavableEvict_{0}; // Cumulative count of entries considered for eviction. This divided by // 'numEvict_' measured efficiency of eviction. uint64_t numEvictChecks_{0}; @@ -841,8 +848,9 @@ class AsyncDataCache : public memory::Cache { } } - // Saves all entries with 'ssdSaveable_' to 'ssdCache_'. - void saveToSsd(); + /// Saves SSD savable entries to 'ssdCache_'. If 'saveAll' is true, saves all + /// of them without applying the per-shard batch limit. + void saveToSsd(bool saveAll = false); tsan_atomic& numSkippedSaves() { return numSkippedSaves_; @@ -857,7 +865,9 @@ class AsyncDataCache : public memory::Cache { folly::F14FastSet& filesRetained); /// Drops all unpinned entries. Pins stay valid. - void testingClear(); + /// + /// NOTE: it is used by testing and Prestissimo server operation. + void clear(); std::vector testingCacheEntries() const; @@ -865,6 +875,10 @@ class AsyncDataCache : public memory::Cache { return ssdSaveable_; } + int32_t testingNumShards() const { + return shards_.size(); + } + private: static constexpr int32_t kNumShards = 4; // Must be power of 2. 
static constexpr int32_t kShardMask = kNumShards - 1; diff --git a/velox/common/caching/SsdCache.cpp b/velox/common/caching/SsdCache.cpp index bf95bdfdb551..29b44e8abd4b 100644 --- a/velox/common/caching/SsdCache.cpp +++ b/velox/common/caching/SsdCache.cpp @@ -145,6 +145,16 @@ void SsdCache::write(std::vector pins) { writesInProgress_.fetch_sub(numNoStore); } +void SsdCache::checkpoint() { + VELOX_CHECK_EQ(numShards_, writesInProgress_); + for (auto i = 0; i < numShards_; ++i) { + executor_->add([this, i]() { + files_[i]->checkpoint(/*force=*/true); + --writesInProgress_; + }); + } +} + bool SsdCache::removeFileEntries( const folly::F14FastSet& filesToRemove, folly::F14FastSet& filesRetained) { @@ -206,9 +216,9 @@ void SsdCache::shutdown() { VELOX_SSD_CACHE_LOG(INFO) << "SSD cache has been shutdown"; } -void SsdCache::testingClear() { +void SsdCache::clear() { for (auto& file : files_) { - file->testingClear(); + file->clear(); } } @@ -233,7 +243,7 @@ uint64_t SsdCache::testingTotalLogEvictionFilesSize() { return size; } -void SsdCache::testingWaitForWriteToFinish() { +void SsdCache::waitForWriteToFinish() { while (writesInProgress_ != 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT } } diff --git a/velox/common/caching/SsdCache.h b/velox/common/caching/SsdCache.h index 3ede4b4e3b0b..386e79ed8eec 100644 --- a/velox/common/caching/SsdCache.h +++ b/velox/common/caching/SsdCache.h @@ -115,10 +115,19 @@ class SsdCache { /// Stores the entries of 'pins' into the corresponding files. Sets the file /// for the successfully stored entries. May evict existing entries from - /// unpinned regions. startWrite() must have been called first and it must - /// have returned true. + /// unpinned regions. + /// + /// NOTE: startWrite() must have been called first and it must have returned + /// true. void write(std::vector pins); + /// Invoked to write checkpoints to all ssd files. This is used by the + /// Prestissimo worker operation. + /// + /// NOTE: startWrite() must have been called first and it must have returned + /// true. + void checkpoint(); + /// Removes cached entries from all SsdFiles for files in the fileNum set /// 'filesToRemove'. If successful, return true, and 'filesRetained' contains /// entries that should not be removed, ex., from pinned regions. Otherwise, @@ -147,7 +156,13 @@ class SsdCache { /// Drops all entries. Outstanding pins become invalid but reading them will /// mostly succeed since the files will not be rewritten until new content is /// stored. - void testingClear(); + /// + /// NOTE: it is used by test and Prestissimo worker operation. + void clear(); + + /// Waits until the pending ssd cache writes or checkpoints finish. Used by + /// test and Prestissimo worker operation. + void waitForWriteToFinish(); /// Deletes backing files. Used in testing. void testingDeleteFiles(); @@ -158,9 +173,6 @@ class SsdCache { /// Returns the total size of eviction log files. Used by test only. uint64_t testingTotalLogEvictionFilesSize(); - /// Waits until the pending ssd cache writes finish. Used by test only. 
- void testingWaitForWriteToFinish(); - private: void checkNotShutdownLocked() { VELOX_CHECK( diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index 8ba28455c4cf..190f6c4c02d3 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -556,14 +556,14 @@ void SsdFile::updateStats(SsdCacheStats& stats) const { stats.readSsdCorruptions += stats_.readSsdCorruptions; } -void SsdFile::testingClear() { +void SsdFile::clear() { std::lock_guard l(mutex_); entries_.clear(); std::fill(regionSizes_.begin(), regionSizes_.end(), 0); std::fill(erasedRegionSizes_.begin(), erasedRegionSizes_.end(), 0); writableRegions_.resize(numRegions_); std::iota(writableRegions_.begin(), writableRegions_.end(), 0); - tracker_.testingClear(); + tracker_.clear(); } void SsdFile::testingDeleteFile() { @@ -835,6 +835,9 @@ void SsdFile::checkpoint(bool force) { // log evictions. The latter might lead to data consistent issue. checkRc(::ftruncate(evictLogFd_, 0), "Truncate of event log"); checkRc(::fsync(evictLogFd_), "Sync of evict log"); + + VELOX_SSD_CACHE_LOG(INFO) + << "Checkpoint persisted with " << entries_.size() << " cache entries"; } catch (const std::exception& e) { try { checkpointError(-1, e.what()); @@ -855,10 +858,11 @@ void SsdFile::initializeCheckpoint() { hasCheckpoint = false; ++stats_.openCheckpointErrors; VELOX_SSD_CACHE_LOG(WARNING) << fmt::format( - "Starting shard {} without checkpoint, with checksum write {}, read verification {}.", + "Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}", shardId_, checksumEnabled_ ? "enabled" : "disabled", - checksumReadVerificationEnabled_ ? "enabled" : "disabled"); + checksumReadVerificationEnabled_ ? "enabled" : "disabled", + getCheckpointFilePath()); } const auto logPath = getEvictLogFilePath(); evictLogFd_ = ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); @@ -971,8 +975,9 @@ void SsdFile::readCheckpoint(std::ifstream& state) { isChecksumEnabledOnCheckpointVersion(std::string(versionMagic, 4)); if (checksumEnabled_ && !checkpoinHasChecksum) { VELOX_SSD_CACHE_LOG(WARNING) << fmt::format( - "Starting shard {} without checkpoint: checksum is enabled but the checkpoint was made without checksum, so skip the checkpoint recovery.", - shardId_); + "Starting shard {} without checkpoint: checksum is enabled but the checkpoint was made without checksum, so skip the checkpoint recovery, checkpoint file {}", + shardId_, + getCheckpointFilePath()); return; } @@ -1027,6 +1032,8 @@ void SsdFile::readCheckpoint(std::ifstream& state) { } } ++stats_.checkpointsRead; + stats_.entriesRecovered += entries_.size(); + // The state is successfully read. Install the access frequency scores and // evicted regions. VELOX_CHECK_EQ(scores.size(), tracker_.regionScores().size()); @@ -1037,13 +1044,14 @@ void SsdFile::readCheckpoint(std::ifstream& state) { } tracker_.setRegionScores(scores); VELOX_SSD_CACHE_LOG(INFO) << fmt::format( - "Starting shard {} from checkpoint with {} entries, {} regions with {} free, with checksum write {}, read verification {}.", + "Starting shard {} from checkpoint with {} entries, {} regions with {} free, with checksum write {}, read verification {}, checkpoint file {}", shardId_, entries_.size(), numRegions_, writableRegions_.size(), checksumEnabled_ ? "enabled" : "disabled", - checksumReadVerificationEnabled_ ? "enabled" : "disabled"); + checksumReadVerificationEnabled_ ? 
"enabled" : "disabled", + getCheckpointFilePath()); } } // namespace facebook::velox::cache diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index ff31d280b584..05bf673eacb7 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -141,6 +141,7 @@ struct SsdCacheStats { bytesWritten = tsanAtomicValue(other.bytesWritten); checkpointsWritten = tsanAtomicValue(other.checkpointsWritten); entriesRead = tsanAtomicValue(other.entriesRead); + entriesRecovered = tsanAtomicValue(other.entriesRecovered); bytesRead = tsanAtomicValue(other.bytesRead); checkpointsRead = tsanAtomicValue(other.checkpointsRead); entriesCached = tsanAtomicValue(other.entriesCached); @@ -172,6 +173,7 @@ struct SsdCacheStats { result.bytesWritten = bytesWritten - other.bytesWritten; result.checkpointsWritten = checkpointsWritten - other.checkpointsWritten; result.entriesRead = entriesRead - other.entriesRead; + result.entriesRecovered = entriesRecovered - other.entriesRecovered; result.bytesRead = bytesRead - other.bytesRead; result.checkpointsRead = checkpointsRead - other.checkpointsRead; result.entriesAgedOut = entriesAgedOut - other.entriesAgedOut; @@ -208,6 +210,7 @@ struct SsdCacheStats { tsan_atomic bytesWritten{0}; tsan_atomic checkpointsWritten{0}; tsan_atomic entriesRead{0}; + tsan_atomic entriesRecovered{0}; tsan_atomic bytesRead{0}; tsan_atomic checkpointsRead{0}; tsan_atomic entriesAgedOut{0}; @@ -347,10 +350,10 @@ class SsdFile { const folly::F14FastSet& filesToRemove, folly::F14FastSet& filesRetained); - /// Writes a checkpoint state that can be recovered from. The - /// checkpoint is serialized on 'mutex_'. If 'force' is false, - /// rechecks that at least 'checkpointIntervalBytes_' have been - /// written since last checkpoint and silently returns if not. + /// Writes a checkpoint state that can be recovered from. The checkpoint is + /// serialized on 'mutex_'. If 'force' is false, rechecks that at least + /// 'checkpointIntervalBytes_' have been written since last checkpoint and + /// silently returns if not. void checkpoint(bool force = false); /// Deletes checkpoint files. If 'keepLog' is true, truncates and syncs the @@ -376,7 +379,9 @@ class SsdFile { void testingDeleteFile(); /// Resets this' to a post-construction empty state. See SsdCache::clear(). - void testingClear(); + /// + /// NOTE: this is only used by test and Prestissimo worker operation. + void clear(); /// Returns true if copy on write is disabled for this file. Used in testing. bool testingIsCowDisabled() const; diff --git a/velox/common/caching/SsdFileTracker.h b/velox/common/caching/SsdFileTracker.h index b78a189a8f40..5ab825f50b88 100644 --- a/velox/common/caching/SsdFileTracker.h +++ b/velox/common/caching/SsdFileTracker.h @@ -85,7 +85,9 @@ class SsdFileTracker { } /// Resets scores of all regions. - void testingClear() { + /// + /// NOTE: this is only used by test and Prestissimo worker operation. 
+ void clear() { std::fill(regionScores_.begin(), regionScores_.end(), 0); } diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp index 12c8c5a21340..3f624f9deda0 100644 --- a/velox/common/caching/tests/AsyncDataCacheTest.cpp +++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp @@ -78,6 +78,7 @@ class AsyncDataCacheTest : public ::testing::TestWithParam { protected: static constexpr int32_t kNumFiles = 100; + static constexpr int32_t kNumSsdShards = 4; static void SetUpTestCase() { TestValue::enable(); @@ -113,6 +114,7 @@ class AsyncDataCacheTest : public ::testing::TestWithParam { uint64_t maxBytes, int64_t ssdBytes = 0, uint64_t checkpointIntervalBytes = 0, + bool eraseCheckpoint = false, AsyncDataCache::Options cacheOptions = {}) { if (cache_ != nullptr) { cache_->shutdown(); @@ -126,13 +128,13 @@ class AsyncDataCacheTest : public ::testing::TestWithParam { // Make a new tempDirectory only if one is not already set. The // second creation of cache must find the checkpoint of the // previous one. - if (tempDirectory_ == nullptr) { + if (tempDirectory_ == nullptr || eraseCheckpoint) { tempDirectory_ = exec::test::TempDirectoryPath::create(); } SsdCache::Config config( fmt::format("{}/cache", tempDirectory_->getPath()), ssdBytes, - 4, + kNumSsdShards, ssdExecutor(), checkpointIntervalBytes > 0 ? checkpointIntervalBytes : ssdBytes / 20, false, @@ -687,7 +689,7 @@ TEST_P(AsyncDataCacheTest, pin) { EXPECT_EQ(0, stats.numShared); EXPECT_EQ(0, stats.numExclusive); - cache_->testingClear(); + cache_->clear(); stats = cache_->refreshStats(); EXPECT_EQ(0, stats.largeSize); EXPECT_EQ(0, stats.numEntries); @@ -867,7 +869,7 @@ TEST_P(AsyncDataCacheTest, DISABLED_ssd) { ASSERT_EQ(ramStats.numShared, 0); ASSERT_EQ(ramStats.numExclusive, 0); - cache_->ssdCache()->testingClear(); + cache_->ssdCache()->clear(); // We cut the tail off one of the cache shards. corruptFile(fmt::format("{}/cache0.cpt", tempDirectory_->getPath())); // We open the cache from checkpoint. 
Reading checks the data integrity, here @@ -922,7 +924,7 @@ TEST_P(AsyncDataCacheTest, cacheStats) { "Cache size: 2.56KB tinySize: 257B large size: 2.31KB\n" "Cache entries: 100 read pins: 30 write pins: 20 pinned shared: 10.00MB pinned exclusive: 10.00MB\n" " num write wait: 244 empty entries: 20\n" - "Cache access miss: 2041 hit: 46 hit bytes: 1.34KB eviction: 463 eviction checks: 348 aged out: 10 stales: 100\n" + "Cache access miss: 2041 hit: 46 hit bytes: 1.34KB eviction: 463 savable eviction: 0 eviction checks: 348 aged out: 10 stales: 100\n" "Prefetch entries: 30 bytes: 100B\n" "Alloc Megaclocks 0"); @@ -958,7 +960,7 @@ TEST_P(AsyncDataCacheTest, cacheStats) { "Cache size: 0B tinySize: 0B large size: 0B\n" "Cache entries: 0 read pins: 0 write pins: 0 pinned shared: 0B pinned exclusive: 0B\n" " num write wait: 0 empty entries: 0\n" - "Cache access miss: 0 hit: 0 hit bytes: 0B eviction: 0 eviction checks: 0 aged out: 0 stales: 0\n" + "Cache access miss: 0 hit: 0 hit bytes: 0B eviction: 0 savable eviction: 0 eviction checks: 0 aged out: 0 stales: 0\n" "Prefetch entries: 0 bytes: 0B\n" "Alloc Megaclocks 0\n" "Allocated pages: 0 cached pages: 0\n" @@ -982,7 +984,7 @@ TEST_P(AsyncDataCacheTest, cacheStats) { "Cache size: 0B tinySize: 0B large size: 0B\n" "Cache entries: 0 read pins: 0 write pins: 0 pinned shared: 0B pinned exclusive: 0B\n" " num write wait: 0 empty entries: 0\n" - "Cache access miss: 0 hit: 0 hit bytes: 0B eviction: 0 eviction checks: 0 aged out: 0 stales: 0\n" + "Cache access miss: 0 hit: 0 hit bytes: 0B eviction: 0 savable eviction: 0 eviction checks: 0 aged out: 0 stales: 0\n" "Prefetch entries: 0 bytes: 0B\n" "Alloc Megaclocks 0\n" "Allocated pages: 0 cached pages: 0\n"; @@ -1177,12 +1179,16 @@ TEST_P(AsyncDataCacheTest, shutdown) { SCOPED_TRACE(fmt::format("asyncShutdown {}", asyncShutdown)); // Initialize cache with a big checkpointIntervalBytes, giving eviction log // chance to survive. - initializeCache(kRamBytes, kSsdBytes, kSsdBytes * 10); + initializeCache( + kRamBytes, + kSsdBytes, + /*checkpointIntervalBytes=*/(1ULL << 30) * kNumSsdShards); ASSERT_EQ(cache_->ssdCache()->stats().openCheckpointErrors, 4); // Write large amount of data, making sure eviction is triggered and the log // file is not zero. loadLoop(0, 16 * kSsdBytes); + ASSERT_EQ(cache_->ssdCache()->stats().checkpointsWritten, 0); ASSERT_GT(cache_->ssdCache()->stats().regionsEvicted, 0); ASSERT_GT(cache_->ssdCache()->testingTotalLogEvictionFilesSize(), 0); @@ -1221,7 +1227,7 @@ TEST_P(AsyncDataCacheTest, shutdown) { "Unexpected write after SSD cache has been shutdown"); // Re-initialize cache. - cache_->ssdCache()->testingClear(); + cache_->ssdCache()->clear(); initializeCache(kRamBytes, kSsdBytes, kSsdBytes * 10); // Checkpoint files are intact and readable. ASSERT_EQ(cache_->ssdCache()->stats().openCheckpointErrors, 0); @@ -1260,7 +1266,7 @@ DEBUG_ONLY_TEST_P(AsyncDataCacheTest, shrinkWithSsdWrite) { // Starts a write thread running at background. 
std::thread ssdWriteThread([&]() { - cache_->ssdCache()->startWrite(); + ASSERT_TRUE(cache_->ssdCache()->startWrite()); cache_->saveToSsd(); }); @@ -1373,7 +1379,7 @@ TEST_P(AsyncDataCacheTest, makeEvictable) { if (ssdCache == nullptr) { continue; } - ssdCache->testingWaitForWriteToFinish(); + ssdCache->waitForWriteToFinish(); if (evictable) { ASSERT_EQ(ssdCache->stats().entriesCached, 0); } else { @@ -1420,6 +1426,7 @@ TEST_P(AsyncDataCacheTest, ssdWriteOptions) { kRamBytes, kSsdBytes, 0, + true, {testData.maxWriteRatio, testData.ssdSavableRatio, testData.minSsdSavableBytes}); @@ -1440,6 +1447,79 @@ TEST_P(AsyncDataCacheTest, ssdWriteOptions) { } } +TEST_P(AsyncDataCacheTest, appendSsdSaveable) { + constexpr uint64_t kRamBytes = 64UL << 20; // 64 MB + constexpr uint64_t kSsdBytes = 128UL << 20; // 128 MB + + // Test the ssd save behavior with different settings. + struct { + double maxWriteRatio; + double ssdSavableRatio; + int32_t minSsdSavableBytes; + bool appendAll; + + std::string debugString() const { + return fmt::format( + "maxWriteRatio {}, ssdSavableRatio {}, minSsdSavableBytes {}, appendAll {}", + maxWriteRatio, + ssdSavableRatio, + minSsdSavableBytes, + appendAll); + } + } testSettings[] = { + {0.0, 10000.0, 1ULL << 30, true}, {0.0, 10000.0, 1UL << 30, false}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + initializeCache( + kRamBytes, + kSsdBytes, + /*checkpointIntervalBytes=*/1UL << 30, + /*eraseCheckpoint=*/true, + {testData.maxWriteRatio, + testData.ssdSavableRatio, + testData.minSsdSavableBytes}); + // Load data amounting to half of the in-memory capacity. + loadLoop(0, kRamBytes / 2); + waitForPendingLoads(); + auto stats = cache_->refreshStats(); + + ASSERT_TRUE(cache_->ssdCache()->startWrite()); + cache_->saveToSsd(testData.appendAll); + + cache_->ssdCache()->waitForWriteToFinish(); + stats = cache_->refreshStats(); + if (testData.appendAll) { + // There might be some cache evictions. + ASSERT_GE(stats.ssdStats->entriesWritten, stats.numEntries); + } else { + ASSERT_EQ(stats.ssdStats->entriesWritten, cache_->testingNumShards()); + } + } +} + +TEST_P(AsyncDataCacheTest, checkpoint) { + constexpr uint64_t kRamBytes = 16UL << 20; // 16 MB + constexpr uint64_t kSsdBytes = 64UL << 20; // 64 MB + + initializeCache( + kRamBytes, + kSsdBytes, + /*checkpointIntervalBytes=*/1ULL << 30, + /*eraseCheckpoint=*/true); + // Load data amounting to half of the in-memory capacity. + loadLoop(0, kRamBytes / 2); + waitForPendingLoads(); + auto stats = cache_->refreshStats(); + ASSERT_EQ(stats.ssdStats->checkpointsWritten, 0); + ASSERT_TRUE(cache_->ssdCache()->startWrite()); + cache_->ssdCache()->checkpoint(); + cache_->ssdCache()->waitForWriteToFinish(); + stats = cache_->refreshStats(); + ASSERT_EQ(stats.ssdStats->checkpointsWritten, kNumSsdShards); +} + // TODO: add concurrent fuzzer test. 
INSTANTIATE_TEST_SUITE_P( diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index c6e328d7f32f..017de42d932c 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -654,13 +654,13 @@ TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) { }; pins.clear(); - cache_->testingClear(); + cache_->clear(); ASSERT_EQ(cache_->refreshStats().numEntries, 0); ASSERT_EQ(checkEntries(entries), entries.size()); ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 0); - cache_->testingClear(); + cache_->clear(); ASSERT_EQ(cache_->refreshStats().numEntries, 0); #ifndef NDEBUG diff --git a/velox/common/caching/tests/SsdFileTrackerTest.cpp b/velox/common/caching/tests/SsdFileTrackerTest.cpp index fe0edf02643e..851c72773747 100644 --- a/velox/common/caching/tests/SsdFileTrackerTest.cpp +++ b/velox/common/caching/tests/SsdFileTrackerTest.cpp @@ -50,7 +50,7 @@ TEST(SsdFileTrackerTest, tracker) { EXPECT_EQ(candidates, expected); // Test large region scores. - tracker.testingClear(); + tracker.clear(); for (auto region = 0; region < kNumRegions; ++region) { tracker.regionRead(region, INT32_MAX); tracker.regionRead(region, region * 100'000'000); diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index 725c05b57070..36c2446bb423 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -743,7 +743,7 @@ TEST_P(MemoryPoolTest, memoryCapExceptions) { "tinySize: 0B large size: 0B\nCache entries: 0 read pins: " "0 write pins: 0 pinned shared: 0B pinned exclusive: 0B\n " "num write wait: 0 empty entries: 0\nCache access miss: 0 " - "hit: 0 hit bytes: 0B eviction: 0 eviction checks: 0 " + "hit: 0 hit bytes: 0B eviction: 0 savable eviction: 0 eviction checks: 0 " "aged out: 0 stales: 0\nPrefetch entries: 0 bytes: 0B\nAlloc Megaclocks 0\n" "Allocated pages: 0 cached pages: 0\n", isLeafThreadSafe_ ? "thread-safe" : "non-thread-safe"), @@ -777,7 +777,7 @@ TEST_P(MemoryPoolTest, memoryCapExceptions) { "size: 0B tinySize: 0B large size: 0B\nCache entries: 0 " "read pins: 0 write pins: 0 pinned shared: 0B pinned " "exclusive: 0B\n num write wait: 0 empty entries: 0\nCache " - "access miss: 0 hit: 0 hit bytes: 0B eviction: 0 eviction " + "access miss: 0 hit: 0 hit bytes: 0B eviction: 0 savable eviction: 0 eviction " "checks: 0 aged out: 0 stales: 0\nPrefetch entries: 0 bytes: 0B\nAlloc Megaclocks" " 0\nAllocated pages: 0 cached pages: 0\n", isLeafThreadSafe_ ? "thread-safe" : "non-thread-safe"), diff --git a/velox/docs/monitoring/metrics.rst b/velox/docs/monitoring/metrics.rst index df72e0368da5..a67d516fb25d 100644 --- a/velox/docs/monitoring/metrics.rst +++ b/velox/docs/monitoring/metrics.rst @@ -290,6 +290,10 @@ Cache - Sum - Number of times a valid entry was removed in order to make space, since last counter retrieval. + * - memory_cache_num_savable_evicts + - Sum + - Number of times a valid entry was removed in order to make space but has not + been saved to SSD yet, since last counter retrieval. * - memory_cache_num_evict_checks - Sum - Number of entries considered for evicting, since last counter retrieval. @@ -384,6 +388,9 @@ Cache * - ssd_cache_regions_evicted - Sum - Total number of cache regions evicted. + * - ssd_cache_recovered_entries + - Sum + - Total number of cache entries recovered from checkpoint. 
Storage ------- diff --git a/velox/dwio/dwrf/test/CacheInputTest.cpp b/velox/dwio/dwrf/test/CacheInputTest.cpp index 88cb7800b94f..e3a9dff0f7cc 100644 --- a/velox/dwio/dwrf/test/CacheInputTest.cpp +++ b/velox/dwio/dwrf/test/CacheInputTest.cpp @@ -533,7 +533,7 @@ TEST_F(CacheTest, ssd) { EXPECT_LT(0, ioStats_->rawOverreadBytes()); auto fullStripeBytes = ioStats_->rawBytesRead(); auto bytes = ioStats_->rawBytesRead(); - cache_->testingClear(); + cache_->clear(); // We read 10 stripes with some columns sparsely accessed. readLoop("testfile", 30, 70, 10, 10, 1, /*noCacheRetention=*/false, ioStats_); auto sparseStripeBytes = (ioStats_->rawBytesRead() - bytes) / 10; @@ -548,7 +548,7 @@ TEST_F(CacheTest, ssd) { "prefix1_", 0, kSsdBytes / bytesPerFile, 30, 100, 1, kStripesPerFile, 4); waitForWrite(); - cache_->testingClear(); + cache_->clear(); // Read double this to get some eviction from SSD. readFiles( "prefix1_", @@ -824,7 +824,7 @@ TEST_F(CacheTest, noCacheRetention) { ASSERT_LT(0, ioStats_->rawBytesRead()); auto* ssdCache = cache_->ssdCache(); if (ssdCache != nullptr) { - ssdCache->testingWaitForWriteToFinish(); + ssdCache->waitForWriteToFinish(); if (testData.noCacheRetention) { ASSERT_EQ(ssdCache->stats().entriesCached, 0); } else { @@ -945,7 +945,7 @@ TEST_F(CacheTest, ssdReadVerification) { // Corrupt SSD cache file. corruptSsdFile(fmt::format("{}/cache0", tempDirectory_->getPath())); // Clear memory cache to force ssd read. - cache_->testingClear(); + cache_->clear(); // Record the baseline stats. const auto prevStats = cache_->refreshStats(); diff --git a/velox/exec/fuzzer/CacheFuzzer.cpp b/velox/exec/fuzzer/CacheFuzzer.cpp index 5ad590a521f1..d472bf0f85c7 100644 --- a/velox/exec/fuzzer/CacheFuzzer.cpp +++ b/velox/exec/fuzzer/CacheFuzzer.cpp @@ -275,7 +275,7 @@ void CacheFuzzer::readCache() { void CacheFuzzer::reset() { cache_->shutdown(); - cache_->ssdCache()->testingWaitForWriteToFinish(); + cache_->ssdCache()->waitForWriteToFinish(); executor_->join(); executor_.reset(); fileNames_.clear(); diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index d986aeb743b4..f426f66dc85c 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -234,7 +234,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { const std::vector numPrefetchSplits = {0, 2}; for (const auto& numPrefetchSplit : numPrefetchSplits) { SCOPED_TRACE(fmt::format("numPrefetchSplit {}", numPrefetchSplit)); - asyncDataCache_->testingClear(); + asyncDataCache_->clear(); auto filePath = TempFilePath::create(); writeToFile(filePath->getPath(), vectors); diff --git a/velox/exec/tests/utils/OperatorTestBase.cpp b/velox/exec/tests/utils/OperatorTestBase.cpp index 6eebf2ba43de..762f94e07e3a 100644 --- a/velox/exec/tests/utils/OperatorTestBase.cpp +++ b/velox/exec/tests/utils/OperatorTestBase.cpp @@ -84,7 +84,7 @@ void OperatorTestBase::setupMemory( int64_t memoryPoolInitCapacity, int64_t memoryPoolReservedCapacity) { if (asyncDataCache_ != nullptr) { - asyncDataCache_->testingClear(); + asyncDataCache_->clear(); asyncDataCache_.reset(); } MemoryManagerOptions options;
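
Taken together, this diff exposes the cache maintenance hooks (AsyncDataCache::clear(), AsyncDataCache::saveToSsd(bool saveAll), SsdCache::checkpoint(), SsdCache::clear(), SsdCache::waitForWriteToFinish()) that a Prestissimo server operation can drive. Below is a minimal sketch of how such an operation might chain these calls, mirroring the appendSsdSaveable and checkpoint tests above; the wrapper functions flushSsdCache() and clearCaches() are hypothetical names for illustration only and are not part of this change.

```cpp
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdCache.h"

using facebook::velox::cache::AsyncDataCache;
using facebook::velox::cache::SsdCache;

// Hypothetical helper: persists all unsaved SSD savable entries and then
// forces a checkpoint so they can be recovered after a restart. Both
// saveToSsd(/*saveAll=*/true) and checkpoint() require a preceding
// startWrite() call that returned true.
void flushSsdCache(AsyncDataCache* cache) {
  SsdCache* ssdCache = cache->ssdCache();
  if (ssdCache == nullptr) {
    return; // No SSD tier configured; nothing to persist.
  }
  if (ssdCache->startWrite()) {
    cache->saveToSsd(/*saveAll=*/true);
    ssdCache->waitForWriteToFinish();
  }
  if (ssdCache->startWrite()) {
    ssdCache->checkpoint();
    ssdCache->waitForWriteToFinish();
  }
}

// Hypothetical helper: drops all unpinned in-memory entries and then the SSD
// entries, matching the order used by the TpchBenchmark clear flags above.
void clearCaches(AsyncDataCache* cache) {
  cache->clear();
  if (auto* ssdCache = cache->ssdCache()) {
    ssdCache->clear();
  }
}
```

After a restart, entries restored from the persisted checkpoint are reported under ssd_cache_recovered_entries, while evictions of entries that never reached SSD continue to show up as memory_cache_num_savable_evicts.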