Skip to content

Commit

Permalink
Use Velox filesystem for checkpoint/evictfile
Browse files Browse the repository at this point in the history
  • Loading branch information
zacw7 committed Oct 26, 2024
1 parent 817fa90 commit 8939b42
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 72 deletions.
146 changes: 81 additions & 65 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "velox/common/caching/SsdFile.h"

#include <folly/Executor.h>
#include <folly/portability/SysUio.h>
#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/Crc.h"
Expand Down Expand Up @@ -67,6 +66,27 @@ void disableCow(int32_t fd) {
#endif // linux
}

void disableFileCow(int32_t fd) {
#ifdef linux
int attr{0};
auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
res,
folly::errnoStr(errno));
attr |= FS_NOCOW_FL;
res = ioctl(fd, FS_IOC_SETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}",
res,
folly::errnoStr(errno));
#endif // linux
}

void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector<iovec>& iovecs) {
if (entry.tinyData() != nullptr) {
iovecs.push_back({entry.tinyData(), static_cast<size_t>(entry.size())});
Expand Down Expand Up @@ -141,32 +161,18 @@ SsdFile::SsdFile(const Config& config)
checkpointIntervalBytes_(config.checkpointIntervalBytes),
executor_(config.executor) {
process::TraceContext trace("SsdFile::SsdFile");
int32_t oDirect = 0;
#ifdef linux
oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0;
#endif // linux
fd_ = open(fileName_.c_str(), O_CREAT | O_RDWR | oDirect, S_IRUSR | S_IWUSR);
if (fd_ < 0) {
++stats_.openFileErrors;
}
// TODO: add fault tolerant handling for open file errors.
VELOX_CHECK_GE(
fd_,
0,
"Cannot open or create {}. Error: {}",
fileName_,
folly::errnoStr(errno));

if (disableFileCow_) {
disableCow(fd_);
}

readFile_ = std::make_unique<LocalReadFile>(fd_);
const uint64_t size = lseek(fd_, 0, SEEK_END);
fs_ = filesystems::getFileSystem(fileName_, nullptr);
filesystems::FileOptions fileOptions;
fileOptions.shouldThrowOnFileAlreadyExists = false;
fileOptions.bufferWrite = false;
writeDataFile_ = fs_->openFileForWrite(fileName_, fileOptions);
readDataFile_ = fs_->openFileForRead(fileName_);

const uint64_t size = writeDataFile_->size();
numRegions_ = std::min<int32_t>(size / kRegionSize, maxRegions_);
fileSize_ = numRegions_ * kRegionSize;
if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) {
::ftruncate(fd_, fileSize_);
writeDataFile_->truncate(fileSize_);
}
// The existing regions in the file are writable.
writableRegions_.resize(numRegions_);
Expand All @@ -178,6 +184,10 @@ SsdFile::SsdFile(const Config& config)
if (checkpointEnabled()) {
initializeCheckpoint();
}

if (disableFileCow_) {
disableFileCow();
}
}

void SsdFile::pinRegion(uint64_t offset) {
Expand Down Expand Up @@ -280,7 +290,7 @@ void SsdFile::read(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) {
process::TraceContext trace("SsdFile::read");
readFile_->preadv(offset, buffers);
readDataFile_->preadv(offset, buffers);
}

std::optional<std::pair<uint64_t, int32_t>> SsdFile::getSpace(
Expand Down Expand Up @@ -324,19 +334,23 @@ bool SsdFile::growOrEvictLocked() {
process::TraceContext trace("SsdFile::growOrEvictLocked");
if (numRegions_ < maxRegions_) {
const auto newSize = (numRegions_ + 1) * kRegionSize;
const auto rc = ::ftruncate(fd_, newSize);
if (rc >= 0) {
try {
writeDataFile_->truncate(newSize);
fileSize_ = newSize;
writableRegions_.push_back(numRegions_);
regionSizes_[numRegions_] = 0;
erasedRegionSizes_[numRegions_] = 0;
++numRegions_;
VELOX_SSD_CACHE_LOG(WARNING)
<< "Grow " << fileName_ << " to " << numRegions_
<< " regions (max: " << maxRegions_ << ")";
return true;
} catch (const std::exception& e) {
++stats_.growFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to grow cache file " << fileName_ << " to " << newSize
<< " with error: " << e.what();
}

++stats_.growFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to grow cache file " << fileName_ << " to " << newSize;
}

const auto candidates =
Expand Down Expand Up @@ -467,20 +481,21 @@ void SsdFile::write(std::vector<CachePin>& pins) {
}

bool SsdFile::write(
uint64_t offset,
uint64_t length,
int64_t offset,
int64_t length,
const std::vector<iovec>& iovecs) {
const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset);
if (ret == length) {
return true;
try {
writeDataFile_->write(iovecs, offset, length);
} catch (const std::exception& e) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
return false;
}
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
return false;
return true;
}

namespace {
Expand All @@ -497,8 +512,9 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) {
void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) {
process::TraceContext trace("SsdFile::verifyWrite");
auto testData = std::make_unique<char[]>(entry.size());
const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset());
VELOX_CHECK_EQ(rc, entry.size());
const auto rc =
readDataFile_->pread(ssdRun.offset(), entry.size(), testData.get());
VELOX_CHECK_EQ(rc.size(), entry.size());
if (entry.tinyData() != nullptr) {
if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) {
VELOX_FAIL("bad read back");
Expand Down Expand Up @@ -569,9 +585,9 @@ void SsdFile::clear() {

void SsdFile::testingDeleteFile() {
process::TraceContext trace("SsdFile::testingDeleteFile");
if (fd_) {
close(fd_);
fd_ = 0;
if (writeDataFile_) {
writeDataFile_->close();
writeDataFile_ = nullptr;
}
auto rc = unlink(fileName_.c_str());
if (rc < 0) {
Expand Down Expand Up @@ -744,11 +760,10 @@ void SsdFile::checkpoint(bool force) {
// We schedule the potentially long fsync of the cache file on another
// thread of the cache write executor, if available. If there is none, we do
// the sync on this thread at the end.
auto fileSync = std::make_shared<AsyncSource<int>>(
[fd = fd_]() { return std::make_unique<int>(::fsync(fd)); });
if (executor_ != nullptr) {
executor_->add([fileSync]() { fileSync->prepare(); });
}
auto fileSync = std::make_shared<AsyncSource<int>>([this]() {
writeDataFile_->flush();
return std::make_unique<int>(0);
});

std::ofstream state;
const auto checkpointPath = getCheckpointFilePath();
Expand Down Expand Up @@ -803,8 +818,7 @@ void SsdFile::checkpoint(bool force) {

// NOTE: we need to ensure cache file data sync update completes before
// updating checkpoint file.
const auto fileSyncRc = fileSync->move();
checkRc(*fileSyncRc, "Sync of cache data file");
fileSync->move();

const auto endMarker = kCheckpointEndMarker;
state.write(asChar(&endMarker), sizeof(endMarker));
Expand Down Expand Up @@ -943,18 +957,20 @@ void SsdFile::maybeVerifyChecksum(
}
}

bool SsdFile::testingIsCowDisabled() const {
void SsdFile::disableFileCow() {
#ifdef linux
int attr{0};
const auto res = ioctl(fd_, FS_IOC_GETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
res,
folly::errnoStr(errno));
const std::unordered_map<std::string, std::string> attributes = {
{std::string(LocalWriteFile::Attributes::kNoCow), "true"}};
writeDataFile_->setAttributes(attributes);
#endif // linux
}

return (attr & FS_NOCOW_FL) == FS_NOCOW_FL;
bool SsdFile::testingIsCowDisabled() const {
#ifdef linux
const auto attributes = writeDataFile_->getAttributes();
const auto it =
attributes.find(std::string(LocalWriteFile::Attributes::kNoCow));
return it != attributes.end() && it->second == "true";
#else
return false;
#endif // linux
Expand Down
21 changes: 14 additions & 7 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdFileTracker.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"

DECLARE_bool(ssd_odirect);
DECLARE_bool(ssd_verify_write);
Expand Down Expand Up @@ -261,7 +262,7 @@ class SsdFile {
checksumEnabled(_checksumEnabled),
checksumReadVerificationEnabled(
_checksumEnabled && _checksumReadVerificationEnabled),
executor(_executor){};
executor(_executor) {};

/// Name of cache file, used as prefix for checkpoint files.
const std::string fileName;
Expand Down Expand Up @@ -472,8 +473,7 @@ class SsdFile {

// Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write
// succeeds; otherwise, log the error and return false.
bool
write(uint64_t offset, uint64_t length, const std::vector<iovec>& iovecs);
bool write(int64_t offset, int64_t length, const std::vector<iovec>& iovecs);

// Synchronously logs that 'regions' are no longer valid in a possibly
// existing checkpoint.
Expand All @@ -499,6 +499,10 @@ class SsdFile {
const AsyncDataCacheEntry& entry,
const SsdRun& ssdRun);

// Disable 'copy on write'. Will throw if failed for any reason, including
// file system not supporting cow feature.
void disableFileCow();

// Returns true if checksum write is enabled for the given version.
static bool isChecksumEnabledOnCheckpointVersion(
const std::string& checkpointVersion) {
Expand Down Expand Up @@ -556,14 +560,17 @@ class SsdFile {
// Map of file number and offset to location in file.
folly::F14FastMap<FileCacheKey, SsdRun> entries_;

// File descriptor. 0 (stdin) means file not open.
int32_t fd_{0};
// File system.
std::shared_ptr<filesystems::FileSystem> fs_;

// Size of the backing file in bytes. Must be multiple of kRegionSize.
uint64_t fileSize_{0};

// ReadFile made from 'fd_'.
std::unique_ptr<ReadFile> readFile_;
// ReadFile for cache data file.
std::unique_ptr<ReadFile> readDataFile_;

// WriteFile for cache data file.
std::unique_ptr<WriteFile> writeDataFile_;

// Counters.
SsdCacheStats stats_;
Expand Down
2 changes: 2 additions & 0 deletions velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

Expand Down Expand Up @@ -45,6 +46,7 @@ class SsdFileTest : public testing::Test {
static constexpr int64_t kMB = 1 << 20;

void SetUp() override {
filesystems::registerLocalFileSystem();
memory::MemoryManager::testingSetInstance({});
}

Expand Down

0 comments on commit 8939b42

Please sign in to comment.