Improve stripe metadata io and increase the ssd checksum verification coverage (#10220)

Summary:
Remove unnecessary stripe footer pre-loads. For a cache input stream, those pre-loads are useless
unless the follow-up loads start from the same offsets; for a direct buffer input stream, the
pre-loaded buffers are simply dropped, because the pre-load holds neither the stream nor the buffer
and no cache is involved. The direct buffer input is also used in batch mode, which does not have an
SSD cache either. This optimizes file footer handling when we open a new file in the dwrf reader base.
It also avoids cache pollution when we read a smaller buffer first from a given offset, and ensures
SSD checksum coverage on all SSD reads when we read a larger buffer first from the same offset.
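The size-mismatch scenario described in the last sentence, as a minimal sketch against the
AsyncDataCache API exercised in the tests below (the cache handle, fileId, and sizes are illustrative):

folly::SemiFuture<bool> wait(false);
RawFileCacheKey key{fileId, /*offset=*/1000};

// First reader caches 200 bytes at offset 1000.
auto smallPin = cache->findOrCreate(key, /*size=*/200, &wait);
smallPin.entry()->setExclusiveToShared();

// A later reader wants 400 bytes at the same offset. The 200-byte entry
// cannot serve it, so findOrCreate supersedes it (counted as a stale entry)
// and hands back a new exclusive entry to be filled.
auto stalePin = cache->findOrCreate(key, /*size=*/400, &wait);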
This PR adds counters to record the number of times in-memory cache entries get invalidated because
of a short cache entry size, as well as the number of times on-disk SSD cache read checksum
verification gets skipped because of a size mismatch. After this has been rolled out in production
for a while, we will enforce a check in the SSD cache to disallow SSD cache request size mismatches.
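A sketch of how the new in-memory counter surfaces through the stats API in this diff (the
reporting window and variable names are illustrative):

auto before = cache->refreshStats();
// ... serve traffic ...
auto after = cache->refreshStats();
const auto delta = after - before;
// 'numStales' counts entries superseded by a larger request at the same key.
LOG(INFO) << "stale cache entries in window: " << delta.numStales;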
Also includes some code cleanup in the split reader.

In a 1-hour batch stress shadow test, the average execution time was reduced by 10% and walltime by
~15%. The improvement comes from table scan read bytes dropping from 224TB to 128TB, a ~43%
reduction. A similar execution/walltime improvement was observed on the Deltoid shadow, which only
has 200 queries and might need more testing.

Pull Request resolved: #10220

Reviewed By: Yuhta, zacw7, oerling

Differential Revision: D58693531

Pulled By: xiaoxmeng

fbshipit-source-id: 4ba47b595e2bd809c25869713dd10871539c1116
xiaoxmeng authored and facebook-github-bot committed Jun 20, 2024
1 parent 1a50a8a commit 97cdc63
Showing 34 changed files with 366 additions and 187 deletions.
16 changes: 16 additions & 0 deletions velox/common/base/Counters.cpp
@@ -168,6 +168,11 @@ void registerVeloxMetrics() {
DEFINE_METRIC(
kMetricMemoryCacheNumAgedOutEntries, facebook::velox::StatType::SUM);

// Number of AsyncDataCache entries that are stale because of cache request
// size mismatch.
DEFINE_METRIC(
kMetricMemoryCacheNumStaleEntries, facebook::velox::StatType::COUNT);

/// ================== SsdCache Counters ==================

// Number of regions currently cached by SSD.
@@ -236,6 +241,11 @@ void registerVeloxMetrics() {
DEFINE_METRIC(
kMetricSsdCacheReadCheckpointErrors, facebook::velox::StatType::SUM);

// Total number of SSD cache reads without checksum verification due to
// mismatch in SSD cache request size.
DEFINE_METRIC(
kMetricSsdCacheReadWithoutChecksum, facebook::velox::StatType::COUNT);

// Total number of checkpoints read.
DEFINE_METRIC(kMetricSsdCacheCheckpointsRead, facebook::velox::StatType::SUM);

@@ -471,5 +481,11 @@ void registerVeloxMetrics() {

// The exchange data size in bytes.
DEFINE_METRIC(kMetricExchangeDataBytes, facebook::velox::StatType::SUM);

// The distribution of exchange data size in the range [0, 128MB] with 128
// 1MB buckets. It is configured to report the size at the P50, P90, P99,
// and P100 percentiles.
DEFINE_HISTOGRAM_METRIC(
kMetricExchangeDataSize, 1L << 20, 0, 128L << 20, 50, 90, 99, 100);
}
} // namespace facebook::velox
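Recording into the new histogram would look roughly like the sketch below; RECORD_HISTOGRAM_METRIC_VALUE is assumed to be the histogram counterpart of the RECORD_METRIC_VALUE macro used elsewhere in this diff, and 'payloadBytes' is illustrative:

// Record an observed exchange payload size; values land in the 1MB-wide
// buckets over [0, 128MB] declared by DEFINE_HISTOGRAM_METRIC above.
RECORD_HISTOGRAM_METRIC_VALUE(kMetricExchangeDataSize, payloadBytes);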
9 changes: 9 additions & 0 deletions velox/common/base/Counters.h
@@ -218,6 +218,9 @@ constexpr folly::StringPiece kMetricMemoryCacheNumAllocClocks{
constexpr folly::StringPiece kMetricMemoryCacheNumAgedOutEntries{
"velox.memory_cache_num_aged_out_entries"};

constexpr folly::StringPiece kMetricMemoryCacheNumStaleEntries{
"velox.memory_cache_num_stale_entries"};

constexpr folly::StringPiece kMetricSsdCacheCachedRegions{
"velox.ssd_cache_cached_regions"};

@@ -278,6 +281,9 @@ constexpr folly::StringPiece kMetricSsdCacheReadSsdErrors{
constexpr folly::StringPiece kMetricSsdCacheReadCheckpointErrors{
"velox.ssd_cache_read_checkpoint_errors"};

constexpr folly::StringPiece kMetricSsdCacheReadWithoutChecksum{
"velox.ssd_cache_read_without_checksum"};

constexpr folly::StringPiece kMetricSsdCacheCheckpointsRead{
"velox.ssd_cache_checkpoints_read"};

@@ -295,4 +301,7 @@ constexpr folly::StringPiece kMetricExchangeDataSizeTimeMs{

constexpr folly::StringPiece kMetricExchangeDataBytes{
"velox.exchange_data_bytes"};

constexpr folly::StringPiece kMetricExchangeDataSize{
"velox.exchange_data_size"};
} // namespace facebook::velox
5 changes: 5 additions & 0 deletions velox/common/caching/AsyncDataCache.cpp
@@ -200,6 +200,8 @@ CachePin CacheShard::findOrCreate(
<< " requested size " << size;
// The old entry is superseded. Possible readers of the old entry still
// retain a valid read pin.
RECORD_METRIC_VALUE(kMetricMemoryCacheNumStaleEntries);
++numStales_;
foundEntry->key_.fileNum.clear();
entryMap_.erase(it);
}
@@ -534,6 +536,7 @@ void CacheShard::updateStats(CacheStats& stats) {
stats.numEvictChecks += numEvictChecks_;
stats.numWaitExclusive += numWaitExclusive_;
stats.numAgedOut += numAgedOut_;
stats.numStales += numStales_;
stats.sumEvictScore += sumEvictScore_;
stats.allocClocks += allocClocks_;
}
@@ -619,6 +622,7 @@ CacheStats CacheStats::operator-(CacheStats& other) const {
result.numEvictChecks = numEvictChecks - other.numEvictChecks;
result.numWaitExclusive = numWaitExclusive - other.numWaitExclusive;
result.numAgedOut = numAgedOut - other.numAgedOut;
result.numStales = numStales - other.numStales;
result.allocClocks = allocClocks - other.allocClocks;
result.sumEvictScore = sumEvictScore - other.sumEvictScore;
if (ssdStats != nullptr && other.ssdStats != nullptr) {
@@ -993,6 +997,7 @@ std::string CacheStats::toString() const {
<< "Cache access miss: " << numNew << " hit: " << numHit
<< " hit bytes: " << succinctBytes(hitBytes) << " eviction: " << numEvict
<< " eviction checks: " << numEvictChecks << " aged out: " << numAgedOut
<< " stales: " << numStales
<< "\n"
// Cache prefetch stats.
<< "Prefetch entries: " << numPrefetch
9 changes: 7 additions & 2 deletions velox/common/caching/AsyncDataCache.h
@@ -528,7 +531,10 @@ struct CacheStats {
/// shared mode.
int64_t numWaitExclusive{0};
/// Total number of entries that are aged out and beyond TTL.
-int64_t numAgedOut{};
+int64_t numAgedOut{0};
/// Total number of entries that are stale because of cache request size
/// mismatch.
int64_t numStales{0};
/// Cumulative clocks spent in allocating or freeing memory for backing cache
/// entries.
uint64_t allocClocks{0};
@@ -673,7 +676,9 @@ class CacheShard {
// 'numEvict_' measures efficiency of eviction.
uint64_t numEvictChecks_{0};
// Cumulative count of entries aged out due to TTL.
-uint64_t numAgedOut_{};
+uint64_t numAgedOut_{0};
// Cumulative count of stale entries because of cache request size mismatch.
uint64_t numStales_{0};
// Cumulative sum of evict scores. This divided by 'numEvict_' correlates to
// time data stays in cache.
uint64_t sumEvictScore_{0};
2 changes: 2 additions & 0 deletions velox/common/caching/SsdCache.cpp
@@ -42,6 +42,8 @@ SsdCache::SsdCache(const Config& config)
filePrefix_);
VELOX_CHECK_NOT_NULL(executor_);

VELOX_SSD_CACHE_LOG(INFO) << "SSD cache config: " << config.toString();

auto checksumReadVerificationEnabled = config.checksumReadVerificationEnabled;
if (config.checksumReadVerificationEnabled && !config.checksumEnabled) {
VELOX_SSD_CACHE_LOG(WARNING)
17 changes: 14 additions & 3 deletions velox/common/caching/SsdFile.cpp
@@ -35,6 +35,9 @@
#include <fstream>
#include <numeric>

#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"

DEFINE_bool(ssd_odirect, true, "Use O_DIRECT for SSD cache IO");
DEFINE_bool(ssd_verify_write, false, "Read back data after writing to SSD");

@@ -268,9 +271,7 @@ CoalesceIoStats SsdFile::load(
pins[i].checkedEntry()->setSsdFile(this, ssdPins[i].run().offset());
auto* entry = pins[i].checkedEntry();
auto ssdRun = ssdPins[i].run();
-if (ssdRun.size() == entry->size()) {
-  maybeVerifyChecksum(*entry, ssdRun);
-}
+maybeVerifyChecksum(*entry, ssdRun);
}
return stats;
}
@@ -915,6 +916,16 @@ void SsdFile::maybeVerifyChecksum(
if (!checksumReadVerificationEnabled_) {
return;
}
VELOX_DCHECK_EQ(ssdRun.size(), entry.size());
if (ssdRun.size() != entry.size()) {
RECORD_METRIC_VALUE(kMetricSsdCacheReadWithoutChecksum);
++stats_.readWithoutChecksumChecks;
VELOX_CACHE_LOG_EVERY_MS(WARNING, 1'000)
<< "SSD read without checksum due to cache request size mismatch, SSD cache size "
<< ssdRun.size() << " request size " << entry.size()
<< ", cache request: " << entry.toString();
return;
}

// Verifies that the checksum matches after we read from SSD.
const auto checksum = checksumEntry(entry);
5 changes: 5 additions & 0 deletions velox/common/caching/SsdFile.h
@@ -162,6 +162,8 @@ struct SsdCacheStats {
readSsdErrors = tsanAtomicValue(other.readSsdErrors);
readCheckpointErrors = tsanAtomicValue(other.readCheckpointErrors);
readSsdCorruptions = tsanAtomicValue(other.readSsdCorruptions);
readWithoutChecksumChecks =
tsanAtomicValue(other.readWithoutChecksumChecks);
}

SsdCacheStats operator-(const SsdCacheStats& other) const {
@@ -190,6 +192,8 @@ struct SsdCacheStats {
result.readSsdErrors = readSsdErrors - other.readSsdErrors;
result.readCheckpointErrors =
readCheckpointErrors - other.readCheckpointErrors;
result.readWithoutChecksumChecks =
readWithoutChecksumChecks - other.readWithoutChecksumChecks;
return result;
}

@@ -220,6 +224,7 @@ struct SsdCacheStats {
tsan_atomic<uint32_t> readSsdErrors{0};
tsan_atomic<uint32_t> readCheckpointErrors{0};
tsan_atomic<uint32_t> readSsdCorruptions{0};
tsan_atomic<uint32_t> readWithoutChecksumChecks{0};
};

/// A shard of SsdCache. Corresponds to one file on SSD. The data backed by each
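A sketch of consuming the new SSD-side counter through the subtraction operator above; testingStats() is the test-only accessor used in the tests below, and the windowing is illustrative:

// Track SSD reads that skipped checksum verification over a window.
SsdCacheStats before = ssdFile->testingStats();
// ... serve cache loads ...
SsdCacheStats delta = ssdFile->testingStats() - before;
LOG(INFO) << "SSD reads without checksum verification: "
          << delta.readWithoutChecksumChecks;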
72 changes: 69 additions & 3 deletions velox/common/caching/tests/AsyncDataCacheTest.cpp
@@ -904,15 +904,40 @@ TEST_P(AsyncDataCacheTest, cacheStats) {
stats.numAgedOut = 10;
stats.allocClocks = 1320;
stats.sumEvictScore = 123;
stats.numStales = 100;
ASSERT_EQ(
stats.toString(),
"Cache size: 2.56KB tinySize: 257B large size: 2.31KB\n"
"Cache entries: 100 read pins: 30 write pins: 20 pinned shared: 10.00MB pinned exclusive: 10.00MB\n"
" num write wait: 244 empty entries: 20\n"
"Cache access miss: 2041 hit: 46 hit bytes: 1.34KB eviction: 463 eviction checks: 348 aged out: 10\n"
"Cache access miss: 2041 hit: 46 hit bytes: 1.34KB eviction: 463 eviction checks: 348 aged out: 10 stales: 100\n"
"Prefetch entries: 30 bytes: 100B\n"
"Alloc Megaclocks 0");

CacheStats statsDelta = stats - stats;
ASSERT_EQ(statsDelta.tinySize, 0);
ASSERT_EQ(statsDelta.largeSize, 0);
ASSERT_EQ(statsDelta.tinyPadding, 0);
ASSERT_EQ(statsDelta.largePadding, 0);
ASSERT_EQ(statsDelta.numEntries, 0);
ASSERT_EQ(statsDelta.numExclusive, 0);
ASSERT_EQ(statsDelta.numShared, 0);
ASSERT_EQ(statsDelta.sharedPinnedBytes, 0);
ASSERT_EQ(statsDelta.exclusivePinnedBytes, 0);
ASSERT_EQ(statsDelta.numEmptyEntries, 0);
ASSERT_EQ(statsDelta.numPrefetch, 0);
ASSERT_EQ(statsDelta.prefetchBytes, 0);
ASSERT_EQ(statsDelta.numHit, 0);
ASSERT_EQ(statsDelta.hitBytes, 0);
ASSERT_EQ(statsDelta.numNew, 0);
ASSERT_EQ(statsDelta.numEvict, 0);
ASSERT_EQ(statsDelta.numEvictChecks, 0);
ASSERT_EQ(statsDelta.numWaitExclusive, 0);
ASSERT_EQ(statsDelta.numAgedOut, 0);
ASSERT_EQ(statsDelta.allocClocks, 0);
ASSERT_EQ(statsDelta.sumEvictScore, 0);
ASSERT_EQ(statsDelta.numStales, 0);

constexpr uint64_t kRamBytes = 32 << 20;
constexpr uint64_t kSsdBytes = 512UL << 20;
initializeCache(kRamBytes, kSsdBytes);
@@ -921,7 +946,7 @@ TEST_P(AsyncDataCacheTest, cacheStats) {
"Cache size: 0B tinySize: 0B large size: 0B\n"
"Cache entries: 0 read pins: 0 write pins: 0 pinned shared: 0B pinned exclusive: 0B\n"
" num write wait: 0 empty entries: 0\n"
"Cache access miss: 0 hit: 0 hit bytes: 0B eviction: 0 eviction checks: 0 aged out: 0\n"
"Cache access miss: 0 hit: 0 hit bytes: 0B eviction: 0 eviction checks: 0 aged out: 0 stales: 0\n"
"Prefetch entries: 0 bytes: 0B\n"
"Alloc Megaclocks 0\n"
"Allocated pages: 0 cached pages: 0\n"
@@ -945,13 +970,54 @@ TEST_P(AsyncDataCacheTest, cacheStats) {
"Cache size: 0B tinySize: 0B large size: 0B\n"
"Cache entries: 0 read pins: 0 write pins: 0 pinned shared: 0B pinned exclusive: 0B\n"
" num write wait: 0 empty entries: 0\n"
"Cache access miss: 0 hit: 0 hit bytes: 0B eviction: 0 eviction checks: 0 aged out: 0\n"
"Cache access miss: 0 hit: 0 hit bytes: 0B eviction: 0 eviction checks: 0 aged out: 0 stales: 0\n"
"Prefetch entries: 0 bytes: 0B\n"
"Alloc Megaclocks 0\n"
"Allocated pages: 0 cached pages: 0\n";
ASSERT_EQ(cache_->toString(false), expectedShortCacheOutput);
}

TEST_P(AsyncDataCacheTest, staleEntry) {
constexpr uint64_t kRamBytes = 1UL << 30;
// Disable SSD cache to test in-memory cache stale entry only.
initializeCache(kRamBytes, 0, 0);
StringIdLease file(fileIds(), std::string_view("staleEntry"));
const uint64_t offset = 1000;
const uint64_t size = 200;
folly::SemiFuture<bool> wait(false);
RawFileCacheKey key{file.id(), offset};
auto pin = cache_->findOrCreate(key, size, &wait);
ASSERT_FALSE(pin.empty());
ASSERT_TRUE(wait.isReady());
ASSERT_TRUE(pin.entry()->isExclusive());
pin.entry()->setExclusiveToShared();
ASSERT_FALSE(pin.entry()->isExclusive());
auto stats = cache_->refreshStats();
ASSERT_EQ(stats.numStales, 0);
ASSERT_EQ(stats.numEntries, 1);
ASSERT_EQ(stats.numHit, 0);

auto validPin = cache_->findOrCreate(key, size, &wait);
ASSERT_FALSE(validPin.empty());
ASSERT_TRUE(wait.isReady());
ASSERT_FALSE(validPin.entry()->isExclusive());
stats = cache_->refreshStats();
ASSERT_EQ(stats.numStales, 0);
ASSERT_EQ(stats.numEntries, 1);
ASSERT_EQ(stats.numHit, 1);

// Stale cache access with large cache size.
auto stalePin = cache_->findOrCreate(key, 2 * size, &wait);
ASSERT_FALSE(stalePin.empty());
ASSERT_TRUE(wait.isReady());
ASSERT_TRUE(stalePin.entry()->isExclusive());
stalePin.entry()->setExclusiveToShared();
stats = cache_->refreshStats();
ASSERT_EQ(stats.numStales, 1);
ASSERT_EQ(stats.numEntries, 1);
ASSERT_EQ(stats.numHit, 1);
}
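The stale pin returned by the last findOrCreate comes back exclusive because the superseded 200-byte entry cannot serve the 400-byte request; the caller is expected to load the data and publish it via setExclusiveToShared(), as the test does above.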

TEST_P(AsyncDataCacheTest, shrinkCache) {
constexpr uint64_t kRamBytes = 128UL << 20;
constexpr uint64_t kSsdBytes = 512UL << 20;
46 changes: 46 additions & 0 deletions velox/common/caching/tests/SsdFileTest.cpp
@@ -626,6 +626,52 @@ TEST_F(SsdFileTest, recoverFromCheckpointWithChecksum) {
}
}

TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) {
constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize;

// Initialize cache with checksum read/write enabled.
initializeCache(kSsdSize, 0, true, true);

// Test with one SSD cache entry only.
auto pins = makePins(fileName_.id(), 0, 4096, 4096, 4096);
ssdFile_->write(pins);
ASSERT_EQ(pins.size(), 1);
pins.back().entry()->setExclusiveToShared();
auto stats = ssdFile_->testingStats();
ASSERT_EQ(stats.readWithoutChecksumChecks, 0);

std::vector<TestEntry> entries;
for (auto& pin : pins) {
ASSERT_EQ(ssdFile_.get(), pin.entry()->ssdFile());
entries.emplace_back(
pin.entry()->key(), pin.entry()->ssdOffset(), pin.entry()->size());
};
std::vector<TestEntry> shortEntries;
for (auto& pin : pins) {
ASSERT_EQ(ssdFile_.get(), pin.entry()->ssdFile());
shortEntries.emplace_back(
pin.entry()->key(), pin.entry()->ssdOffset(), pin.entry()->size() / 2);
};

pins.clear();
cache_->testingClear();
ASSERT_EQ(cache_->refreshStats().numEntries, 0);

ASSERT_EQ(checkEntries(entries), entries.size());
ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 0);

cache_->testingClear();
ASSERT_EQ(cache_->refreshStats().numEntries, 0);

#ifndef NDEBUG
VELOX_ASSERT_THROW(checkEntries(shortEntries), "");
ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 0);
#else
ASSERT_EQ(checkEntries(shortEntries), shortEntries.size());
ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 1);
#endif
}
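Note the debug/release split exercised above: in debug builds the VELOX_DCHECK_EQ in SsdFile::maybeVerifyChecksum fails fast on the size mismatch, so readWithoutChecksumChecks stays at zero, while in release builds the read is served without checksum verification and the counter is incremented instead.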

#ifdef VELOX_SSD_FILE_TEST_SET_NO_COW_FLAG
TEST_F(SsdFileTest, disabledCow) {
constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize;
4 changes: 2 additions & 2 deletions velox/common/memory/tests/MemoryPoolTest.cpp
@@ -744,7 +744,7 @@ TEST_P(MemoryPoolTest, memoryCapExceptions) {
"0 write pins: 0 pinned shared: 0B pinned exclusive: 0B\n "
"num write wait: 0 empty entries: 0\nCache access miss: 0 "
"hit: 0 hit bytes: 0B eviction: 0 eviction checks: 0 "
"aged out: 0\nPrefetch entries: 0 bytes: 0B\nAlloc Megaclocks 0\n"
"aged out: 0 stales: 0\nPrefetch entries: 0 bytes: 0B\nAlloc Megaclocks 0\n"
"Allocated pages: 0 cached pages: 0\n",
isLeafThreadSafe_ ? "thread-safe" : "non-thread-safe"),
ex.message());
@@ -778,7 +778,7 @@ TEST_P(MemoryPoolTest, memoryCapExceptions) {
"read pins: 0 write pins: 0 pinned shared: 0B pinned "
"exclusive: 0B\n num write wait: 0 empty entries: 0\nCache "
"access miss: 0 hit: 0 hit bytes: 0B eviction: 0 eviction "
"checks: 0 aged out: 0\nPrefetch entries: 0 bytes: 0B\nAlloc Megaclocks"
"checks: 0 aged out: 0 stales: 0\nPrefetch entries: 0 bytes: 0B\nAlloc Megaclocks"
" 0\nAllocated pages: 0 cached pages: 0\n",
isLeafThreadSafe_ ? "thread-safe" : "non-thread-safe"),
ex.message());