feat: Test restart in cache fuzzer (facebookincubator#11866)
Summary: Pull Request resolved: facebookincubator#11866

Differential Revision: D67256753
zacw7 authored and facebook-github-bot committed Dec 15, 2024
1 parent 12942c1 commit c73d53a
Showing 2 changed files with 139 additions and 61 deletions.
2 changes: 2 additions & 0 deletions velox/docs/develop/testing/cache-fuzzer.rst
@@ -61,4 +61,6 @@ Here is a full list of supported command line arguments.
to 64MB will be used, inclusively. Checkpoint after every
``ssd_checkpoint_interval_bytes`` written into each file. 0 means no checkpointing.

* ``--num_restarts``: Number of cache restarts in one iteration.

If running from CLion IDE, add ``--logtostderr=1`` to see the full output.
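A usage sketch for the new flag (illustrative only: the binary path below is an assumption about how the fuzzer is typically launched and is not taken from this change; the other flags documented above combine with it in the same way)::

    velox/exec/fuzzer/velox_cache_fuzzer_test --num_restarts=5 --logtostderr=1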
198 changes: 137 additions & 61 deletions velox/exec/fuzzer/CacheFuzzer.cpp
@@ -74,6 +74,8 @@ DEFINE_int64(
"each file. 0 means no checkpointing. When set to -1, 1 out of 4 times checkpoint will be disabled, "
"while the other times, a random value from 32MB to 64MB will be used, inclusively.");

DEFINE_int32(num_restarts, 3, "Number of cache restarts in one iteration.");
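As an aside on the randomized (-1) semantics in the flag text above, and on the "Enable checkpoint 75% of the time" / "Enable SSD cache 90% of the time" comments further down: folly::Random::oneIn(n, rng) returns true with probability 1/n, so using it to guard the "disable" branch keeps the feature enabled the remaining (n-1)/n of the time. A minimal standalone sketch, not part of this commit (the seed and trial count are arbitrary):

// Illustrative sketch only -- not code from this commit.
#include <folly/Random.h>

#include <iostream>
#include <random>

int main() {
  std::mt19937 rng(42); // Any standard URBG works with folly::Random::oneIn.
  constexpr int kTrials = 100000;
  int disabled = 0;
  for (int i = 0; i < kTrials; ++i) {
    if (folly::Random::oneIn(4, rng)) {
      ++disabled; // ~25% of trials: checkpointing would be turned off.
    }
  }
  std::cout << "fraction disabled: "
            << static_cast<double>(disabled) / kTrials << std::endl;
  return 0;
}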

using namespace facebook::velox::cache;
using namespace facebook::velox::dwio::common;

@@ -98,6 +100,8 @@ class CacheFuzzer {
seed(rng_());
}

void initSourceDataFiles();

int32_t getSourceFileBytes() {
if (FLAGS_source_file_bytes == kRandomized) {
return boost::random::uniform_int_distribution<int64_t>(
@@ -106,55 +110,27 @@
return FLAGS_source_file_bytes;
}

int64_t getMemoryCacheBytes() {
if (FLAGS_memory_cache_bytes == kRandomized) {
return boost::random::uniform_int_distribution<int64_t>(
48 << 20 /*48MB*/, 64 << 20 /*64MB*/)(rng_);
}
return FLAGS_memory_cache_bytes;
}
void initializeCache(bool useLastConfig = false);

int32_t getSsdCacheBytes() {
if (FLAGS_ssd_cache_bytes == kRandomized) {
// Enable SSD cache 90% of the time.
return folly::Random::oneIn(10, rng_)
? 0
: boost::random::uniform_int_distribution<int64_t>(
128 << 20 /*128MB*/, 256 << 20 /*256MB*/)(rng_);
}
return FLAGS_ssd_cache_bytes;
}
int64_t getMemoryCacheBytes(bool useLastConfig = false);

int32_t getSsdCacheShards() {
if (FLAGS_num_ssd_cache_shards == kRandomized) {
// Use 1-4 shards to test different cases. The number of shards shouldn't
      // be too large, so that each shard has enough space to hold large cache
// entries.
return boost::random::uniform_int_distribution<int32_t>(1, 4)(rng_);
}
return FLAGS_num_ssd_cache_shards;
}
int32_t getSsdCacheBytes(bool useLastConfig = false);

int32_t getSsdCheckpointIntervalBytes() {
if (FLAGS_ssd_checkpoint_interval_bytes == kRandomized) {
// Enable checkpoint 75% of the time as checksum depends on it.
return folly::Random::oneIn(4, rng_)
? 0
: boost::random::uniform_int_distribution<uint64_t>(
32 << 20 /*32MB*/, 64 << 20 /*64MB*/)(rng_);
}
return FLAGS_ssd_checkpoint_interval_bytes;
}
int32_t getSsdCacheShards(bool useLastConfig = false);

void initSourceDataFiles();
int32_t getSsdCheckpointIntervalBytes(bool useLastConfig = false);

bool enableChecksum(bool useLastConfig = false);

void initializeCache();
bool enableChecksumReadVerification(bool useLastConfig = false);

void initializeInputs();

void readCache();

void reset();
void resetCache();

void resetSourceDataFiles();

void read(uint32_t fileIdx, int32_t fragmentIdx);

@@ -176,6 +152,14 @@ class CacheFuzzer {
std::unique_ptr<memory::MemoryManager> memoryManager_;
std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
std::shared_ptr<AsyncDataCache> cache_;
  // Save the configs from the last initialization so they can potentially be
  // reused after a restart.
int64_t lastMemoryCacheBytes_;
uint64_t lastSsdCacheBytes_;
int32_t lastNumSsdCacheShards_;
uint64_t lastSsdCheckpointIntervalBytes_;
bool lastEnableChecksum_;
bool lastEnableChecksumReadVerification_;
};

template <typename T>
@@ -229,7 +213,80 @@ void CacheFuzzer::initSourceDataFiles() {
}
}

void CacheFuzzer::initializeCache() {
int64_t CacheFuzzer::getMemoryCacheBytes(bool useLastConfig) {
  if (!useLastConfig) {
    if (FLAGS_memory_cache_bytes == kRandomized) {
      lastMemoryCacheBytes_ = boost::random::uniform_int_distribution<int64_t>(
          48 << 20 /*48MB*/, 64 << 20 /*64MB*/)(rng_);
    } else {
      lastMemoryCacheBytes_ = FLAGS_memory_cache_bytes;
    }
  }
  return lastMemoryCacheBytes_;
}

int32_t CacheFuzzer::getSsdCacheBytes(bool useLastConfig) {
if (!useLastConfig) {
if (FLAGS_ssd_cache_bytes == kRandomized) {
// Enable SSD cache 90% of the time.
lastSsdCacheBytes_ = folly::Random::oneIn(10, rng_)
? 0
: boost::random::uniform_int_distribution<int64_t>(
128 << 20 /*128MB*/, 256 << 20 /*256MB*/)(rng_);
} else {
lastSsdCacheBytes_ = FLAGS_ssd_cache_bytes;
}
}
return lastSsdCacheBytes_;
}

int32_t CacheFuzzer::getSsdCacheShards(bool useLastConfig) {
if (!useLastConfig) {
if (FLAGS_num_ssd_cache_shards == kRandomized) {
// Use 1-4 shards to test different cases. The number of shards shouldn't
    // be too large, so that each shard has enough space to hold large cache
// entries.
lastNumSsdCacheShards_ =
boost::random::uniform_int_distribution<int32_t>(1, 4)(rng_);
} else {
lastNumSsdCacheShards_ = FLAGS_num_ssd_cache_shards;
}
}

return lastNumSsdCacheShards_;
}

int32_t CacheFuzzer::getSsdCheckpointIntervalBytes(bool useLastConfig) {
if (!useLastConfig) {
if (FLAGS_ssd_checkpoint_interval_bytes == kRandomized) {
// Enable checkpoint 75% of the time as checksum depends on it.
lastSsdCheckpointIntervalBytes_ = folly::Random::oneIn(4, rng_)
? 0
: boost::random::uniform_int_distribution<uint64_t>(
32 << 20 /*32MB*/, 64 << 20 /*64MB*/)(rng_);
} else {
lastSsdCheckpointIntervalBytes_ = FLAGS_ssd_checkpoint_interval_bytes;
}
}
return lastSsdCheckpointIntervalBytes_;
}

bool CacheFuzzer::enableChecksum(bool useLastConfig) {
if (!useLastConfig) {
lastEnableChecksum_ = folly::Random::oneIn(2, rng_);
}
return lastEnableChecksum_;
}

bool CacheFuzzer::enableChecksumReadVerification(bool useLastConfig) {
if (!useLastConfig) {
    lastEnableChecksumReadVerification_ = folly::Random::oneIn(2, rng_);
  }
  return lastEnableChecksumReadVerification_;
}

void CacheFuzzer::initializeCache(bool useLastConfig) {
// We have up to 20 threads and 16 threads are used for reading so
// there are some threads left over for SSD background write.
executor_ = std::make_unique<folly::IOThreadPoolExecutor>(20);
@@ -238,10 +295,12 @@

std::unique_ptr<SsdCache> ssdCache;
if (ssdCacheBytes > 0) {
const auto numSsdCacheShards = getSsdCacheShards();
const auto checkpointIntervalBytes = getSsdCheckpointIntervalBytes();
const auto enableChecksum = folly::Random::oneIn(2, rng_);
const auto enableChecksumReadVerification = folly::Random::oneIn(2, rng_);
const auto numSsdCacheShards = getSsdCacheShards(useLastConfig);
const auto checkpointIntervalBytes =
getSsdCheckpointIntervalBytes(useLastConfig);
const auto enableChecksum_ = enableChecksum(useLastConfig);
const auto enableChecksumReadVerification_ =
enableChecksumReadVerification(useLastConfig);

SsdCache::Config config(
fmt::format("{}/cache", sourceDataDir_->getPath()),
@@ -250,8 +309,8 @@
executor_.get(),
checkpointIntervalBytes,
false,
enableChecksum,
enableChecksumReadVerification);
enableChecksum_,
enableChecksumReadVerification_);
ssdCache = std::make_unique<SsdCache>(config);
LOG(INFO) << fmt::format(
"Initialized SSD cache with {} shards, {}, with checkpoint {}, checksum write {}, read verification {}",
@@ -260,8 +319,8 @@
checkpointIntervalBytes > 0
? fmt::format("enabled({})", succinctBytes(checkpointIntervalBytes))
: "disabled",
enableChecksum ? "enabled" : "disabled",
enableChecksumReadVerification ? "enabled" : "disabled");
enableChecksum_ ? "enabled" : "disabled",
enableChecksumReadVerification_ ? "enabled" : "disabled");
}

memory::MemoryManagerOptions options;
@@ -342,22 +401,29 @@ void CacheFuzzer::readCache() {
}
}

void CacheFuzzer::reset() {
void CacheFuzzer::resetCache() {
cache_->shutdown();
if (cache_->ssdCache() != nullptr) {
cache_->ssdCache()->waitForWriteToFinish();
}
executor_->join();
executor_.reset();
fileNames_.clear();
fileIds_.clear();
fileSizes_.clear();
fileFragments_.clear();
inputs_.clear();
fs_.reset();
cache_.reset();
memoryManager_.reset();
}

void CacheFuzzer::resetSourceDataFiles() {
const auto& sourceDataDirPath = sourceDataDir_->getPath();
if (fs_->exists(sourceDataDirPath)) {
fs_->rmdir(sourceDataDirPath);
}
fs_.reset();
sourceDataDir_.reset();
fileNames_.clear();
fileIds_.clear();
fileSizes_.clear();
fileIds().testingReset();
}

@@ -406,18 +472,28 @@ void CacheFuzzer::go() {
<< iteration << " (seed: " << currentSeed_ << ")";

initSourceDataFiles();
SCOPE_EXIT {
resetSourceDataFiles();
};

for (int i = 0; i <= FLAGS_num_restarts; ++i) {
if (i > 0) {
const auto useLastConfig = !folly::Random::oneIn(3, rng_);
LOG(INFO) << "Restarting cache with "
<< (useLastConfig ? "last" : "new") << " config";
initializeCache(useLastConfig);
} else {
initializeCache();
}

initializeCache();

initializeInputs();

readCache();
initializeInputs();

// TODO: Test cache restart.
readCache();

LOG(INFO) << cache_->refreshStats().toString();
LOG(INFO) << cache_->refreshStats().toString();

reset();
resetCache();
}

LOG(INFO) << "==============================> Done with iteration "
<< iteration;
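For context on what a restart exercises (editorial explanation, not part of the commit): when initializeCache() is called again with the previous configuration, the new SsdCache is constructed over the same cache directory, size, shard count, and checkpoint settings, so any checkpoint written before the shutdown can be read back and entries cached earlier can be served from SSD again. A minimal sketch of that lifecycle follows; makeSsdCache, restartSketch, and the literal sizes are hypothetical, and only APIs already used in this file are called:

// Illustrative sketch only -- not code from this commit. The SsdCache::Config
// argument order mirrors the call in initializeCache() above.
#include <folly/executors/IOThreadPoolExecutor.h>

#include <memory>
#include <string>

#include "velox/common/caching/SsdCache.h"

using facebook::velox::cache::SsdCache;

// Hypothetical helper: builds an SsdCache over a fixed, existing directory
// with a fixed shard count and checkpointing enabled.
std::unique_ptr<SsdCache> makeSsdCache(
    const std::string& cacheDir,
    folly::IOThreadPoolExecutor* executor) {
  SsdCache::Config config(
      cacheDir + "/cache",
      128 << 20 /*128MB total SSD cache size*/,
      2 /*numShards*/,
      executor,
      32 << 20 /*checkpointIntervalBytes*/,
      false /*disableFileCow*/,
      true /*enableChecksum*/,
      true /*enableChecksumReadVerification*/);
  return std::make_unique<SsdCache>(config);
}

void restartSketch(const std::string& cacheDir) {
  auto executor = std::make_unique<folly::IOThreadPoolExecutor>(4);

  // First lifetime: cache activity populates the SSD cache, and a checkpoint
  // is written after every checkpointIntervalBytes per shard file.
  auto ssdCache = makeSsdCache(cacheDir, executor.get());
  // ... exercise the cache ...
  ssdCache->waitForWriteToFinish();
  ssdCache.reset();

  // Second lifetime over the same directory, size, and shard count: any
  // checkpoint written above is read back when the new SsdCache initializes,
  // so data cached before the restart is available from SSD again.
  auto restarted = makeSsdCache(cacheDir, executor.get());
  restarted->waitForWriteToFinish();
  restarted.reset();
  executor->join();
}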