Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Test restart in cache fuzzer #11866

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions velox/docs/develop/testing/cache-fuzzer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ Here is a full list of supported command line arguments.
to 64MB will be used, inclusively. Checkpoint after every
``--ssd_checkpoint_interval_bytes`` bytes written into each file. 0 means no
checkpointing.

* ``--num_restarts``: Number of cache restarts in one iteration.

If running from CLion IDE, add ``--logtostderr=1`` to see the full output.
197 changes: 136 additions & 61 deletions velox/exec/fuzzer/CacheFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ DEFINE_int64(
"each file. 0 means no checkpointing. When set to -1, 1 out of 4 times checkpoint will be disabled, "
"while the other times, a random value from 32MB to 64MB will be used, inclusively.");

DEFINE_int32(num_restarts, 3, "Number of cache restarts in one iteration.");

using namespace facebook::velox::cache;
using namespace facebook::velox::dwio::common;

Expand All @@ -98,6 +100,8 @@ class CacheFuzzer {
seed(rng_());
}

void initSourceDataFiles();

int32_t getSourceFileBytes() {
if (FLAGS_source_file_bytes == kRandomized) {
return boost::random::uniform_int_distribution<int64_t>(
Expand All @@ -106,55 +110,27 @@ class CacheFuzzer {
return FLAGS_source_file_bytes;
}

int64_t getMemoryCacheBytes() {
if (FLAGS_memory_cache_bytes == kRandomized) {
return boost::random::uniform_int_distribution<int64_t>(
48 << 20 /*48MB*/, 64 << 20 /*64MB*/)(rng_);
}
return FLAGS_memory_cache_bytes;
}
void initializeCache(bool restartCache = false);

int32_t getSsdCacheBytes() {
if (FLAGS_ssd_cache_bytes == kRandomized) {
// Enable SSD cache 90% of the time.
return folly::Random::oneIn(10, rng_)
? 0
: boost::random::uniform_int_distribution<int64_t>(
128 << 20 /*128MB*/, 256 << 20 /*256MB*/)(rng_);
}
return FLAGS_ssd_cache_bytes;
}
int64_t getMemoryCacheBytes(bool restartCache = false);

int32_t getSsdCacheShards() {
if (FLAGS_num_ssd_cache_shards == kRandomized) {
// Use 1-4 shards to test different cases. The number of shards shouldn't
// be too larger so that each shard has enough space to hold large cache
// entries.
return boost::random::uniform_int_distribution<int32_t>(1, 4)(rng_);
}
return FLAGS_num_ssd_cache_shards;
}
int64_t getSsdCacheBytes(bool restartCache = false);

int32_t getSsdCheckpointIntervalBytes() {
if (FLAGS_ssd_checkpoint_interval_bytes == kRandomized) {
// Enable checkpoint 75% of the time as checksum depends on it.
return folly::Random::oneIn(4, rng_)
? 0
: boost::random::uniform_int_distribution<uint64_t>(
32 << 20 /*32MB*/, 64 << 20 /*64MB*/)(rng_);
}
return FLAGS_ssd_checkpoint_interval_bytes;
}
int32_t getSsdCacheShards(bool restartCache = false);

void initSourceDataFiles();
int64_t getSsdCheckpointIntervalBytes(bool restartCache = false);

bool enableChecksum(bool restartCache = false);

void initializeCache();
bool enableChecksumReadVerification(bool restartCache = false);

void initializeInputs();

void readCache();

void reset();
void resetCache();

void resetSourceDataFiles();

void read(uint32_t fileIdx, int32_t fragmentIdx);

Expand All @@ -176,6 +152,14 @@ class CacheFuzzer {
std::unique_ptr<memory::MemoryManager> memoryManager_;
std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
std::shared_ptr<AsyncDataCache> cache_;
// Save the config for the last iteration so they can be potentially reused
// after restart.
int64_t lastMemoryCacheBytes_;
int64_t lastSsdCacheBytes_;
int32_t lastNumSsdCacheShards_;
int64_t lastSsdCheckpointIntervalBytes_;
bool lastEnableChecksum_;
bool lastEnableChecksumReadVerification_;
};

template <typename T>
Expand Down Expand Up @@ -229,7 +213,79 @@ void CacheFuzzer::initSourceDataFiles() {
}
}

void CacheFuzzer::initializeCache() {
int64_t CacheFuzzer::getMemoryCacheBytes(bool restartCache) {
  // When restarting the cache with the previous config, reuse the size
  // chosen for the last initialization instead of drawing a new one.
  if (restartCache) {
    return lastMemoryCacheBytes_;
  }
  lastMemoryCacheBytes_ = (FLAGS_memory_cache_bytes == kRandomized)
      ? boost::random::uniform_int_distribution<int64_t>(
            48 << 20 /*48MB*/, 64 << 20 /*64MB*/)(rng_)
      : FLAGS_memory_cache_bytes;
  return lastMemoryCacheBytes_;
}

int64_t CacheFuzzer::getSsdCacheBytes(bool restartCache) {
  // Reuse the previously-drawn SSD cache size when restarting with the
  // same config; otherwise pick a fresh value.
  if (restartCache) {
    return lastSsdCacheBytes_;
  }
  if (FLAGS_ssd_cache_bytes != kRandomized) {
    lastSsdCacheBytes_ = FLAGS_ssd_cache_bytes;
  } else if (folly::Random::oneIn(10, rng_)) {
    // SSD cache is disabled 10% of the time (enabled the other 90%).
    lastSsdCacheBytes_ = 0;
  } else {
    lastSsdCacheBytes_ = boost::random::uniform_int_distribution<int64_t>(
        128 << 20 /*128MB*/, 256 << 20 /*256MB*/)(rng_);
  }
  return lastSsdCacheBytes_;
}

int32_t CacheFuzzer::getSsdCacheShards(bool restartCache) {
  // Keep the last shard count when restarting with the previous config.
  if (restartCache) {
    return lastNumSsdCacheShards_;
  }
  if (FLAGS_num_ssd_cache_shards == kRandomized) {
    // Draw 1-4 shards to cover different cases. The shard count shouldn't
    // be too large, so each shard has enough space to hold large cache
    // entries.
    lastNumSsdCacheShards_ =
        boost::random::uniform_int_distribution<int32_t>(1, 4)(rng_);
  } else {
    lastNumSsdCacheShards_ = FLAGS_num_ssd_cache_shards;
  }
  return lastNumSsdCacheShards_;
}

int64_t CacheFuzzer::getSsdCheckpointIntervalBytes(bool restartCache) {
  // Reuse the previous checkpoint interval when restarting with the same
  // config.
  if (restartCache) {
    return lastSsdCheckpointIntervalBytes_;
  }
  if (FLAGS_ssd_checkpoint_interval_bytes != kRandomized) {
    lastSsdCheckpointIntervalBytes_ = FLAGS_ssd_checkpoint_interval_bytes;
  } else if (folly::Random::oneIn(4, rng_)) {
    // Checkpointing is disabled 25% of the time (enabled the other 75%, as
    // checksum depends on it).
    lastSsdCheckpointIntervalBytes_ = 0;
  } else {
    lastSsdCheckpointIntervalBytes_ =
        boost::random::uniform_int_distribution<uint64_t>(
            32 << 20 /*32MB*/, 64 << 20 /*64MB*/)(rng_);
  }
  return lastSsdCheckpointIntervalBytes_;
}

bool CacheFuzzer::enableChecksum(bool restartCache) {
  // 50/50 decision whether SSD checksum write is enabled; when restarting
  // with the previous config, return the last decision unchanged.
  if (restartCache) {
    return lastEnableChecksum_;
  }
  lastEnableChecksum_ = folly::Random::oneIn(2, rng_);
  return lastEnableChecksum_;
}

bool CacheFuzzer::enableChecksumReadVerification(bool restartCache) {
if (!restartCache) {
lastEnableChecksum_ = folly::Random::oneIn(2, rng_);
}
return lastEnableChecksum_;
}

void CacheFuzzer::initializeCache(bool restartCache) {
// We have up to 20 threads and 16 threads are used for reading so
// there are some threads left over for SSD background write.
executor_ = std::make_unique<folly::IOThreadPoolExecutor>(20);
Expand All @@ -238,10 +294,12 @@ void CacheFuzzer::initializeCache() {

std::unique_ptr<SsdCache> ssdCache;
if (ssdCacheBytes > 0) {
const auto numSsdCacheShards = getSsdCacheShards();
const auto checkpointIntervalBytes = getSsdCheckpointIntervalBytes();
const auto enableChecksum = folly::Random::oneIn(2, rng_);
const auto enableChecksumReadVerification = folly::Random::oneIn(2, rng_);
const auto numSsdCacheShards = getSsdCacheShards(restartCache);
const auto checkpointIntervalBytes =
getSsdCheckpointIntervalBytes(restartCache);
const auto enableChecksum_ = enableChecksum(restartCache);
const auto enableChecksumReadVerification_ =
enableChecksumReadVerification(restartCache);

SsdCache::Config config(
fmt::format("{}/cache", sourceDataDir_->getPath()),
Expand All @@ -250,8 +308,8 @@ void CacheFuzzer::initializeCache() {
executor_.get(),
checkpointIntervalBytes,
false,
enableChecksum,
enableChecksumReadVerification);
enableChecksum_,
enableChecksumReadVerification_);
ssdCache = std::make_unique<SsdCache>(config);
LOG(INFO) << fmt::format(
"Initialized SSD cache with {} shards, {}, with checkpoint {}, checksum write {}, read verification {}",
Expand All @@ -260,8 +318,8 @@ void CacheFuzzer::initializeCache() {
checkpointIntervalBytes > 0
? fmt::format("enabled({})", succinctBytes(checkpointIntervalBytes))
: "disabled",
enableChecksum ? "enabled" : "disabled",
enableChecksumReadVerification ? "enabled" : "disabled");
enableChecksum_ ? "enabled" : "disabled",
enableChecksumReadVerification_ ? "enabled" : "disabled");
}

memory::MemoryManagerOptions options;
Expand Down Expand Up @@ -342,22 +400,29 @@ void CacheFuzzer::readCache() {
}
}

void CacheFuzzer::reset() {
void CacheFuzzer::resetCache() {
  // Tears down the cache stack between restarts/iterations, in dependency
  // order: stop the cache, let in-flight SSD writes finish, drain the
  // executor, then drop per-iteration bookkeeping and release the cache and
  // memory manager last.
  cache_->shutdown();
  if (cache_->ssdCache() != nullptr) {
    // SSD writes are asynchronous; wait so no write touches freed state.
    cache_->ssdCache()->waitForWriteToFinish();
  }
  executor_->join();
  executor_.reset();
  fileNames_.clear();
  fileIds_.clear();
  fileSizes_.clear();
  fileFragments_.clear();
  inputs_.clear();
  // NOTE(review): fs_ is released here, but resetSourceDataFiles() below
  // dereferences fs_ — confirm the intended call order between the two.
  fs_.reset();
  cache_.reset();
  memoryManager_.reset();
}

void CacheFuzzer::resetSourceDataFiles() {
  // Removes the temporary source-data directory and clears the per-file
  // bookkeeping so the next iteration starts from a clean slate.
  const auto& sourceDataDirPath = sourceDataDir_->getPath();
  // NOTE(review): this dereferences fs_; if resetCache() (which does
  // fs_.reset()) has already run, this is a null deref — verify ordering.
  if (fs_->exists(sourceDataDirPath)) {
    fs_->rmdir(sourceDataDirPath);
  }
  fs_.reset();
  sourceDataDir_.reset();
  fileNames_.clear();
  fileIds_.clear();
  fileSizes_.clear();
  // Reset the file-id registry (presumably process-wide — verify) so ids
  // don't leak into the next iteration.
  fileIds().testingReset();
}

Expand Down Expand Up @@ -406,18 +471,28 @@ void CacheFuzzer::go() {
<< iteration << " (seed: " << currentSeed_ << ")";

initSourceDataFiles();
SCOPE_EXIT {
resetSourceDataFiles();
};

for (int i = 0; i <= FLAGS_num_restarts; ++i) {
if (i > 0) {
const auto restartCache = !folly::Random::oneIn(3, rng_);
LOG(INFO) << "Restarting cache with " << (restartCache ? "last" : "new")
<< " config";
initializeCache(restartCache);
} else {
initializeCache();
}

initializeCache();

initializeInputs();

readCache();
initializeInputs();

// TODO: Test cache restart.
readCache();

LOG(INFO) << cache_->refreshStats().toString();
LOG(INFO) << cache_->refreshStats().toString();

reset();
resetCache();
}

LOG(INFO) << "==============================> Done with iteration "
<< iteration;
Expand Down
Loading