Skip to content

Commit

Permalink
Use mutex lock to protect cache shutdown (#9770)
Browse files Browse the repository at this point in the history
Summary:
Currently, the SSD cache uses atomics for the shutdown flag and the writesInProgress counter, which together act as a lock to prevent concurrent writes to an SSD file. It is better to use a mutex to simplify the implementation and also to prevent any write to an SSD file once shutdown has been triggered.

Pull Request resolved: #9770

Reviewed By: xiaoxmeng

Differential Revision: D57220497

Pulled By: zacw7

fbshipit-source-id: caac90c041ff095678feb30df129b6c0f2fda897
zacw7 authored and facebook-github-bot committed May 14, 2024
1 parent 40661a0 commit 54db952
Showing 8 changed files with 158 additions and 64 deletions.
3 changes: 3 additions & 0 deletions velox/common/caching/AsyncDataCache.cpp
Original file line number Diff line number Diff line change
@@ -647,6 +647,9 @@ void AsyncDataCache::shutdown() {
for (auto& shard : shards_) {
shard->shutdown();
}
if (ssdCache_) {
ssdCache_->shutdown();
}
}

void CacheShard::shutdown() {
55 changes: 35 additions & 20 deletions velox/common/caching/SsdCache.cpp
Original file line number Diff line number Diff line change
@@ -72,21 +72,19 @@ SsdFile& SsdCache::file(uint64_t fileId) {
}

bool SsdCache::startWrite() {
if (isShutdown_) {
return false;
}
if (writesInProgress_.fetch_add(numShards_) == 0) {
std::lock_guard<std::mutex> l(mutex_);
checkNotShutdownLocked();
if (writesInProgress_ == 0) {
// No write was pending, so now all shards are counted as writing.
writesInProgress_ += numShards_;
return true;
}
// There were writes in progress, so compensate for the increment.
writesInProgress_.fetch_sub(numShards_);
VELOX_CHECK_GE(writesInProgress_, 0);
return false;
}

void SsdCache::write(std::vector<CachePin> pins) {
VELOX_CHECK_GE(numShards_, writesInProgress_);
VELOX_CHECK_EQ(numShards_, writesInProgress_);

TestValue::adjust("facebook::velox::cache::SsdCache::write", this);

@@ -159,7 +157,6 @@ bool SsdCache::removeFileEntries(
}
--writesInProgress_;
}

return success;
}

@@ -171,12 +168,6 @@ SsdCacheStats SsdCache::stats() const {
return stats;
}

void SsdCache::testingClear() {
for (auto& file : files_) {
file->testingClear();
}
}

std::string SsdCache::toString() const {
const auto data = stats();
const uint64_t capacity = maxBytes();
@@ -189,20 +180,44 @@ std::string SsdCache::toString() const {
return out.str();
}

void SsdCache::testingDeleteFiles() {
for (auto& file : files_) {
file->deleteFile();
void SsdCache::shutdown() {
{
std::lock_guard<std::mutex> l(mutex_);
if (shutdown_) {
VELOX_SSD_CACHE_LOG(INFO) << "SSD cache has already been shutdown";
}
shutdown_ = true;
}
}

void SsdCache::shutdown() {
isShutdown_ = true;
VELOX_SSD_CACHE_LOG(INFO) << "SSD cache is shutting down";
while (writesInProgress_) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
}
for (auto& file : files_) {
file->checkpoint(true);
}
VELOX_SSD_CACHE_LOG(INFO) << "SSD cache has been shutdown";
}

void SsdCache::testingClear() {
for (auto& file : files_) {
file->testingClear();
}
}

void SsdCache::testingDeleteFiles() {
for (auto& file : files_) {
file->testingDeleteFile();
}
}

uint64_t SsdCache::testingTotalLogEvictionFilesSize() {
uint64_t size = 0;
for (auto& file : files_) {
std::filesystem::path p{file->getEvictLogFilePath()};
size += std::filesystem::file_size(p);
}
return size;
}

} // namespace facebook::velox::cache
27 changes: 18 additions & 9 deletions velox/common/caching/SsdCache.h
Original file line number Diff line number Diff line change
@@ -87,14 +87,6 @@ class SsdCache {
return *groupStats_;
}

/// Drops all entries. Outstanding pins become invalid but reading them will
/// mostly succeed since the files will not be rewritten until new content is
/// stored.
void testingClear();

/// Deletes backing files. Used in testing.
void testingDeleteFiles();

/// Stops writing to the cache files and waits for pending writes to finish.
/// If checkpointing is on, makes a checkpoint.
void shutdown();
@@ -105,18 +97,35 @@ class SsdCache {
return filePrefix_;
}

/// Drops all entries. Outstanding pins become invalid but reading them will
/// mostly succeed since the files will not be rewritten until new content is
/// stored.
void testingClear();

/// Deletes backing files. Used in testing.
void testingDeleteFiles();

/// Returns the total size of eviction log files. Used in testing.
uint64_t testingTotalLogEvictionFilesSize();

private:
// Asserts via VELOX_CHECK that 'shutdown_' has not been set, i.e. that no
// write is being attempted after shutdown(). 'shutdown_' is a plain bool that
// shutdown() sets under 'mutex_'; the 'Locked' suffix indicates the caller is
// expected to already hold 'mutex_' (startWrite() calls it that way).
void checkNotShutdownLocked() {
VELOX_CHECK(
!shutdown_, "Unexpected write after SSD cache has been shutdown");
}

const std::string filePrefix_;
const int32_t numShards_;
// Stats for selecting entries to save from AsyncDataCache.
const std::unique_ptr<FileGroupStats> groupStats_;
folly::Executor* const executor_;
mutable std::mutex mutex_;

std::vector<std::unique_ptr<SsdFile>> files_;

// Count of shards with unfinished writes.
std::atomic_int32_t writesInProgress_{0};
std::atomic_bool isShutdown_{false};
bool shutdown_{false};
};

} // namespace facebook::velox::cache
25 changes: 12 additions & 13 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
@@ -166,7 +166,7 @@ SsdFile::SsdFile(
regionSizes_.resize(maxRegions_, 0);
erasedRegionSizes_.resize(maxRegions_, 0);
regionPins_.resize(maxRegions_, 0);
if (checkpointIntervalBytes_ > 0) {
if (checkpointEnabled()) {
initializeCheckpoint();
}
}
@@ -432,8 +432,7 @@ void SsdFile::write(std::vector<CachePin>& pins) {
storeIndex += numWritten;
}

if ((checkpointIntervalBytes_ > 0) &&
(bytesAfterCheckpoint_ >= checkpointIntervalBytes_)) {
if (checkpointEnabled()) {
checkpoint();
}
}
@@ -520,8 +519,8 @@ void SsdFile::testingClear() {
tracker_.testingClear();
}

void SsdFile::deleteFile() {
process::TraceContext trace("SsdFile::deleteFile");
void SsdFile::testingDeleteFile() {
process::TraceContext trace("SsdFile::testingDeleteFile");
if (fd_) {
close(fd_);
fd_ = 0;
@@ -605,7 +604,7 @@ bool SsdFile::removeFileEntries(
}

void SsdFile::logEviction(const std::vector<int32_t>& regions) {
if (checkpointIntervalBytes_ > 0) {
if (checkpointEnabled()) {
const int32_t rc = ::write(
evictLogFd_, regions.data(), regions.size() * sizeof(regions[0]));
if (rc != regions.size() * sizeof(regions[0])) {
@@ -630,12 +629,12 @@ void SsdFile::deleteCheckpoint(bool keepLog) {
}

checkpointDeleted_ = true;
const auto logPath = fileName_ + kLogExtension;
const auto logPath = getEvictLogFilePath();
int32_t logRc = 0;
if (!keepLog) {
logRc = ::unlink(logPath.c_str());
}
const auto checkpointPath = fileName_ + kCheckpointExtension;
const auto checkpointPath = getCheckpointFilePath();
const auto checkpointRc = ::unlink(checkpointPath.c_str());
if ((logRc != 0) || (checkpointRc != 0)) {
++stats_.deleteCheckpointErrors;
@@ -668,7 +667,7 @@ inline const char* asChar(const T* ptr) {
void SsdFile::checkpoint(bool force) {
process::TraceContext trace("SsdFile::checkpoint");
std::lock_guard<std::shared_mutex> l(mutex_);
if (!force && (bytesAfterCheckpoint_ < checkpointIntervalBytes_)) {
if (!needCheckpoint(force)) {
return;
}

@@ -698,7 +697,7 @@ void SsdFile::checkpoint(bool force) {
}

std::ofstream state;
const auto checkpointPath = fileName_ + kCheckpointExtension;
const auto checkpointPath = getCheckpointFilePath();
try {
state.exceptions(std::ofstream::failbit);
state.open(checkpointPath, std::ios_base::out | std::ios_base::trunc);
@@ -789,19 +788,19 @@ void SsdFile::checkpoint(bool force) {
}

void SsdFile::initializeCheckpoint() {
if (checkpointIntervalBytes_ == 0) {
if (!checkpointEnabled()) {
return;
}

bool hasCheckpoint = true;
std::ifstream state(fileName_ + kCheckpointExtension);
std::ifstream state(getCheckpointFilePath());
if (!state.is_open()) {
hasCheckpoint = false;
++stats_.openCheckpointErrors;
VELOX_SSD_CACHE_LOG(INFO)
<< "Starting shard " << shardId_ << " without checkpoint";
}
const auto logPath = fileName_ + kLogExtension;
const auto logPath = getEvictLogFilePath();
evictLogFd_ = ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
if (disableFileCow_) {
disableCow(evictLogFd_);
43 changes: 33 additions & 10 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
@@ -278,12 +278,6 @@ class SsdFile {
// Adds 'stats_' to 'stats'.
void updateStats(SsdCacheStats& stats) const;

/// Resets this' to a post-construction empty state. See SsdCache::clear().
void testingClear();

// Deletes the backing file. Used in testing.
void deleteFile();

/// Remove cached entries of files in the fileNum set 'filesToRemove'. If
/// successful, return true, and 'filesRetained' contains entries that should
/// not be removed, ex., from pinned regions. Otherwise, return false and
@@ -298,14 +292,25 @@ class SsdFile {
// written since last checkpoint and silently returns if not.
void checkpoint(bool force = false);

/// Returns true if copy on write is disabled for this file. Used in testing.
bool testingIsCowDisabled() const;

/// Return the SSD file path.
/// Returns the SSD file path.
const std::string& fileName() const {
return fileName_;
}

/// Returns the eviction log file path, which is the SSD file path
/// ('fileName_') with 'kLogExtension' appended. The string is built on each
/// call and returned by value.
std::string getEvictLogFilePath() const {
return fileName_ + kLogExtension;
}

/// Deletes the backing file. Used in testing.
void testingDeleteFile();

/// Resets this to a post-construction empty state. See
/// SsdCache::testingClear().
void testingClear();

/// Returns true if copy on write is disabled for this file. Used in testing.
bool testingIsCowDisabled() const;

std::vector<double> testingCopyScores() {
return tracker_.copyScores();
}
@@ -378,6 +383,24 @@ class SsdFile {
// existing checkpoint.
void logEviction(const std::vector<int32_t>& regions);

// Returns true if checkpointing is enabled for this file, which is the case
// exactly when 'checkpointIntervalBytes_' is greater than zero.
bool checkpointEnabled() const {
return checkpointIntervalBytes_ > 0;
}

// Returns true if a checkpoint should be taken now. Checkpointing must be
// enabled; beyond that, either 'force' is set or 'bytesAfterCheckpoint_' has
// reached 'checkpointIntervalBytes_'. Always false when checkpointing is
// disabled, even if 'force' is true.
bool needCheckpoint(bool force) const {
  const bool intervalReached =
      bytesAfterCheckpoint_ >= checkpointIntervalBytes_;
  return checkpointEnabled() && (force || intervalReached);
}

// Returns the checkpoint file path, which is the SSD file path ('fileName_')
// with 'kCheckpointExtension' (".cpt") appended. The string is built on each
// call and returned by value.
std::string getCheckpointFilePath() const {
return fileName_ + kCheckpointExtension;
}

static constexpr const char* kLogExtension = ".log";
static constexpr const char* kCheckpointExtension = ".cpt";

Loading

0 comments on commit 54db952

Please sign in to comment.