Skip to content

Commit

Permalink
feat: Use Velox fs for ssd cache evictlog file (#11579)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11579

Switch the eviction log file to use Velox filesystem for file r/w operations, so that more advanced testing can be built by leveraging features like fault injections.

This change is a resubmission for D65740612 which was wrongfully reverted in D66119405.

Reviewed By: xiaoxmeng

Differential Revision: D66140029

fbshipit-source-id: bae8826d5ef642933684c975bab000cb453ee1a5
  • Loading branch information
zacw7 authored and facebook-github-bot committed Nov 19, 2024
1 parent 7d0b84e commit 03deeaf
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 80 deletions.
118 changes: 55 additions & 63 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ DEFINE_bool(ssd_verify_write, false, "Read back data after writing to SSD");
namespace facebook::velox::cache {

namespace {

// TODO: Remove this function once we migrate all files to velox fs.
//
// Disable 'copy on write' on the given file. Will throw if failed for any
// reason, including file system not supporting cow feature.
void disableCow(int32_t fd) {
Expand All @@ -66,28 +69,6 @@ void disableCow(int32_t fd) {
#endif // linux
}

// TODO: Remove this function once we migrate all files to velox fs.
void disableFileCow(int32_t fd) {
#ifdef linux
int attr{0};
auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
res,
folly::errnoStr(errno));
attr |= FS_NOCOW_FL;
res = ioctl(fd, FS_IOC_SETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}",
res,
folly::errnoStr(errno));
#endif // linux
}

void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector<iovec>& iovecs) {
if (entry.tinyData() != nullptr) {
iovecs.push_back({entry.tinyData(), static_cast<size_t>(entry.size())});
Expand Down Expand Up @@ -354,7 +335,7 @@ bool SsdFile::growOrEvictLocked() {
}
}

const auto candidates =
auto candidates =
tracker_.findEvictionCandidates(3, numRegions_, regionPins_);
if (candidates.empty()) {
suspended_ = true;
Expand Down Expand Up @@ -676,44 +657,49 @@ bool SsdFile::removeFileEntries(
return true;
}

void SsdFile::logEviction(const std::vector<int32_t>& regions) {
if (checkpointEnabled()) {
const int32_t rc = ::write(
evictLogFd_, regions.data(), regions.size() * sizeof(regions[0]));
if (rc != regions.size() * sizeof(regions[0])) {
checkpointError(rc, "Failed to log eviction");
}
void SsdFile::logEviction(std::vector<int32_t>& regions) {
if (!checkpointEnabled()) {
return;
}
const auto length = regions.size() * sizeof(regions[0]);
const std::vector<iovec> iovecs = {{regions.data(), length}};
try {
evictLogWriteFile_->write(iovecs, 0, static_cast<int64_t>(length));
} catch (const std::exception& e) {
++stats_.writeSsdErrors;
VELOX_SSD_CACHE_LOG(ERROR) << "Failed to log eviction: " << e.what();
}
}

void SsdFile::deleteCheckpoint(bool keepLog) {
if (checkpointDeleted_) {
return;
}
if (evictLogFd_ >= 0) {
if (keepLog) {
::lseek(evictLogFd_, 0, SEEK_SET);
::ftruncate(evictLogFd_, 0);
::fsync(evictLogFd_);
} else {
::close(evictLogFd_);
evictLogFd_ = -1;

if (evictLogWriteFile_ != nullptr) {
try {
if (keepLog) {
evictLogWriteFile_->truncate(0);
evictLogWriteFile_->flush();
} else {
evictLogWriteFile_->close();
fs_->remove(getEvictLogFilePath());
evictLogWriteFile_.reset();
}
} catch (const std::exception& e) {
++stats_.deleteCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting evictLog: " << e.what();
}
}

checkpointDeleted_ = true;
const auto logPath = getEvictLogFilePath();
int32_t logRc = 0;
if (!keepLog) {
logRc = ::unlink(logPath.c_str());
}
const auto checkpointPath = getCheckpointFilePath();
const auto checkpointRc = ::unlink(checkpointPath.c_str());
if ((logRc != 0) || (checkpointRc != 0)) {
++stats_.deleteCheckpointErrors;
if (checkpointRc != 0) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Error in deleting log and checkpoint. log: " << logRc
<< " checkpoint: " << checkpointRc;
<< "Error in deleting checkpoint: " << checkpointRc;
}
if (checkpointRc != 0) {
++stats_.deleteCheckpointErrors;
}
}

Expand Down Expand Up @@ -851,8 +837,9 @@ void SsdFile::checkpoint(bool force) {
// NOTE: we shall truncate eviction log after checkpoint file sync
// completes so that we never recover from an old checkpoint file without
// log evictions. The latter might lead to data consistent issue.
checkRc(::ftruncate(evictLogFd_, 0), "Truncate of event log");
checkRc(::fsync(evictLogFd_), "Sync of evict log");
VELOX_CHECK_NOT_NULL(evictLogWriteFile_);
evictLogWriteFile_->truncate(0);
evictLogWriteFile_->flush();

VELOX_SSD_CACHE_LOG(INFO)
<< "Checkpoint persisted with " << entries_.size() << " cache entries";
Expand Down Expand Up @@ -883,18 +870,14 @@ void SsdFile::initializeCheckpoint() {
getCheckpointFilePath());
}
const auto logPath = getEvictLogFilePath();
evictLogFd_ = ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
if (disableFileCow_) {
disableCow(evictLogFd_);
}
if (evictLogFd_ < 0) {
filesystems::FileOptions evictLogFileOptions;
evictLogFileOptions.shouldThrowOnFileAlreadyExists = false;
try {
evictLogWriteFile_ = fs_->openFileForWrite(logPath, evictLogFileOptions);
} catch (std::exception& e) {
++stats_.openLogErrors;
// Failure to open the log at startup is a process terminating error.
VELOX_FAIL(
"Could not open evict log {}, rc {}: {}",
logPath,
evictLogFd_,
folly::errnoStr(errno));
VELOX_FAIL("Could not open evict log {}: {}", logPath, e.what());
}

try {
Expand Down Expand Up @@ -965,6 +948,9 @@ void SsdFile::disableFileCow() {
const std::unordered_map<std::string, std::string> attributes = {
{std::string(LocalWriteFile::Attributes::kNoCow), "true"}};
writeFile_->setAttributes(attributes);
if (evictLogWriteFile_ != nullptr) {
evictLogWriteFile_->setAttributes(attributes);
}
#endif // linux
}

Expand Down Expand Up @@ -1021,10 +1007,16 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
idMap[id] = StringIdLease(fileIds(), id, name);
}

const auto logSize = ::lseek(evictLogFd_, 0, SEEK_END);
const auto logPath = getEvictLogFilePath();
const auto evictLogReadFile = fs_->openFileForRead(logPath);
const auto logSize = evictLogReadFile->size();
std::vector<uint32_t> evicted(logSize / sizeof(uint32_t));
const auto rc = ::pread(evictLogFd_, evicted.data(), logSize, 0);
VELOX_CHECK_EQ(logSize, rc, "Failed to read eviction log");
try {
evictLogReadFile->pread(0, logSize, evicted.data());
} catch (const std::exception& e) {
++stats_.readCheckpointErrors;
VELOX_FAIL("Failed to read eviction log: {}", e.what());
}
std::unordered_set<uint32_t> evictedMap;
for (auto region : evicted) {
evictedMap.insert(region);
Expand Down
16 changes: 11 additions & 5 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,13 @@ class SsdFile {

/// Returns the checkpoint file path.
std::string getCheckpointFilePath() const {
return fileName_ + kCheckpointExtension;
// Faulty file path needs to be handled manually before we switch checkpoint
// file to Velox filesystem.
const std::string faultyPrefix = "faulty:";
std::string checkpointPath = fileName_ + kCheckpointExtension;
return checkpointPath.find(faultyPrefix) == 0
? checkpointPath.substr(faultyPrefix.size())
: checkpointPath;
}

/// Deletes the backing file. Used in testing.
Expand Down Expand Up @@ -477,7 +483,7 @@ class SsdFile {

// Synchronously logs that 'regions' are no longer valid in a possibly
// existing checkpoint.
void logEviction(const std::vector<int32_t>& regions);
void logEviction(std::vector<int32_t>& regions);

// Computes the checksum of data in cache 'entry'.
uint32_t checksumEntry(const AsyncDataCacheEntry& entry) const;
Expand Down Expand Up @@ -572,6 +578,9 @@ class SsdFile {
// WriteFile for cache data file.
std::unique_ptr<WriteFile> writeFile_;

// WriteFile for evict log file.
std::unique_ptr<WriteFile> evictLogWriteFile_;

// Counters.
SsdCacheStats stats_;

Expand All @@ -585,9 +594,6 @@ class SsdFile {
// Count of bytes written after last checkpoint.
std::atomic<uint64_t> bytesAfterCheckpoint_{0};

// fd for logging evictions.
int32_t evictLogFd_{-1};

// True if there was an error with checkpoint and the checkpoint was deleted.
bool checkpointDeleted_{false};
};
Expand Down
1 change: 1 addition & 0 deletions velox/common/caching/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ target_link_libraries(
PRIVATE
velox_caching
velox_file
velox_file_test_utils
velox_memory
velox_temp_path
Folly::folly
Expand Down
Loading

0 comments on commit 03deeaf

Please sign in to comment.