diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index bbf8e72c805f..68859306e338 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -43,9 +43,6 @@ DEFINE_bool(ssd_verify_write, false, "Read back data after writing to SSD"); namespace facebook::velox::cache { namespace { - -// TODO: Remove this function once we migrate all files to velox fs. -// // Disable 'copy on write' on the given file. Will throw if failed for any // reason, including file system not supporting cow feature. void disableCow(int32_t fd) { @@ -69,6 +66,28 @@ void disableCow(int32_t fd) { #endif // linux } +// TODO: Remove this function once we migrate all files to velox fs. +void disableFileCow(int32_t fd) { +#ifdef linux + int attr{0}; + auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr); + VELOX_CHECK_EQ( + 0, + res, + "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", + res, + folly::errnoStr(errno)); + attr |= FS_NOCOW_FL; + res = ioctl(fd, FS_IOC_SETFLAGS, &attr); + VELOX_CHECK_EQ( + 0, + res, + "ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}", + res, + folly::errnoStr(errno)); +#endif // linux +} + void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector& iovecs) { if (entry.tinyData() != nullptr) { iovecs.push_back({entry.tinyData(), static_cast(entry.size())}); @@ -335,7 +354,7 @@ bool SsdFile::growOrEvictLocked() { } } - auto candidates = + const auto candidates = tracker_.findEvictionCandidates(3, numRegions_, regionPins_); if (candidates.empty()) { suspended_ = true; @@ -657,17 +676,13 @@ bool SsdFile::removeFileEntries( return true; } -void SsdFile::logEviction(std::vector& regions) { - if (!checkpointEnabled()) { - return; - } - const auto length = regions.size() * sizeof(regions[0]); - const std::vector iovecs = {{regions.data(), length}}; - try { - evictLogWriteFile_->write(iovecs, 0, static_cast(length)); - } catch (const std::exception& e) { - ++stats_.writeSsdErrors; - VELOX_SSD_CACHE_LOG(ERROR) << "Failed to log eviction: " << e.what(); +void SsdFile::logEviction(const std::vector& regions) { + if (checkpointEnabled()) { + const int32_t rc = ::write( + evictLogFd_, regions.data(), regions.size() * sizeof(regions[0])); + if (rc != regions.size() * sizeof(regions[0])) { + checkpointError(rc, "Failed to log eviction"); + } } } @@ -675,31 +690,30 @@ void SsdFile::deleteCheckpoint(bool keepLog) { if (checkpointDeleted_) { return; } - - if (evictLogWriteFile_ != nullptr) { - try { - if (keepLog) { - evictLogWriteFile_->truncate(0); - evictLogWriteFile_->flush(); - } else { - evictLogWriteFile_->close(); - fs_->remove(getEvictLogFilePath()); - evictLogWriteFile_.reset(); - } - } catch (const std::exception& e) { - ++stats_.deleteCheckpointErrors; - VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting evictLog: " << e.what(); + if (evictLogFd_ >= 0) { + if (keepLog) { + ::lseek(evictLogFd_, 0, SEEK_SET); + ::ftruncate(evictLogFd_, 0); + ::fsync(evictLogFd_); + } else { + ::close(evictLogFd_); + evictLogFd_ = -1; } } + checkpointDeleted_ = true; + const auto logPath = getEvictLogFilePath(); + int32_t logRc = 0; + if (!keepLog) { + logRc = ::unlink(logPath.c_str()); + } const auto checkpointPath = getCheckpointFilePath(); const auto checkpointRc = ::unlink(checkpointPath.c_str()); - if (checkpointRc != 0) { - VELOX_SSD_CACHE_LOG(ERROR) - << "Error in deleting checkpoint: " << checkpointRc; - } - if (checkpointRc != 0) { + if ((logRc != 0) || (checkpointRc != 0)) { ++stats_.deleteCheckpointErrors; + VELOX_SSD_CACHE_LOG(ERROR) + << "Error in deleting log and checkpoint. log: " << logRc + << " checkpoint: " << checkpointRc; } } @@ -837,9 +851,8 @@ void SsdFile::checkpoint(bool force) { // NOTE: we shall truncate eviction log after checkpoint file sync // completes so that we never recover from an old checkpoint file without // log evictions. The latter might lead to data consistent issue. - VELOX_CHECK_NOT_NULL(evictLogWriteFile_); - evictLogWriteFile_->truncate(0); - evictLogWriteFile_->flush(); + checkRc(::ftruncate(evictLogFd_, 0), "Truncate of event log"); + checkRc(::fsync(evictLogFd_), "Sync of evict log"); VELOX_SSD_CACHE_LOG(INFO) << "Checkpoint persisted with " << entries_.size() << " cache entries"; @@ -870,14 +883,18 @@ void SsdFile::initializeCheckpoint() { getCheckpointFilePath()); } const auto logPath = getEvictLogFilePath(); - filesystems::FileOptions evictLogFileOptions; - evictLogFileOptions.shouldThrowOnFileAlreadyExists = false; - try { - evictLogWriteFile_ = fs_->openFileForWrite(logPath, evictLogFileOptions); - } catch (std::exception& e) { + evictLogFd_ = ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); + if (disableFileCow_) { + disableCow(evictLogFd_); + } + if (evictLogFd_ < 0) { ++stats_.openLogErrors; // Failure to open the log at startup is a process terminating error. - VELOX_FAIL("Could not open evict log {}: {}", logPath, e.what()); + VELOX_FAIL( + "Could not open evict log {}, rc {}: {}", + logPath, + evictLogFd_, + folly::errnoStr(errno)); } try { @@ -948,9 +965,6 @@ void SsdFile::disableFileCow() { const std::unordered_map attributes = { {std::string(LocalWriteFile::Attributes::kNoCow), "true"}}; writeFile_->setAttributes(attributes); - if (evictLogWriteFile_ != nullptr) { - evictLogWriteFile_->setAttributes(attributes); - } #endif // linux } @@ -1007,16 +1021,10 @@ void SsdFile::readCheckpoint(std::ifstream& state) { idMap[id] = StringIdLease(fileIds(), id, name); } - const auto logPath = getEvictLogFilePath(); - const auto evictLogReadFile = fs_->openFileForRead(logPath); - const auto logSize = evictLogReadFile->size(); + const auto logSize = ::lseek(evictLogFd_, 0, SEEK_END); std::vector evicted(logSize / sizeof(uint32_t)); - try { - evictLogReadFile->pread(0, logSize, evicted.data()); - } catch (const std::exception& e) { - ++stats_.readCheckpointErrors; - VELOX_FAIL("Failed to read eviction log: {}", e.what()); - } + const auto rc = ::pread(evictLogFd_, evicted.data(), logSize, 0); + VELOX_CHECK_EQ(logSize, rc, "Failed to read eviction log"); std::unordered_set evictedMap; for (auto region : evicted) { evictedMap.insert(region); diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index e166fda1542b..31300a274a78 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -367,13 +367,7 @@ class SsdFile { /// Returns the checkpoint file path. std::string getCheckpointFilePath() const { - // Faulty file path needs to be handled manually before we switch checkpoint - // file to Velox filesystem. - const std::string faultyPrefix = "faulty:"; - std::string checkpointPath = fileName_ + kCheckpointExtension; - return checkpointPath.find(faultyPrefix) == 0 - ? checkpointPath.substr(faultyPrefix.size()) - : checkpointPath; + return fileName_ + kCheckpointExtension; } /// Deletes the backing file. Used in testing. @@ -483,7 +477,7 @@ class SsdFile { // Synchronously logs that 'regions' are no longer valid in a possibly // existing checkpoint. - void logEviction(std::vector& regions); + void logEviction(const std::vector& regions); // Computes the checksum of data in cache 'entry'. uint32_t checksumEntry(const AsyncDataCacheEntry& entry) const; @@ -578,9 +572,6 @@ class SsdFile { // WriteFile for cache data file. std::unique_ptr writeFile_; - // WriteFile for evict log file. - std::unique_ptr evictLogWriteFile_; - // Counters. SsdCacheStats stats_; @@ -594,6 +585,9 @@ class SsdFile { // Count of bytes written after last checkpoint. std::atomic bytesAfterCheckpoint_{0}; + // fd for logging evictions. + int32_t evictLogFd_{-1}; + // True if there was an error with checkpoint and the checkpoint was deleted. bool checkpointDeleted_{false}; }; diff --git a/velox/common/caching/tests/CMakeLists.txt b/velox/common/caching/tests/CMakeLists.txt index 0bec2e101fd5..cc7fafe2a752 100644 --- a/velox/common/caching/tests/CMakeLists.txt +++ b/velox/common/caching/tests/CMakeLists.txt @@ -36,7 +36,6 @@ target_link_libraries( PRIVATE velox_caching velox_file - velox_file_test_utils velox_memory velox_temp_path Folly::folly diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index 771420243f6c..0b0bef1ca842 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -18,7 +18,6 @@ #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" #include "velox/common/file/FileSystems.h" -#include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/common/memory/Memory.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -26,11 +25,9 @@ #include #include #include -#include using namespace facebook::velox; using namespace facebook::velox::cache; -using namespace facebook::velox::tests::utils; using facebook::velox::memory::MemoryAllocator; @@ -50,7 +47,6 @@ class SsdFileTest : public testing::Test { void SetUp() override { filesystems::registerLocalFileSystem(); - registerFaultyFileSystem(); memory::MemoryManager::testingSetInstance({}); } @@ -69,14 +65,12 @@ class SsdFileTest : public testing::Test { uint64_t checkpointIntervalBytes = 0, bool checksumEnabled = false, bool checksumReadVerificationEnabled = false, - bool disableFileCow = false, - bool enableFaultInjection = false) { + bool disableFileCow = false) { // tmpfs does not support O_DIRECT, so turn this off for testing. FLAGS_ssd_odirect = false; cache_ = AsyncDataCache::create(memory::memoryManager()->allocator()); fileName_ = StringIdLease(fileIds(), "fileInStorage"); - tempDirectory_ = - exec::test::TempDirectoryPath::create(enableFaultInjection); + tempDirectory_ = exec::test::TempDirectoryPath::create(); initializeSsdFile( ssdBytes, checkpointIntervalBytes, @@ -801,140 +795,6 @@ TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) { #endif } -TEST_F(SsdFileTest, dataFileErrorInjection) { - constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize; - initializeCache(kSsdSize, 0, false, false, false, true); - - auto faultyFs = faultyFileSystem(); - std::atomic_bool injectWriteError{true}; - std::atomic_bool injectReadError{true}; - faultyFs->setFileInjectionHook([&](FaultFileOperation* op) { - if (injectWriteError && op->type == FaultFileOperation::Type::kWrite) { - VELOX_FAIL("Inject hook write failure"); - } - if (injectReadError && op->type == FaultFileOperation::Type::kReadv) { - VELOX_FAIL("Inject hook read failure"); - } - }); - - // Write a set of cache entries. - auto pins = - makePins(fileName_.id(), 0, 4096, 2048 * 1025, SsdFile::kRegionSize / 2); - ssdFile_->write(pins); - - // With write error injected, no entry has been written to SSD cache and the - // error has recorded as a write ssd error. - SsdCacheStats statsWithWriteErrorInjected; - ssdFile_->updateStats(statsWithWriteErrorInjected); - - EXPECT_GT(statsWithWriteErrorInjected.writeSsdErrors, 0); - EXPECT_EQ(statsWithWriteErrorInjected.entriesWritten, 0); - - // Without write error injected, the data was cached to SSD successfully. - injectWriteError = false; - ssdFile_->write(pins); - - SsdCacheStats statsWithoutWriteErrorInjected; - ssdFile_->updateStats(statsWithoutWriteErrorInjected); - - EXPECT_EQ( - statsWithoutWriteErrorInjected.writeSsdErrors, - statsWithWriteErrorInjected.writeSsdErrors); // No new error occurred. - EXPECT_GT(statsWithoutWriteErrorInjected.entriesWritten, 0); - EXPECT_GT( - statsWithoutWriteErrorInjected.regionsCached, - statsWithWriteErrorInjected.regionsCached); - - // Load the ssd pins by reading the ssd cache. - std::vector ssdPins; - ssdPins.reserve(pins.size()); - for (auto& pin : pins) { - ssdPins.push_back(ssdFile_->find(RawFileCacheKey{ - pin.entry()->key().fileNum.id(), pin.entry()->key().offset})); - } - - SsdCacheStats statsWithReadErrorInjected; - ssdFile_->updateStats(statsWithReadErrorInjected); - VELOX_ASSERT_THROW(ssdFile_->load(ssdPins, pins), "Inject hook read failure"); - VELOX_ASSERT_THROW( - readAndCheckPins(pins), ""); // Cache pins have not been loaded. - EXPECT_EQ(statsWithReadErrorInjected.entriesRead, 0); // No entry was loaded. - - injectReadError = false; - ssdFile_->load(ssdPins, pins); - readAndCheckPins(pins); - SsdCacheStats statsWithoutReadErrorInjected; - ssdFile_->updateStats(statsWithoutReadErrorInjected); - EXPECT_GT( - statsWithoutReadErrorInjected.entriesRead, - 0); // Read operations succeeded after clearing the injected error. -} - -TEST_F(SsdFileTest, evictlogFileErrorInjection) { - constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize; - const uint64_t checkpointIntervalBytes = 5 * SsdFile::kRegionSize; - const auto retainFile = StringIdLease(fileIds(), "faultyFiles.Retained"); - const auto evictFile = StringIdLease(fileIds(), "faultyFiles.Evicted"); - - initializeCache(kSsdSize, checkpointIntervalBytes, false, false, false, true); - - auto faultyFs = faultyFileSystem(); - faultyFs->setFileInjectionHook([&](FaultFileOperation* op) { - // Inject error on evict log file only. - const std::string evictlogFileRe(".*log"); - if (RE2::FullMatch(op->path, evictlogFileRe)) { - VELOX_FAIL("Inject hook read failure"); - } - }); - - // Fully populate ssd cache with two files. - for (auto startOffset = 0; startOffset <= kSsdSize / 2 - SsdFile::kRegionSize; - startOffset += SsdFile::kRegionSize) { - auto pins = makePins( - retainFile.id(), - startOffset, - 4096, - 2048 * 1025, - SsdFile::kRegionSize / 2); - ssdFile_->write(pins); - } - - for (auto startOffset = kSsdSize / 2; - startOffset <= kSsdSize - SsdFile::kRegionSize; - startOffset += SsdFile::kRegionSize) { - auto pins = makePins( - evictFile.id(), - startOffset + SsdFile::kRegionSize, - 4096, - 2048 * 1025, - SsdFile::kRegionSize / 2); - ssdFile_->write(pins); - } - - SsdCacheStats statsBeforeEviction; - ssdFile_->updateStats(statsBeforeEviction); - ASSERT_GT(statsBeforeEviction.entriesWritten, 0); - - // Remove one file from the ssd cache to trigger eviction. - folly::F14FastSet retainedFileIds; - ssdFile_->removeFileEntries({evictFile.id()}, retainedFileIds); - ASSERT_TRUE(retainedFileIds.empty()); - - SsdCacheStats statsWithLogErrorInjected; - ssdFile_->updateStats(statsWithLogErrorInjected); - EXPECT_GT( - statsWithLogErrorInjected.writeSsdErrors, - statsBeforeEviction.writeSsdErrors); - - // Re-initialize SSD file from checkpoint. - ssdFile_->checkpoint(true); - initializeSsdFile(kSsdSize, checkpointIntervalBytes); - - SsdCacheStats statsAfterRecovery; - ssdFile_->updateStats(statsAfterRecovery); - ASSERT_GT(statsAfterRecovery.readCheckpointErrors, 0); -} - #ifdef VELOX_SSD_FILE_TEST_SET_NO_COW_FLAG TEST_F(SsdFileTest, disabledCow) { constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize; diff --git a/velox/common/file/tests/FaultyFile.cpp b/velox/common/file/tests/FaultyFile.cpp index 7f21f5c30a3c..2593fa1fad7b 100644 --- a/velox/common/file/tests/FaultyFile.cpp +++ b/velox/common/file/tests/FaultyFile.cpp @@ -99,7 +99,7 @@ FaultyWriteFile::FaultyWriteFile( void FaultyWriteFile::append(std::string_view data) { if (injectionHook_ != nullptr) { - FaultFileAppendOperation op(path_, data); + FaultFileWriteOperation op(path_, data); injectionHook_(&op); if (!op.delegate) { return; @@ -116,13 +116,6 @@ void FaultyWriteFile::write( const std::vector& iovecs, int64_t offset, int64_t length) { - if (injectionHook_ != nullptr) { - FaultFileWriteOperation op(path_, iovecs, offset, length); - injectionHook_(&op); - if (!op.delegate) { - return; - } - } delegatedFile_->write(iovecs, offset, length); } diff --git a/velox/common/file/tests/FaultyFile.h b/velox/common/file/tests/FaultyFile.h index 4b09850d1ee1..968d98da3cf9 100644 --- a/velox/common/file/tests/FaultyFile.h +++ b/velox/common/file/tests/FaultyFile.h @@ -26,7 +26,6 @@ struct FaultFileOperation { /// Injects faults for file read operations. kRead, kReadv, - kAppend, kWrite, /// TODO: add to support fault injections for the other file operation /// types. @@ -86,32 +85,15 @@ struct FaultFileReadvOperation : FaultFileOperation { buffers(_buffers) {} }; -/// Fault injection parameters for file append API. -struct FaultFileAppendOperation : FaultFileOperation { - std::string_view* data; - - FaultFileAppendOperation( - const std::string& _path, - const std::string_view& _data) - : FaultFileOperation(FaultFileOperation::Type::kAppend, _path), - data(const_cast(&_data)) {} -}; - /// Fault injection parameters for file write API. struct FaultFileWriteOperation : FaultFileOperation { - const std::vector& iovecs; - int64_t offset; - int64_t length; + std::string_view* data; FaultFileWriteOperation( const std::string& _path, - const std::vector& _iovecs, - int64_t _offset, - int64_t _length) + const std::string_view& _data) : FaultFileOperation(FaultFileOperation::Type::kWrite, _path), - iovecs(_iovecs), - offset(_offset), - length(_length) {} + data(const_cast(&_data)) {} }; /// The fault injection hook on the file operation path. diff --git a/velox/common/file/tests/FileTest.cpp b/velox/common/file/tests/FileTest.cpp index 5706a3a52717..4534309adc6a 100644 --- a/velox/common/file/tests/FileTest.cpp +++ b/velox/common/file/tests/FileTest.cpp @@ -654,7 +654,7 @@ TEST_F(FaultyFsTest, fileReadFaultHookInjection) { TEST_F(FaultyFsTest, fileWriteErrorInjection) { // Set write error. - fs_->setFileInjectionError(fileError_, {FaultFileOperation::Type::kAppend}); + fs_->setFileInjectionError(fileError_, {FaultFileOperation::Type::kWrite}); { auto writeFile = fs_->openFileForWrite(writeFilePath_, {}); VELOX_ASSERT_THROW(writeFile->append("hello"), "InjectedFaultFileError"); @@ -672,7 +672,7 @@ TEST_F(FaultyFsTest, fileWriteErrorInjection) { TEST_F(FaultyFsTest, fileWriteDelayInjection) { // Set 2 seconds delay. const uint64_t injectDelay{2'000'000}; - fs_->setFileInjectionDelay(injectDelay, {FaultFileOperation::Type::kAppend}); + fs_->setFileInjectionDelay(injectDelay, {FaultFileOperation::Type::kWrite}); { auto writeFile = fs_->openFileForWrite(writeFilePath_, {}); uint64_t readDurationUs{0}; @@ -691,14 +691,14 @@ TEST_F(FaultyFsTest, fileWriteFaultHookInjection) { // Set to write fake data. fs_->setFileInjectionHook([&](FaultFileOperation* op) { // Only inject for write. - if (op->type != FaultFileOperation::Type::kAppend) { + if (op->type != FaultFileOperation::Type::kWrite) { return; } // Only inject for path2. if (op->path != path2) { return; } - auto* writeOp = static_cast(op); + auto* writeOp = static_cast(op); *writeOp->data = "Error data"; }); { @@ -725,14 +725,14 @@ TEST_F(FaultyFsTest, fileWriteFaultHookInjection) { // Set to not delegate. fs_->setFileInjectionHook([&](FaultFileOperation* op) { // Only inject for write. - if (op->type != FaultFileOperation::Type::kAppend) { + if (op->type != FaultFileOperation::Type::kWrite) { return; } // Only inject for path2. if (op->path != path2) { return; } - auto* writeOp = static_cast(op); + auto* writeOp = static_cast(op); writeOp->delegate = false; }); {