Skip to content

Commit

Permalink
Use Velox filesystem for SSD cache data file
Browse files Browse the repository at this point in the history
SSD cache currently uses native functions for file operations. This
needs to be switched to Velox filesystem, so more advanced testing can
be built by leveraging features like fault injections.
  • Loading branch information
zacw7 committed Oct 26, 2024
1 parent 817fa90 commit a688ce0
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 71 deletions.
146 changes: 81 additions & 65 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "velox/common/caching/SsdFile.h"

#include <folly/Executor.h>
#include <folly/portability/SysUio.h>
#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/Crc.h"
Expand Down Expand Up @@ -67,6 +66,27 @@ void disableCow(int32_t fd) {
#endif // linux
}

void disableFileCow(int32_t fd) {
#ifdef linux
int attr{0};
auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
res,
folly::errnoStr(errno));
attr |= FS_NOCOW_FL;
res = ioctl(fd, FS_IOC_SETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}",
res,
folly::errnoStr(errno));
#endif // linux
}

void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector<iovec>& iovecs) {
if (entry.tinyData() != nullptr) {
iovecs.push_back({entry.tinyData(), static_cast<size_t>(entry.size())});
Expand Down Expand Up @@ -141,32 +161,18 @@ SsdFile::SsdFile(const Config& config)
checkpointIntervalBytes_(config.checkpointIntervalBytes),
executor_(config.executor) {
process::TraceContext trace("SsdFile::SsdFile");
int32_t oDirect = 0;
#ifdef linux
oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0;
#endif // linux
fd_ = open(fileName_.c_str(), O_CREAT | O_RDWR | oDirect, S_IRUSR | S_IWUSR);
if (fd_ < 0) {
++stats_.openFileErrors;
}
// TODO: add fault tolerant handling for open file errors.
VELOX_CHECK_GE(
fd_,
0,
"Cannot open or create {}. Error: {}",
fileName_,
folly::errnoStr(errno));

if (disableFileCow_) {
disableCow(fd_);
}

readFile_ = std::make_unique<LocalReadFile>(fd_);
const uint64_t size = lseek(fd_, 0, SEEK_END);
fs_ = filesystems::getFileSystem(fileName_, nullptr);
filesystems::FileOptions fileOptions;
fileOptions.shouldThrowOnFileAlreadyExists = false;
fileOptions.bufferWrite = false;
writeDataFile_ = fs_->openFileForWrite(fileName_, fileOptions);
readDataFile_ = fs_->openFileForRead(fileName_);

const uint64_t size = writeDataFile_->size();
numRegions_ = std::min<int32_t>(size / kRegionSize, maxRegions_);
fileSize_ = numRegions_ * kRegionSize;
if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) {
::ftruncate(fd_, fileSize_);
writeDataFile_->truncate(fileSize_);
}
// The existing regions in the file are writable.
writableRegions_.resize(numRegions_);
Expand All @@ -178,6 +184,10 @@ SsdFile::SsdFile(const Config& config)
if (checkpointEnabled()) {
initializeCheckpoint();
}

if (disableFileCow_) {
disableFileCow();
}
}

void SsdFile::pinRegion(uint64_t offset) {
Expand Down Expand Up @@ -280,7 +290,7 @@ void SsdFile::read(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) {
process::TraceContext trace("SsdFile::read");
readFile_->preadv(offset, buffers);
readDataFile_->preadv(offset, buffers);
}

std::optional<std::pair<uint64_t, int32_t>> SsdFile::getSpace(
Expand Down Expand Up @@ -324,19 +334,23 @@ bool SsdFile::growOrEvictLocked() {
process::TraceContext trace("SsdFile::growOrEvictLocked");
if (numRegions_ < maxRegions_) {
const auto newSize = (numRegions_ + 1) * kRegionSize;
const auto rc = ::ftruncate(fd_, newSize);
if (rc >= 0) {
try {
writeDataFile_->truncate(newSize);
fileSize_ = newSize;
writableRegions_.push_back(numRegions_);
regionSizes_[numRegions_] = 0;
erasedRegionSizes_[numRegions_] = 0;
++numRegions_;
VELOX_SSD_CACHE_LOG(WARNING)
<< "Grow " << fileName_ << " to " << numRegions_
<< " regions (max: " << maxRegions_ << ")";
return true;
} catch (const std::exception& e) {
++stats_.growFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to grow cache file " << fileName_ << " to " << newSize
<< " with error: " << e.what();
}

++stats_.growFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to grow cache file " << fileName_ << " to " << newSize;
}

const auto candidates =
Expand Down Expand Up @@ -467,20 +481,21 @@ void SsdFile::write(std::vector<CachePin>& pins) {
}

bool SsdFile::write(
uint64_t offset,
uint64_t length,
int64_t offset,
int64_t length,
const std::vector<iovec>& iovecs) {
const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset);
if (ret == length) {
return true;
try {
writeDataFile_->write(iovecs, offset, length);
} catch (const std::exception& e) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
return false;
}
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
return false;
return true;
}

namespace {
Expand All @@ -497,8 +512,9 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) {
void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) {
process::TraceContext trace("SsdFile::verifyWrite");
auto testData = std::make_unique<char[]>(entry.size());
const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset());
VELOX_CHECK_EQ(rc, entry.size());
const auto rc =
readDataFile_->pread(ssdRun.offset(), entry.size(), testData.get());
VELOX_CHECK_EQ(rc.size(), entry.size());
if (entry.tinyData() != nullptr) {
if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) {
VELOX_FAIL("bad read back");
Expand Down Expand Up @@ -569,9 +585,9 @@ void SsdFile::clear() {

void SsdFile::testingDeleteFile() {
process::TraceContext trace("SsdFile::testingDeleteFile");
if (fd_) {
close(fd_);
fd_ = 0;
if (writeDataFile_) {
writeDataFile_->close();
writeDataFile_ = nullptr;
}
auto rc = unlink(fileName_.c_str());
if (rc < 0) {
Expand Down Expand Up @@ -744,11 +760,10 @@ void SsdFile::checkpoint(bool force) {
// We schedule the potentially long fsync of the cache file on another
// thread of the cache write executor, if available. If there is none, we do
// the sync on this thread at the end.
auto fileSync = std::make_shared<AsyncSource<int>>(
[fd = fd_]() { return std::make_unique<int>(::fsync(fd)); });
if (executor_ != nullptr) {
executor_->add([fileSync]() { fileSync->prepare(); });
}
auto fileSync = std::make_shared<AsyncSource<int>>([this]() {
writeDataFile_->flush();
return std::make_unique<int>(0);
});

std::ofstream state;
const auto checkpointPath = getCheckpointFilePath();
Expand Down Expand Up @@ -803,8 +818,7 @@ void SsdFile::checkpoint(bool force) {

// NOTE: we need to ensure cache file data sync update completes before
// updating checkpoint file.
const auto fileSyncRc = fileSync->move();
checkRc(*fileSyncRc, "Sync of cache data file");
fileSync->move();

const auto endMarker = kCheckpointEndMarker;
state.write(asChar(&endMarker), sizeof(endMarker));
Expand Down Expand Up @@ -943,18 +957,20 @@ void SsdFile::maybeVerifyChecksum(
}
}

bool SsdFile::testingIsCowDisabled() const {
void SsdFile::disableFileCow() {
#ifdef linux
int attr{0};
const auto res = ioctl(fd_, FS_IOC_GETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
res,
folly::errnoStr(errno));
const std::unordered_map<std::string, std::string> attributes = {
{std::string(LocalWriteFile::Attributes::kNoCow), "true"}};
writeDataFile_->setAttributes(attributes);
#endif // linux
}

return (attr & FS_NOCOW_FL) == FS_NOCOW_FL;
bool SsdFile::testingIsCowDisabled() const {
#ifdef linux
const auto attributes = writeDataFile_->getAttributes();
const auto it =
attributes.find(std::string(LocalWriteFile::Attributes::kNoCow));
return it != attributes.end() && it->second == "true";
#else
return false;
#endif // linux
Expand Down
19 changes: 13 additions & 6 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdFileTracker.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"

DECLARE_bool(ssd_odirect);
DECLARE_bool(ssd_verify_write);
Expand Down Expand Up @@ -472,8 +473,7 @@ class SsdFile {

// Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write
// succeeds; otherwise, log the error and return false.
bool
write(uint64_t offset, uint64_t length, const std::vector<iovec>& iovecs);
bool write(int64_t offset, int64_t length, const std::vector<iovec>& iovecs);

// Synchronously logs that 'regions' are no longer valid in a possibly
// existing checkpoint.
Expand All @@ -499,6 +499,10 @@ class SsdFile {
const AsyncDataCacheEntry& entry,
const SsdRun& ssdRun);

// Disable 'copy on write'. Will throw if failed for any reason, including
// file system not supporting cow feature.
void disableFileCow();

// Returns true if checksum write is enabled for the given version.
static bool isChecksumEnabledOnCheckpointVersion(
const std::string& checkpointVersion) {
Expand Down Expand Up @@ -556,14 +560,17 @@ class SsdFile {
// Map of file number and offset to location in file.
folly::F14FastMap<FileCacheKey, SsdRun> entries_;

// File descriptor. 0 (stdin) means file not open.
int32_t fd_{0};
// File system.
std::shared_ptr<filesystems::FileSystem> fs_;

// Size of the backing file in bytes. Must be multiple of kRegionSize.
uint64_t fileSize_{0};

// ReadFile made from 'fd_'.
std::unique_ptr<ReadFile> readFile_;
// ReadFile for cache data file.
std::unique_ptr<ReadFile> readDataFile_;

// WriteFile for cache data file.
std::unique_ptr<WriteFile> writeDataFile_;

// Counters.
SsdCacheStats stats_;
Expand Down
2 changes: 2 additions & 0 deletions velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

Expand Down Expand Up @@ -45,6 +46,7 @@ class SsdFileTest : public testing::Test {
static constexpr int64_t kMB = 1 << 20;

void SetUp() override {
filesystems::registerLocalFileSystem();
memory::MemoryManager::testingSetInstance({});
}

Expand Down

0 comments on commit a688ce0

Please sign in to comment.