Skip to content

Commit

Permalink
Use Velox filesystem for SSD cache data file
Browse files Browse the repository at this point in the history
The SSD cache currently uses native functions for file operations. Switch
it to the Velox filesystem so that more advanced tests can be built by
leveraging features such as fault injection.
  • Loading branch information
zacw7 committed Oct 25, 2024
1 parent 817fa90 commit 7e45725
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 68 deletions.
121 changes: 57 additions & 64 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "velox/common/caching/SsdFile.h"

#include <folly/Executor.h>
#include <folly/portability/SysUio.h>
#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/Crc.h"
Expand Down Expand Up @@ -141,32 +140,18 @@ SsdFile::SsdFile(const Config& config)
checkpointIntervalBytes_(config.checkpointIntervalBytes),
executor_(config.executor) {
process::TraceContext trace("SsdFile::SsdFile");
int32_t oDirect = 0;
#ifdef linux
oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0;
#endif // linux
fd_ = open(fileName_.c_str(), O_CREAT | O_RDWR | oDirect, S_IRUSR | S_IWUSR);
if (fd_ < 0) {
++stats_.openFileErrors;
}
// TODO: add fault tolerant handling for open file errors.
VELOX_CHECK_GE(
fd_,
0,
"Cannot open or create {}. Error: {}",
fileName_,
folly::errnoStr(errno));

if (disableFileCow_) {
disableCow(fd_);
}

readFile_ = std::make_unique<LocalReadFile>(fd_);
const uint64_t size = lseek(fd_, 0, SEEK_END);
fs_ = filesystems::getFileSystem(fileName_, nullptr);
filesystems::FileOptions fileOptions;
fileOptions.shouldThrowOnFileAlreadyExists = false;
fileOptions.bufferWrite = false;
writeFile_ = fs_->openFileForWrite(fileName_, fileOptions);
readFile_ = fs_->openFileForRead(fileName_);

const uint64_t size = writeFile_->size();
numRegions_ = std::min<int32_t>(size / kRegionSize, maxRegions_);
fileSize_ = numRegions_ * kRegionSize;
if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) {
::ftruncate(fd_, fileSize_);
writeFile_->truncate(fileSize_);
}
// The existing regions in the file are writable.
writableRegions_.resize(numRegions_);
Expand All @@ -178,6 +163,16 @@ SsdFile::SsdFile(const Config& config)
if (checkpointEnabled()) {
initializeCheckpoint();
}

if (disableFileCow_) {
// Disable 'copy on write' on the given file. Will throw if this fails for
// any reason, including the file system not supporting the COW feature.
#ifdef linux
const std::unordered_map<std::string, std::string> attributes = {
{std::string(LocalWriteFile::Attributes::kNoCow), "true"}};
writeFile_->setAttributes(attributes);
#endif // linux
}
}

void SsdFile::pinRegion(uint64_t offset) {
Expand Down Expand Up @@ -324,19 +319,23 @@ bool SsdFile::growOrEvictLocked() {
process::TraceContext trace("SsdFile::growOrEvictLocked");
if (numRegions_ < maxRegions_) {
const auto newSize = (numRegions_ + 1) * kRegionSize;
const auto rc = ::ftruncate(fd_, newSize);
if (rc >= 0) {
try {
writeFile_->truncate(newSize);
fileSize_ = newSize;
writableRegions_.push_back(numRegions_);
regionSizes_[numRegions_] = 0;
erasedRegionSizes_[numRegions_] = 0;
++numRegions_;
VELOX_SSD_CACHE_LOG(WARNING)
<< "Grow " << fileName_ << " to " << numRegions_
<< " regions (max: " << maxRegions_ << ")";
return true;
} catch (const std::exception& e) {
++stats_.growFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to grow cache file " << fileName_ << " to " << newSize
<< " with error: " << e.what();
}

++stats_.growFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to grow cache file " << fileName_ << " to " << newSize;
}

const auto candidates =
Expand Down Expand Up @@ -467,20 +466,21 @@ void SsdFile::write(std::vector<CachePin>& pins) {
}

bool SsdFile::write(
uint64_t offset,
uint64_t length,
int64_t offset,
int64_t length,
const std::vector<iovec>& iovecs) {
const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset);
if (ret == length) {
return true;
try {
writeFile_->write(iovecs, offset, length);
} catch (const std::exception& e) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
return false;
}
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
return false;
return true;
}

namespace {
Expand All @@ -497,8 +497,9 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) {
void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) {
process::TraceContext trace("SsdFile::verifyWrite");
auto testData = std::make_unique<char[]>(entry.size());
const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset());
VELOX_CHECK_EQ(rc, entry.size());
const auto rc =
readFile_->pread(ssdRun.offset(), entry.size(), testData.get());
VELOX_CHECK_EQ(rc.size(), entry.size());
if (entry.tinyData() != nullptr) {
if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) {
VELOX_FAIL("bad read back");
Expand Down Expand Up @@ -569,9 +570,9 @@ void SsdFile::clear() {

void SsdFile::testingDeleteFile() {
process::TraceContext trace("SsdFile::testingDeleteFile");
if (fd_) {
close(fd_);
fd_ = 0;
if (writeFile_) {
writeFile_->close();
writeFile_ = nullptr;
}
auto rc = unlink(fileName_.c_str());
if (rc < 0) {
Expand Down Expand Up @@ -744,11 +745,10 @@ void SsdFile::checkpoint(bool force) {
// We schedule the potentially long fsync of the cache file on another
// thread of the cache write executor, if available. If there is none, we do
// the sync on this thread at the end.
auto fileSync = std::make_shared<AsyncSource<int>>(
[fd = fd_]() { return std::make_unique<int>(::fsync(fd)); });
if (executor_ != nullptr) {
executor_->add([fileSync]() { fileSync->prepare(); });
}
auto fileSync = std::make_shared<AsyncSource<int>>([this]() {
writeFile_->flush();
return std::make_unique<int>(0);
});

std::ofstream state;
const auto checkpointPath = getCheckpointFilePath();
Expand Down Expand Up @@ -803,8 +803,7 @@ void SsdFile::checkpoint(bool force) {

// NOTE: we need to ensure cache file data sync update completes before
// updating checkpoint file.
const auto fileSyncRc = fileSync->move();
checkRc(*fileSyncRc, "Sync of cache data file");
fileSync->move();

const auto endMarker = kCheckpointEndMarker;
state.write(asChar(&endMarker), sizeof(endMarker));
Expand Down Expand Up @@ -945,16 +944,10 @@ void SsdFile::maybeVerifyChecksum(

bool SsdFile::testingIsCowDisabled() const {
#ifdef linux
int attr{0};
const auto res = ioctl(fd_, FS_IOC_GETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
res,
folly::errnoStr(errno));

return (attr & FS_NOCOW_FL) == FS_NOCOW_FL;
const auto attributes = writeFile_->getAttributes();
const auto it =
attributes.find(std::string(LocalWriteFile::Attributes::kNoCow));
return it != attributes.end() && it->second == "true";
#else
return false;
#endif // linux
Expand Down
11 changes: 7 additions & 4 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdFileTracker.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"

DECLARE_bool(ssd_odirect);
DECLARE_bool(ssd_verify_write);
Expand Down Expand Up @@ -472,8 +473,7 @@ class SsdFile {

// Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write
// succeeds; otherwise, logs the error and returns false.
bool
write(uint64_t offset, uint64_t length, const std::vector<iovec>& iovecs);
bool write(int64_t offset, int64_t length, const std::vector<iovec>& iovecs);

// Synchronously logs that 'regions' are no longer valid in a possibly
// existing checkpoint.
Expand Down Expand Up @@ -556,15 +556,18 @@ class SsdFile {
// Map of file number and offset to location in file.
folly::F14FastMap<FileCacheKey, SsdRun> entries_;

// File descriptor. 0 (stdin) means file not open.
int32_t fd_{0};
// File system.
std::shared_ptr<filesystems::FileSystem> fs_;

// Size of the backing file in bytes. Must be multiple of kRegionSize.
uint64_t fileSize_{0};

// ReadFile for the cache data file, opened through 'fs_'.
std::unique_ptr<ReadFile> readFile_;

// WriteFile for the cache data file, opened through 'fs_'.
std::unique_ptr<WriteFile> writeFile_;

// Counters.
SsdCacheStats stats_;

Expand Down
2 changes: 2 additions & 0 deletions velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

Expand Down Expand Up @@ -45,6 +46,7 @@ class SsdFileTest : public testing::Test {
static constexpr int64_t kMB = 1 << 20;

void SetUp() override {
filesystems::registerLocalFileSystem();
memory::MemoryManager::testingSetInstance({});
}

Expand Down

0 comments on commit 7e45725

Please sign in to comment.