diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index 7f500973917a1..ab07adb5c9900 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -16,7 +16,6 @@ #include "velox/common/caching/SsdFile.h" -#include #include #include "velox/common/base/AsyncSource.h" #include "velox/common/base/Crc.h" @@ -29,9 +28,6 @@ #ifdef linux #include #endif // linux -#include -#include -#include #include #include @@ -44,29 +40,6 @@ DEFINE_bool(ssd_verify_write, false, "Read back data after writing to SSD"); namespace facebook::velox::cache { namespace { -// Disable 'copy on write' on the given file. Will throw if failed for any -// reason, including file system not supporting cow feature. -void disableCow(int32_t fd) { -#ifdef linux - int attr{0}; - auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", - res, - folly::errnoStr(errno)); - attr |= FS_NOCOW_FL; - res = ioctl(fd, FS_IOC_SETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}", - res, - folly::errnoStr(errno)); -#endif // linux -} - void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector& iovecs) { if (entry.tinyData() != nullptr) { iovecs.push_back({entry.tinyData(), static_cast(entry.size())}); @@ -145,28 +118,18 @@ SsdFile::SsdFile(const Config& config) #ifdef linux oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0; #endif // linux - fd_ = open(fileName_.c_str(), O_CREAT | O_RDWR | oDirect, S_IRUSR | S_IWUSR); - if (fd_ < 0) { - ++stats_.openFileErrors; - } - // TODO: add fault tolerant handling for open file errors. - VELOX_CHECK_GE( - fd_, - 0, - "Cannot open or create {}. Error: {}", - fileName_, - folly::errnoStr(errno)); - - if (disableFileCow_) { - disableCow(fd_); - } - - readFile_ = std::make_unique(fd_); - const uint64_t size = lseek(fd_, 0, SEEK_END); + fs_ = filesystems::getFileSystem(fileName_, nullptr); + filesystems::FileOptions fileOptions; + fileOptions.shouldThrowOnFileAlreadyExists = false; + fileOptions.bufferWrite = false; + writeFile_ = fs_->openFileForWrite(fileName_, fileOptions); + readFile_ = fs_->openFileForRead(fileName_); + + const uint64_t size = writeFile_->size(); numRegions_ = std::min(size / kRegionSize, maxRegions_); fileSize_ = numRegions_ * kRegionSize; if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) { - ::ftruncate(fd_, fileSize_); + writeFile_->truncate(fileSize_); } // The existing regions in the file are writable. writableRegions_.resize(numRegions_); @@ -178,6 +141,22 @@ SsdFile::SsdFile(const Config& config) if (checkpointEnabled()) { initializeCheckpoint(); } + + if (disableFileCow_) { +// Disable 'copy on write' on the given file. Will throw if failed for any +// reason, including file system not supporting cow feature. +#ifdef linux + const std::unordered_map attributes = { + {std::string(LocalWriteFile::Attributes::kNoCow), "true"}}; + writeFile_->setAttributes(attributes); + if (evictLogWriteFile_ != nullptr) { + evictLogWriteFile_->setAttributes(attributes); + } + if (checkpointWriteFile_ != nullptr) { + checkpointWriteFile_->setAttributes(attributes); + } +#endif // linux + } } void SsdFile::pinRegion(uint64_t offset) { @@ -324,23 +303,30 @@ bool SsdFile::growOrEvictLocked() { process::TraceContext trace("SsdFile::growOrEvictLocked"); if (numRegions_ < maxRegions_) { const auto newSize = (numRegions_ + 1) * kRegionSize; - const auto rc = ::ftruncate(fd_, newSize); - if (rc >= 0) { + try { + writeFile_->truncate(newSize); fileSize_ = newSize; writableRegions_.push_back(numRegions_); regionSizes_[numRegions_] = 0; erasedRegionSizes_[numRegions_] = 0; ++numRegions_; + VELOX_SSD_CACHE_LOG(WARNING) + << "Grow " << fileName_ << " to " << numRegions_ + << " regions (max: " << maxRegions_ << ")"; return true; + } catch (const std::exception& e) { + ++stats_.growFileErrors; + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to grow cache file " << fileName_ << " to " << newSize + << " with error: " << e.what(); } - - ++stats_.growFileErrors; - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to grow cache file " << fileName_ << " to " << newSize; } - const auto candidates = + auto candidates = tracker_.findEvictionCandidates(3, numRegions_, regionPins_); + VELOX_SSD_CACHE_LOG(WARNING) + << "Evicting " << candidates.size() << " regions from " << fileName_ + << " (current: " << numRegions_ << ")"; if (candidates.empty()) { suspended_ = true; return false; @@ -467,20 +453,21 @@ void SsdFile::write(std::vector& pins) { } bool SsdFile::write( - uint64_t offset, - uint64_t length, + int64_t offset, + int64_t length, const std::vector& iovecs) { - const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset); - if (ret == length) { - return true; + try { + writeFile_->write(iovecs, offset, length); + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to write to SSD, file name: " << fileName_ + << ", size: " << iovecs.size() << ", offset: " << offset + << ", error code: " << errno + << ", error string: " << folly::errnoStr(errno); + ++stats_.writeSsdErrors; + return false; } - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_ - << ", size: " << iovecs.size() << ", offset: " << offset - << ", error code: " << errno - << ", error string: " << folly::errnoStr(errno); - ++stats_.writeSsdErrors; - return false; + return true; } namespace { @@ -497,8 +484,9 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) { void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) { process::TraceContext trace("SsdFile::verifyWrite"); auto testData = std::make_unique(entry.size()); - const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset()); - VELOX_CHECK_EQ(rc, entry.size()); + const auto rc = + readFile_->pread(ssdRun.offset(), entry.size(), testData.get()); + VELOX_CHECK_EQ(rc.size(), entry.size()); if (entry.tinyData() != nullptr) { if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) { VELOX_FAIL("bad read back"); @@ -569,9 +557,9 @@ void SsdFile::clear() { void SsdFile::testingDeleteFile() { process::TraceContext trace("SsdFile::testingDeleteFile"); - if (fd_) { - close(fd_); - fd_ = 0; + if (writeFile_) { + writeFile_->close(); + writeFile_ = nullptr; } auto rc = unlink(fileName_.c_str()); if (rc < 0) { @@ -657,12 +645,14 @@ bool SsdFile::removeFileEntries( return true; } -void SsdFile::logEviction(const std::vector& regions) { +void SsdFile::logEviction(std::vector& regions) { if (checkpointEnabled()) { - const int32_t rc = ::write( - evictLogFd_, regions.data(), regions.size() * sizeof(regions[0])); - if (rc != regions.size() * sizeof(regions[0])) { - checkpointError(rc, "Failed to log eviction"); + const auto length = regions.size() * sizeof(regions[0]); + std::vector iovecs = {{regions.data(), length}}; + try { + evictLogWriteFile_->write(iovecs, 0, length); + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) << "Failed to log eviction: " << e.what(); } } } @@ -671,34 +661,34 @@ void SsdFile::deleteCheckpoint(bool keepLog) { if (checkpointDeleted_) { return; } - if (evictLogFd_ >= 0) { - if (keepLog) { - ::lseek(evictLogFd_, 0, SEEK_SET); - ::ftruncate(evictLogFd_, 0); - ::fsync(evictLogFd_); - } else { - ::close(evictLogFd_); - evictLogFd_ = -1; + + if (evictLogWriteFile_ != nullptr) { + try { + if (keepLog) { + evictLogWriteFile_->truncate(0); + evictLogWriteFile_->flush(); + } else { + evictLogWriteFile_->close(); + fs_->remove(getEvictLogFilePath()); + evictLogWriteFile_ = nullptr; + } + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting evictLog: " << e.what(); } } checkpointDeleted_ = true; const auto logPath = getEvictLogFilePath(); - int32_t logRc = 0; - if (!keepLog) { - logRc = ::unlink(logPath.c_str()); - } const auto checkpointPath = getCheckpointFilePath(); - const auto checkpointRc = ::unlink(checkpointPath.c_str()); - if ((logRc != 0) || (checkpointRc != 0)) { + try { + fs_->remove(getCheckpointFilePath()); + } catch (const std::exception& e) { ++stats_.deleteCheckpointErrors; - VELOX_SSD_CACHE_LOG(ERROR) - << "Error in deleting log and checkpoint. log: " << logRc - << " checkpoint: " << checkpointRc; + VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting checkpoint: " << e.what(); } } -void SsdFile::checkpointError(int32_t rc, const std::string& error) { +void SsdFile::checkpointError(uint64_t rc, const std::string& error) { VELOX_SSD_CACHE_LOG(ERROR) << error << " with rc=" << rc << ", deleting checkpoint and continuing with checkpointing off"; @@ -734,21 +724,13 @@ void SsdFile::checkpoint(bool force) { checkpointDeleted_ = false; bytesAfterCheckpoint_ = 0; try { - const auto checkRc = [&](int32_t rc, const std::string& errMsg) { - if (rc < 0) { - VELOX_FAIL("{} with rc {} :{}", errMsg, rc, folly::errnoStr(errno)); - } - return rc; - }; - // We schedule the potentially long fsync of the cache file on another // thread of the cache write executor, if available. If there is none, we do // the sync on this thread at the end. - auto fileSync = std::make_shared>( - [fd = fd_]() { return std::make_unique(::fsync(fd)); }); - if (executor_ != nullptr) { - executor_->add([fileSync]() { fileSync->prepare(); }); - } + auto fileSync = std::make_shared>([this]() { + writeFile_->flush(); + return std::make_unique(0); + }); std::ofstream state; const auto checkpointPath = getCheckpointFilePath(); @@ -803,39 +785,28 @@ void SsdFile::checkpoint(bool force) { // NOTE: we need to ensure cache file data sync update completes before // updating checkpoint file. - const auto fileSyncRc = fileSync->move(); - checkRc(*fileSyncRc, "Sync of cache data file"); + fileSync->move(); const auto endMarker = kCheckpointEndMarker; state.write(asChar(&endMarker), sizeof(endMarker)); if (state.bad()) { ++stats_.writeCheckpointErrors; - checkRc(-1, "Write of checkpoint file"); + VELOX_FAIL("Write of checkpoint file:{}", folly::errnoStr(errno)); } else { ++stats_.checkpointsWritten; } state.close(); - // Sync checkpoint data file. ofstream does not have a sync method, so open - // as fd and sync that. - const auto checkpointFd = checkRc( - ::open(checkpointPath.c_str(), O_WRONLY), - "Open of checkpoint file for sync"); - // TODO: add this as file open option after we migrate to use velox - // filesystem for ssd file access. - if (disableFileCow_) { - disableCow(checkpointFd); - } - VELOX_CHECK_GE(checkpointFd, 0); - checkRc(::fsync(checkpointFd), "Sync of checkpoint file"); - ::close(checkpointFd); + VELOX_CHECK_NOT_NULL(checkpointWriteFile_); + checkpointWriteFile_->flush(); // NOTE: we shall truncate eviction log after checkpoint file sync // completes so that we never recover from an old checkpoint file without // log evictions. The latter might lead to data consistent issue. - checkRc(::ftruncate(evictLogFd_, 0), "Truncate of event log"); - checkRc(::fsync(evictLogFd_), "Sync of evict log"); + VELOX_CHECK_NOT_NULL(evictLogWriteFile_); + evictLogWriteFile_->truncate(0); + evictLogWriteFile_->flush(); VELOX_SSD_CACHE_LOG(INFO) << "Checkpoint persisted with " << entries_.size() << " cache entries"; @@ -866,18 +837,29 @@ void SsdFile::initializeCheckpoint() { getCheckpointFilePath()); } const auto logPath = getEvictLogFilePath(); - evictLogFd_ = ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); - if (disableFileCow_) { - disableCow(evictLogFd_); - } - if (evictLogFd_ < 0) { + filesystems::FileOptions evictLogFileOptions; + evictLogFileOptions.shouldThrowOnFileAlreadyExists = false; + try { + evictLogWriteFile_ = fs_->openFileForWrite(logPath, evictLogFileOptions); + } catch (std::exception& e) { ++stats_.openLogErrors; // Failure to open the log at startup is a process terminating error. + VELOX_FAIL("Could not open evict log {}: {}", logPath, e.what()); + } + + const auto checkpointPath = getCheckpointFilePath(); + filesystems::FileOptions checkpointFileOptions; + checkpointFileOptions.shouldThrowOnFileAlreadyExists = false; + try { + checkpointWriteFile_ = + fs_->openFileForWrite(checkpointPath, checkpointFileOptions); + } catch (std::exception& e) { + ++stats_.openCheckpointErrors; + // Failure to open the log at startup is a process terminating error. VELOX_FAIL( - "Could not open evict log {}, rc {}: {}", - logPath, - evictLogFd_, - folly::errnoStr(errno)); + "Could not open checkpoint file for write {}: {}", + checkpointPath, + e.what()); } try { @@ -936,8 +918,11 @@ void SsdFile::maybeVerifyChecksum( if (checksum != ssdRun.checksum()) { ++stats_.readSsdCorruptions; VELOX_FAIL( - "IOERR: Corrupt SSD cache entry - File: {}, Offset: {}, Size: {}", + "IOERR: Corrupt SSD cache entry calc {} vs ssdRun {} - File: {}, Entry: {}, Offset: {}, Size: {}", + checksum, + ssdRun.checksum(), fileName_, + entry.toString(), ssdRun.offset(), ssdRun.size()); } @@ -945,16 +930,10 @@ void SsdFile::maybeVerifyChecksum( bool SsdFile::testingIsCowDisabled() const { #ifdef linux - int attr{0}; - const auto res = ioctl(fd_, FS_IOC_GETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", - res, - folly::errnoStr(errno)); - - return (attr & FS_NOCOW_FL) == FS_NOCOW_FL; + const auto attributes = writeFile_->getAttributes(); + const auto it = + attributes.find(std::string(LocalWriteFile::Attributes::kNoCow)); + return it != attributes.end() && it->second == "true"; #else return false; #endif // linux @@ -1002,6 +981,9 @@ void SsdFile::readCheckpoint(std::ifstream& state) { idMap[id] = StringIdLease(fileIds(), id, name); } + const auto logPath = getEvictLogFilePath(); + auto evictLogFd_ = + ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); const auto logSize = ::lseek(evictLogFd_, 0, SEEK_END); std::vector evicted(logSize / sizeof(uint32_t)); const auto rc = ::pread(evictLogFd_, evicted.data(), logSize, 0); diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index 434d41c342b9e..6b83c64415c3c 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -22,6 +22,7 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/caching/SsdFileTracker.h" #include "velox/common/file/File.h" +#include "velox/common/file/FileSystems.h" DECLARE_bool(ssd_odirect); DECLARE_bool(ssd_verify_write); @@ -461,7 +462,7 @@ class SsdFile { // Logs an error message, deletes the checkpoint and stop making new // checkpoints. - void checkpointError(int32_t rc, const std::string& error); + void checkpointError(uint64_t rc, const std::string& error); // Looks for a checkpointed state and sets the state of 'this' by // the checkpointed state iif the state is complete and @@ -472,12 +473,11 @@ class SsdFile { // Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write // succeeds; otherwise, log the error and return false. - bool - write(uint64_t offset, uint64_t length, const std::vector& iovecs); + bool write(int64_t offset, int64_t length, const std::vector& iovecs); // Synchronously logs that 'regions' are no longer valid in a possibly // existing checkpoint. - void logEviction(const std::vector& regions); + void logEviction(std::vector& regions); // Computes the checksum of data in cache 'entry'. uint32_t checksumEntry(const AsyncDataCacheEntry& entry) const; @@ -489,7 +489,7 @@ class SsdFile { // Returns true if checkpoint is needed. bool needCheckpoint(bool force) const { - if (!checkpointEnabled()) { + if (!checkpointEnabled() || checkpointWriteFile_ == nullptr) { return false; } return force || (bytesAfterCheckpoint_ >= checkpointIntervalBytes_); @@ -556,8 +556,8 @@ class SsdFile { // Map of file number and offset to location in file. folly::F14FastMap entries_; - // File descriptor. 0 (stdin) means file not open. - int32_t fd_{0}; + // File system. + std::shared_ptr fs_; // Size of the backing file in bytes. Must be multiple of kRegionSize. uint64_t fileSize_{0}; @@ -565,6 +565,15 @@ class SsdFile { // ReadFile made from 'fd_'. std::unique_ptr readFile_; + // WriteFile made from 'fd_'. + std::unique_ptr writeFile_; + + // WriteFile for logging evictions. + std::unique_ptr evictLogWriteFile_; + + // WriteFile for checkpoint. + std::unique_ptr checkpointWriteFile_; + // Counters. SsdCacheStats stats_; @@ -578,9 +587,6 @@ class SsdFile { // Count of bytes written after last checkpoint. std::atomic bytesAfterCheckpoint_{0}; - // fd for logging evictions. - int32_t evictLogFd_{-1}; - // True if there was an error with checkpoint and the checkpoint was deleted. bool checkpointDeleted_{false}; }; diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index fd2835596ad34..0b0bef1ca8425 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -17,6 +17,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" +#include "velox/common/file/FileSystems.h" #include "velox/common/memory/Memory.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -45,6 +46,7 @@ class SsdFileTest : public testing::Test { static constexpr int64_t kMB = 1 << 20; void SetUp() override { + filesystems::registerLocalFileSystem(); memory::MemoryManager::testingSetInstance({}); }