From 76721ef172b8ef06eda85c8108352ac70d7ae2b0 Mon Sep 17 00:00:00 2001 From: Zac Wen Date: Wed, 30 Oct 2024 10:56:36 -0700 Subject: [PATCH] Use Velox filesystem for SSD cache data file SSD cache currently uses native functions for file operations. This needs to be switched to Velox filesystem, so more advanced testing can be built by leveraging features like fault injections. --- velox/common/caching/SsdFile.cpp | 153 ++++++++++++--------- velox/common/caching/SsdFile.h | 17 ++- velox/common/caching/tests/SsdFileTest.cpp | 2 + 3 files changed, 100 insertions(+), 72 deletions(-) diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index 7f500973917a1..68859306e3383 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -16,7 +16,6 @@ #include "velox/common/caching/SsdFile.h" -#include #include #include "velox/common/base/AsyncSource.h" #include "velox/common/base/Crc.h" @@ -67,6 +66,28 @@ void disableCow(int32_t fd) { #endif // linux } +// TODO: Remove this function once we migrate all files to velox fs. +void disableFileCow(int32_t fd) { +#ifdef linux + int attr{0}; + auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr); + VELOX_CHECK_EQ( + 0, + res, + "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", + res, + folly::errnoStr(errno)); + attr |= FS_NOCOW_FL; + res = ioctl(fd, FS_IOC_SETFLAGS, &attr); + VELOX_CHECK_EQ( + 0, + res, + "ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}", + res, + folly::errnoStr(errno)); +#endif // linux +} + void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector& iovecs) { if (entry.tinyData() != nullptr) { iovecs.push_back({entry.tinyData(), static_cast(entry.size())}); @@ -138,35 +159,21 @@ SsdFile::SsdFile(const Config& config) checksumReadVerificationEnabled_( config.checksumEnabled && config.checksumReadVerificationEnabled), shardId_(config.shardId), + fs_(filesystems::getFileSystem(fileName_, nullptr)), checkpointIntervalBytes_(config.checkpointIntervalBytes), executor_(config.executor) { process::TraceContext trace("SsdFile::SsdFile"); - int32_t oDirect = 0; -#ifdef linux - oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0; -#endif // linux - fd_ = open(fileName_.c_str(), O_CREAT | O_RDWR | oDirect, S_IRUSR | S_IWUSR); - if (fd_ < 0) { - ++stats_.openFileErrors; - } - // TODO: add fault tolerant handling for open file errors. - VELOX_CHECK_GE( - fd_, - 0, - "Cannot open or create {}. Error: {}", - fileName_, - folly::errnoStr(errno)); + filesystems::FileOptions fileOptions; + fileOptions.shouldThrowOnFileAlreadyExists = false; + fileOptions.bufferWrite = !FLAGS_ssd_odirect; + writeFile_ = fs_->openFileForWrite(fileName_, fileOptions); + readFile_ = fs_->openFileForRead(fileName_); - if (disableFileCow_) { - disableCow(fd_); - } - - readFile_ = std::make_unique(fd_); - const uint64_t size = lseek(fd_, 0, SEEK_END); + const uint64_t size = writeFile_->size(); numRegions_ = std::min(size / kRegionSize, maxRegions_); fileSize_ = numRegions_ * kRegionSize; if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) { - ::ftruncate(fd_, fileSize_); + writeFile_->truncate(fileSize_); } // The existing regions in the file are writable. writableRegions_.resize(numRegions_); @@ -178,6 +185,10 @@ SsdFile::SsdFile(const Config& config) if (checkpointEnabled()) { initializeCheckpoint(); } + + if (disableFileCow_) { + disableFileCow(); + } } void SsdFile::pinRegion(uint64_t offset) { @@ -324,19 +335,23 @@ bool SsdFile::growOrEvictLocked() { process::TraceContext trace("SsdFile::growOrEvictLocked"); if (numRegions_ < maxRegions_) { const auto newSize = (numRegions_ + 1) * kRegionSize; - const auto rc = ::ftruncate(fd_, newSize); - if (rc >= 0) { + try { + writeFile_->truncate(newSize); fileSize_ = newSize; writableRegions_.push_back(numRegions_); regionSizes_[numRegions_] = 0; erasedRegionSizes_[numRegions_] = 0; ++numRegions_; + VELOX_SSD_CACHE_LOG(INFO) + << "Grow cache file " << fileName_ << " to " << numRegions_ + << " regions (max: " << maxRegions_ << ")"; return true; + } catch (const std::exception& e) { + ++stats_.growFileErrors; + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to grow cache file " << fileName_ << " to " << numRegions_ + << " regions. Error: " << e.what(); } - - ++stats_.growFileErrors; - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to grow cache file " << fileName_ << " to " << newSize; } const auto candidates = @@ -349,7 +364,7 @@ bool SsdFile::growOrEvictLocked() { logEviction(candidates); clearRegionEntriesLocked(candidates); stats_.regionsEvicted += candidates.size(); - writableRegions_ = std::move(candidates); + writableRegions_ = candidates; suspended_ = false; return true; } @@ -467,20 +482,21 @@ void SsdFile::write(std::vector& pins) { } bool SsdFile::write( - uint64_t offset, - uint64_t length, + int64_t offset, + int64_t length, const std::vector& iovecs) { - const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset); - if (ret == length) { + try { + writeFile_->write(iovecs, offset, length); return true; + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to write to SSD, file name: " << fileName_ + << ", size: " << iovecs.size() << ", offset: " << offset + << ", error code: " << errno + << ", error string: " << folly::errnoStr(errno); + ++stats_.writeSsdErrors; + return false; } - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_ - << ", size: " << iovecs.size() << ", offset: " << offset - << ", error code: " << errno - << ", error string: " << folly::errnoStr(errno); - ++stats_.writeSsdErrors; - return false; } namespace { @@ -497,8 +513,9 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) { void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) { process::TraceContext trace("SsdFile::verifyWrite"); auto testData = std::make_unique(entry.size()); - const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset()); - VELOX_CHECK_EQ(rc, entry.size()); + const auto rc = + readFile_->pread(ssdRun.offset(), entry.size(), testData.get()); + VELOX_CHECK_EQ(rc.size(), entry.size()); if (entry.tinyData() != nullptr) { if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) { VELOX_FAIL("bad read back"); @@ -569,14 +586,16 @@ void SsdFile::clear() { void SsdFile::testingDeleteFile() { process::TraceContext trace("SsdFile::testingDeleteFile"); - if (fd_) { - close(fd_); - fd_ = 0; + if (writeFile_) { + writeFile_->close(); + writeFile_.reset(); } - auto rc = unlink(fileName_.c_str()); - if (rc < 0) { - VELOX_SSD_CACHE_LOG(ERROR) - << "Error deleting cache file " << fileName_ << " rc: " << rc; + try { + fs_->remove(fileName_); + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) << "Failed to delete cache file " << fileName_ + << ", error code: " << errno + << ", error string: " << folly::errnoStr(errno); } } @@ -744,11 +763,10 @@ void SsdFile::checkpoint(bool force) { // We schedule the potentially long fsync of the cache file on another // thread of the cache write executor, if available. If there is none, we do // the sync on this thread at the end. - auto fileSync = std::make_shared>( - [fd = fd_]() { return std::make_unique(::fsync(fd)); }); - if (executor_ != nullptr) { - executor_->add([fileSync]() { fileSync->prepare(); }); - } + auto fileSync = std::make_shared>([this]() { + writeFile_->flush(); + return std::make_unique(0); + }); std::ofstream state; const auto checkpointPath = getCheckpointFilePath(); @@ -803,8 +821,7 @@ void SsdFile::checkpoint(bool force) { // NOTE: we need to ensure cache file data sync update completes before // updating checkpoint file. - const auto fileSyncRc = fileSync->move(); - checkRc(*fileSyncRc, "Sync of cache data file"); + fileSync->move(); const auto endMarker = kCheckpointEndMarker; state.write(asChar(&endMarker), sizeof(endMarker)); @@ -943,18 +960,20 @@ void SsdFile::maybeVerifyChecksum( } } -bool SsdFile::testingIsCowDisabled() const { +void SsdFile::disableFileCow() { #ifdef linux - int attr{0}; - const auto res = ioctl(fd_, FS_IOC_GETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", - res, - folly::errnoStr(errno)); + const std::unordered_map attributes = { + {std::string(LocalWriteFile::Attributes::kNoCow), "true"}}; + writeFile_->setAttributes(attributes); +#endif // linux +} - return (attr & FS_NOCOW_FL) == FS_NOCOW_FL; +bool SsdFile::testingIsCowDisabled() const { +#ifdef linux + const auto attributes = writeFile_->getAttributes(); + const auto it = + attributes.find(std::string(LocalWriteFile::Attributes::kNoCow)); + return it != attributes.end() && it->second == "true"; #else return false; #endif // linux diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index 434d41c342b9e..31300a274a781 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -22,6 +22,7 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/caching/SsdFileTracker.h" #include "velox/common/file/File.h" +#include "velox/common/file/FileSystems.h" DECLARE_bool(ssd_odirect); DECLARE_bool(ssd_verify_write); @@ -472,8 +473,7 @@ class SsdFile { // Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write // succeeds; otherwise, log the error and return false. - bool - write(uint64_t offset, uint64_t length, const std::vector& iovecs); + bool write(int64_t offset, int64_t length, const std::vector& iovecs); // Synchronously logs that 'regions' are no longer valid in a possibly // existing checkpoint. @@ -499,6 +499,10 @@ class SsdFile { const AsyncDataCacheEntry& entry, const SsdRun& ssdRun); + // Disable 'copy on write'. Will throw if failed for any reason, including + // file system not supporting cow feature. + void disableFileCow(); + // Returns true if checksum write is enabled for the given version. static bool isChecksumEnabledOnCheckpointVersion( const std::string& checkpointVersion) { @@ -556,15 +560,18 @@ class SsdFile { // Map of file number and offset to location in file. folly::F14FastMap entries_; - // File descriptor. 0 (stdin) means file not open. - int32_t fd_{0}; + // File system. + std::shared_ptr fs_; // Size of the backing file in bytes. Must be multiple of kRegionSize. uint64_t fileSize_{0}; - // ReadFile made from 'fd_'. + // ReadFile for cache data file. std::unique_ptr readFile_; + // WriteFile for cache data file. + std::unique_ptr writeFile_; + // Counters. SsdCacheStats stats_; diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index fd2835596ad34..0b0bef1ca8425 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -17,6 +17,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" +#include "velox/common/file/FileSystems.h" #include "velox/common/memory/Memory.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -45,6 +46,7 @@ class SsdFileTest : public testing::Test { static constexpr int64_t kMB = 1 << 20; void SetUp() override { + filesystems::registerLocalFileSystem(); memory::MemoryManager::testingSetInstance({}); }