From 6c197bb6f792cb9bf91be0dae5c8911f7bb19f93 Mon Sep 17 00:00:00 2001 From: Zac Wen Date: Mon, 7 Oct 2024 16:47:15 -0700 Subject: [PATCH] Use Velox filesystem in SSD cache The SSD cache currently uses native functions for file operations. Switch these to the Velox filesystem so that more advanced tests can be built on features such as fault injection. --- velox/common/caching/SsdFile.cpp | 267 ++++++++++----------- velox/common/caching/SsdFile.h | 26 +- velox/common/caching/tests/SsdFileTest.cpp | 2 + 3 files changed, 142 insertions(+), 153 deletions(-) diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index 7f500973917a1..2a462b9d2f64f 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -16,7 +16,6 @@ #include "velox/common/caching/SsdFile.h" -#include #include #include "velox/common/base/AsyncSource.h" #include "velox/common/base/Crc.h" @@ -29,9 +28,6 @@ #ifdef linux #include #endif // linux -#include -#include -#include #include #include @@ -44,29 +40,6 @@ DEFINE_bool(ssd_verify_write, false, "Read back data after writing to SSD"); namespace facebook::velox::cache { namespace { -// Disable 'copy on write' on the given file. Will throw if failed for any -// reason, including file system not supporting cow feature. 
-void disableCow(int32_t fd) { -#ifdef linux - int attr{0}; - auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", - res, - folly::errnoStr(errno)); - attr |= FS_NOCOW_FL; - res = ioctl(fd, FS_IOC_SETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}", - res, - folly::errnoStr(errno)); -#endif // linux -} - void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector& iovecs) { if (entry.tinyData() != nullptr) { iovecs.push_back({entry.tinyData(), static_cast(entry.size())}); @@ -145,28 +118,18 @@ SsdFile::SsdFile(const Config& config) #ifdef linux oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0; #endif // linux - fd_ = open(fileName_.c_str(), O_CREAT | O_RDWR | oDirect, S_IRUSR | S_IWUSR); - if (fd_ < 0) { - ++stats_.openFileErrors; - } - // TODO: add fault tolerant handling for open file errors. - VELOX_CHECK_GE( - fd_, - 0, - "Cannot open or create {}. Error: {}", - fileName_, - folly::errnoStr(errno)); - - if (disableFileCow_) { - disableCow(fd_); - } - - readFile_ = std::make_unique(fd_); - const uint64_t size = lseek(fd_, 0, SEEK_END); + fs_ = filesystems::getFileSystem(fileName_, nullptr); + filesystems::FileOptions fileOptions; + fileOptions.shouldThrowOnFileAlreadyExists = false; + fileOptions.bufferWrite = false; + writeFile_ = fs_->openFileForWrite(fileName_, fileOptions); + readFile_ = fs_->openFileForRead(fileName_); + + const uint64_t size = writeFile_->size(); numRegions_ = std::min(size / kRegionSize, maxRegions_); fileSize_ = numRegions_ * kRegionSize; if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) { - ::ftruncate(fd_, fileSize_); + writeFile_->truncate(fileSize_); } // The existing regions in the file are writable. 
writableRegions_.resize(numRegions_); @@ -178,6 +141,22 @@ SsdFile::SsdFile(const Config& config) if (checkpointEnabled()) { initializeCheckpoint(); } + + if (disableFileCow_) { +// Disable 'copy on write' on the given file. Will throw if failed for any +// reason, including file system not supporting cow feature. +#ifdef linux + const std::unordered_map attributes = { + {std::string(LocalWriteFile::Attributes::kNoCow), "true"}}; + writeFile_->setAttributes(attributes); + if (evictLogWriteFile_ != nullptr) { + evictLogWriteFile_->setAttributes(attributes); + } + if (checkpointWriteFile_ != nullptr) { + checkpointWriteFile_->setAttributes(attributes); + } +#endif // linux + } } void SsdFile::pinRegion(uint64_t offset) { @@ -324,23 +303,30 @@ bool SsdFile::growOrEvictLocked() { process::TraceContext trace("SsdFile::growOrEvictLocked"); if (numRegions_ < maxRegions_) { const auto newSize = (numRegions_ + 1) * kRegionSize; - const auto rc = ::ftruncate(fd_, newSize); - if (rc >= 0) { + try { + writeFile_->truncate(newSize); fileSize_ = newSize; writableRegions_.push_back(numRegions_); regionSizes_[numRegions_] = 0; erasedRegionSizes_[numRegions_] = 0; ++numRegions_; + VELOX_SSD_CACHE_LOG(WARNING) + << "Grow " << fileName_ << " to " << numRegions_ + << " regions (max: " << maxRegions_ << ")"; return true; + } catch (const std::exception& e) { + ++stats_.growFileErrors; + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to grow cache file " << fileName_ << " to " << newSize + << " with error: " << e.what(); } - - ++stats_.growFileErrors; - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to grow cache file " << fileName_ << " to " << newSize; } - const auto candidates = + auto candidates = tracker_.findEvictionCandidates(3, numRegions_, regionPins_); + VELOX_SSD_CACHE_LOG(WARNING) + << "Evicting " << candidates.size() << " regions from " << fileName_ + << " (current: " << numRegions_ << ")"; if (candidates.empty()) { suspended_ = true; return false; @@ -467,20 +453,21 @@ void 
SsdFile::write(std::vector& pins) { } bool SsdFile::write( - uint64_t offset, - uint64_t length, + int64_t offset, + int64_t length, const std::vector& iovecs) { - const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset); - if (ret == length) { - return true; + try { + writeFile_->write(iovecs, offset, length); + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to write to SSD, file name: " << fileName_ + << ", size: " << iovecs.size() << ", offset: " << offset + << ", error code: " << errno + << ", error string: " << folly::errnoStr(errno); + ++stats_.writeSsdErrors; + return false; } - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_ - << ", size: " << iovecs.size() << ", offset: " << offset - << ", error code: " << errno - << ", error string: " << folly::errnoStr(errno); - ++stats_.writeSsdErrors; - return false; + return true; } namespace { @@ -497,8 +484,9 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) { void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) { process::TraceContext trace("SsdFile::verifyWrite"); auto testData = std::make_unique(entry.size()); - const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset()); - VELOX_CHECK_EQ(rc, entry.size()); + const auto rc = + readFile_->pread(ssdRun.offset(), entry.size(), testData.get()); + VELOX_CHECK_EQ(rc.size(), entry.size()); if (entry.tinyData() != nullptr) { if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) { VELOX_FAIL("bad read back"); @@ -569,9 +557,9 @@ void SsdFile::clear() { void SsdFile::testingDeleteFile() { process::TraceContext trace("SsdFile::testingDeleteFile"); - if (fd_) { - close(fd_); - fd_ = 0; + if (writeFile_) { + writeFile_->close(); + writeFile_ = nullptr; } auto rc = unlink(fileName_.c_str()); if (rc < 0) { @@ -657,12 +645,14 @@ bool SsdFile::removeFileEntries( return true; } -void SsdFile::logEviction(const std::vector& 
regions) { +void SsdFile::logEviction(std::vector& regions) { if (checkpointEnabled()) { - const int32_t rc = ::write( - evictLogFd_, regions.data(), regions.size() * sizeof(regions[0])); - if (rc != regions.size() * sizeof(regions[0])) { - checkpointError(rc, "Failed to log eviction"); + const auto length = regions.size() * sizeof(regions[0]); + std::vector iovecs = {{regions.data(), length}}; + try { + evictLogWriteFile_->write(iovecs, 0, length); + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) << "Failed to log eviction: " << e.what(); } } } @@ -671,34 +661,34 @@ void SsdFile::deleteCheckpoint(bool keepLog) { if (checkpointDeleted_) { return; } - if (evictLogFd_ >= 0) { - if (keepLog) { - ::lseek(evictLogFd_, 0, SEEK_SET); - ::ftruncate(evictLogFd_, 0); - ::fsync(evictLogFd_); - } else { - ::close(evictLogFd_); - evictLogFd_ = -1; + + if (evictLogWriteFile_ != nullptr) { + try { + if (keepLog) { + evictLogWriteFile_->truncate(0); + evictLogWriteFile_->flush(); + } else { + evictLogWriteFile_->close(); + fs_->remove(getEvictLogFilePath()); + evictLogWriteFile_ = nullptr; + } + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting evictLog: " << e.what(); } } checkpointDeleted_ = true; const auto logPath = getEvictLogFilePath(); - int32_t logRc = 0; - if (!keepLog) { - logRc = ::unlink(logPath.c_str()); - } const auto checkpointPath = getCheckpointFilePath(); - const auto checkpointRc = ::unlink(checkpointPath.c_str()); - if ((logRc != 0) || (checkpointRc != 0)) { + try { + fs_->remove(getCheckpointFilePath()); + } catch (const std::exception& e) { ++stats_.deleteCheckpointErrors; - VELOX_SSD_CACHE_LOG(ERROR) - << "Error in deleting log and checkpoint. 
log: " << logRc - << " checkpoint: " << checkpointRc; + VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting checkpoint: " << e.what(); } } -void SsdFile::checkpointError(int32_t rc, const std::string& error) { +void SsdFile::checkpointError(uint64_t rc, const std::string& error) { VELOX_SSD_CACHE_LOG(ERROR) << error << " with rc=" << rc << ", deleting checkpoint and continuing with checkpointing off"; @@ -734,21 +724,13 @@ void SsdFile::checkpoint(bool force) { checkpointDeleted_ = false; bytesAfterCheckpoint_ = 0; try { - const auto checkRc = [&](int32_t rc, const std::string& errMsg) { - if (rc < 0) { - VELOX_FAIL("{} with rc {} :{}", errMsg, rc, folly::errnoStr(errno)); - } - return rc; - }; - // We schedule the potentially long fsync of the cache file on another // thread of the cache write executor, if available. If there is none, we do // the sync on this thread at the end. - auto fileSync = std::make_shared>( - [fd = fd_]() { return std::make_unique(::fsync(fd)); }); - if (executor_ != nullptr) { - executor_->add([fileSync]() { fileSync->prepare(); }); - } + auto fileSync = std::make_shared>([this]() { + writeFile_->flush(); + return std::make_unique(0); + }); std::ofstream state; const auto checkpointPath = getCheckpointFilePath(); @@ -803,39 +785,28 @@ void SsdFile::checkpoint(bool force) { // NOTE: we need to ensure cache file data sync update completes before // updating checkpoint file. - const auto fileSyncRc = fileSync->move(); - checkRc(*fileSyncRc, "Sync of cache data file"); + fileSync->move(); const auto endMarker = kCheckpointEndMarker; state.write(asChar(&endMarker), sizeof(endMarker)); if (state.bad()) { ++stats_.writeCheckpointErrors; - checkRc(-1, "Write of checkpoint file"); + VELOX_FAIL("Write of checkpoint file:{}", folly::errnoStr(errno)); } else { ++stats_.checkpointsWritten; } state.close(); - // Sync checkpoint data file. ofstream does not have a sync method, so open - // as fd and sync that. 
- const auto checkpointFd = checkRc( - ::open(checkpointPath.c_str(), O_WRONLY), - "Open of checkpoint file for sync"); - // TODO: add this as file open option after we migrate to use velox - // filesystem for ssd file access. - if (disableFileCow_) { - disableCow(checkpointFd); - } - VELOX_CHECK_GE(checkpointFd, 0); - checkRc(::fsync(checkpointFd), "Sync of checkpoint file"); - ::close(checkpointFd); + VELOX_CHECK_NOT_NULL(checkpointWriteFile_); + checkpointWriteFile_->flush(); // NOTE: we shall truncate eviction log after checkpoint file sync // completes so that we never recover from an old checkpoint file without // log evictions. The latter might lead to data consistent issue. - checkRc(::ftruncate(evictLogFd_, 0), "Truncate of event log"); - checkRc(::fsync(evictLogFd_), "Sync of evict log"); + VELOX_CHECK_NOT_NULL(evictLogWriteFile_); + evictLogWriteFile_->truncate(0); + evictLogWriteFile_->flush(); VELOX_SSD_CACHE_LOG(INFO) << "Checkpoint persisted with " << entries_.size() << " cache entries"; @@ -866,18 +837,29 @@ void SsdFile::initializeCheckpoint() { getCheckpointFilePath()); } const auto logPath = getEvictLogFilePath(); - evictLogFd_ = ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); - if (disableFileCow_) { - disableCow(evictLogFd_); - } - if (evictLogFd_ < 0) { + filesystems::FileOptions evictLogFileOptions; + evictLogFileOptions.shouldThrowOnFileAlreadyExists = false; + try { + evictLogWriteFile_ = fs_->openFileForWrite(logPath, evictLogFileOptions); + } catch (std::exception& e) { ++stats_.openLogErrors; // Failure to open the log at startup is a process terminating error. 
+ VELOX_FAIL("Could not open evict log {}: {}", logPath, e.what()); + } + + const auto checkpointPath = getCheckpointFilePath(); + filesystems::FileOptions checkpointFileOptions; + checkpointFileOptions.shouldThrowOnFileAlreadyExists = false; + try { + checkpointWriteFile_ = + fs_->openFileForWrite(checkpointPath, checkpointFileOptions); + } catch (std::exception& e) { + ++stats_.openCheckpointErrors; + // Failure to open the checkpoint at startup is a process terminating error. VELOX_FAIL( - "Could not open evict log {}, rc {}: {}", - logPath, - evictLogFd_, - folly::errnoStr(errno)); + "Could not open checkpoint file for write {}: {}", + checkpointPath, + e.what()); } try { @@ -936,8 +918,11 @@ void SsdFile::maybeVerifyChecksum( if (checksum != ssdRun.checksum()) { ++stats_.readSsdCorruptions; VELOX_FAIL( - "IOERR: Corrupt SSD cache entry - File: {}, Offset: {}, Size: {}", + "IOERR: Corrupt SSD cache entry calc {} vs ssdRun {} - File: {}, Entry: {}, Offset: {}, Size: {}", + checksum, + ssdRun.checksum(), fileName_, + entry.toString(), ssdRun.offset(), ssdRun.size()); } @@ -945,16 +930,9 @@ void SsdFile::maybeVerifyChecksum( bool SsdFile::testingIsCowDisabled() const { #ifdef linux - int attr{0}; - const auto res = ioctl(fd_, FS_IOC_GETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", - res, - folly::errnoStr(errno)); - - return (attr & FS_NOCOW_FL) == FS_NOCOW_FL; + const auto attributes = writeFile_->getAttributes(); + const auto it = attributes.find(std::string(LocalWriteFile::Attributes::kNoCow)); + return it != attributes.end() && it->second == "true"; #else return false; #endif // linux @@ -1002,6 +980,9 @@ void SsdFile::readCheckpoint(std::ifstream& state) { idMap[id] = StringIdLease(fileIds(), id, name); } + const auto logPath = getEvictLogFilePath(); + auto evictLogFd_ = + ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); const auto logSize = ::lseek(evictLogFd_, 0, SEEK_END); std::vector evicted(logSize / 
sizeof(uint32_t)); const auto rc = ::pread(evictLogFd_, evicted.data(), logSize, 0); diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index 434d41c342b9e..6b83c64415c3c 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -22,6 +22,7 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/caching/SsdFileTracker.h" #include "velox/common/file/File.h" +#include "velox/common/file/FileSystems.h" DECLARE_bool(ssd_odirect); DECLARE_bool(ssd_verify_write); @@ -461,7 +462,7 @@ class SsdFile { // Logs an error message, deletes the checkpoint and stop making new // checkpoints. - void checkpointError(int32_t rc, const std::string& error); + void checkpointError(uint64_t rc, const std::string& error); // Looks for a checkpointed state and sets the state of 'this' by // the checkpointed state iif the state is complete and @@ -472,12 +473,11 @@ class SsdFile { // Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write // succeeds; otherwise, log the error and return false. - bool - write(uint64_t offset, uint64_t length, const std::vector& iovecs); + bool write(int64_t offset, int64_t length, const std::vector& iovecs); // Synchronously logs that 'regions' are no longer valid in a possibly // existing checkpoint. - void logEviction(const std::vector& regions); + void logEviction(std::vector& regions); // Computes the checksum of data in cache 'entry'. uint32_t checksumEntry(const AsyncDataCacheEntry& entry) const; @@ -489,7 +489,7 @@ class SsdFile { // Returns true if checkpoint is needed. bool needCheckpoint(bool force) const { - if (!checkpointEnabled()) { + if (!checkpointEnabled() || checkpointWriteFile_ == nullptr) { return false; } return force || (bytesAfterCheckpoint_ >= checkpointIntervalBytes_); @@ -556,8 +556,8 @@ class SsdFile { // Map of file number and offset to location in file. folly::F14FastMap entries_; - // File descriptor. 0 (stdin) means file not open. 
- int32_t fd_{0}; + // File system used to open the cache, evict log and checkpoint files. + std::shared_ptr fs_; // Size of the backing file in bytes. Must be multiple of kRegionSize. uint64_t fileSize_{0}; @@ -565,6 +565,15 @@ class SsdFile { // ReadFile made from 'fd_'. std::unique_ptr readFile_; + // WriteFile for the cache data file 'fileName_'. + std::unique_ptr writeFile_; + + // WriteFile for logging evictions. + std::unique_ptr evictLogWriteFile_; + + // WriteFile for checkpoint. + std::unique_ptr checkpointWriteFile_; + // Counters. SsdCacheStats stats_; @@ -578,9 +587,6 @@ class SsdFile { // Count of bytes written after last checkpoint. std::atomic bytesAfterCheckpoint_{0}; - // fd for logging evictions. - int32_t evictLogFd_{-1}; - // True if there was an error with checkpoint and the checkpoint was deleted. bool checkpointDeleted_{false}; }; diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index fd2835596ad34..0b0bef1ca8425 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -17,6 +17,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" +#include "velox/common/file/FileSystems.h" #include "velox/common/memory/Memory.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -45,6 +46,7 @@ class SsdFileTest : public testing::Test { static constexpr int64_t kMB = 1 << 20; void SetUp() override { + filesystems::registerLocalFileSystem(); memory::MemoryManager::testingSetInstance({}); }