Skip to content

Commit

Permalink
feat: Support API for setting a spill directory callback in task (fac…
Browse files Browse the repository at this point in the history
…ebookincubator#11572)

Summary:

Add ability to set a spill directory callback in task. The callback will create the spill directory and return the directory created

We also add the ability for mkdir to decide whether or not to fail if a directory was created

Differential Revision: D65964247
  • Loading branch information
yuandagits authored and facebook-github-bot committed Nov 21, 2024
1 parent af3f638 commit a9d2079
Show file tree
Hide file tree
Showing 12 changed files with 449 additions and 116 deletions.
10 changes: 7 additions & 3 deletions velox/common/file/FileSystems.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,14 @@ class LocalFileSystem : public FileSystem {
return filePaths;
}

void mkdir(std::string_view path, const DirectoryOptions& /*options*/)
override {
void mkdir(std::string_view path, const DirectoryOptions& options) override {
std::error_code ec;
std::filesystem::create_directories(path, ec);
const bool created = std::filesystem::create_directories(path, ec);

if (!created && options.failIfExists) {
VELOX_FAIL("Directory: {} already exists", path);
}

VELOX_CHECK_EQ(
0,
ec.value(),
Expand Down
6 changes: 6 additions & 0 deletions velox/common/file/FileSystems.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ struct FileOptions {

/// Defines directory options
struct DirectoryOptions : FileOptions {
/// Whether to throw an error if the directory already exists.
/// For POSIX systems, this is equivalent to handling EEXIST.
///
/// NOTE: This is only applicable for mkdir
bool failIfExists{false};

/// This is similar to kFileCreateConfig
static constexpr folly::StringPiece kMakeDirectoryConfig{
"make-directory-config"};
Expand Down
13 changes: 0 additions & 13 deletions velox/common/file/tests/FaultyFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,6 @@

namespace facebook::velox::tests::utils {

std::string FaultFileOperation::typeString(Type type) {
switch (type) {
case Type::kReadv:
return "READV";
case Type::kRead:
return "READ";
default:
VELOX_UNSUPPORTED(
"Unknown file operation type: {}", static_cast<int>(type));
break;
}
}

FaultyReadFile::FaultyReadFile(
const std::string& path,
std::shared_ptr<ReadFile> delegatedFile,
Expand Down
98 changes: 1 addition & 97 deletions velox/common/file/tests/FaultyFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,106 +17,10 @@
#pragma once

#include "velox/common/file/File.h"
#include "velox/common/file/tests/FaultyFileSystemOperations.h"

namespace facebook::velox::tests::utils {

/// Defines the per-file operation fault injection.
struct FaultFileOperation {
enum class Type {
/// Injects faults for file read operations.
kRead,
kReadv,
kAppend,
kWrite,
/// TODO: add to support fault injections for the other file operation
/// types.
};
static std::string typeString(Type type);

const Type type;

/// The delegated file path.
const std::string path;

/// Indicates to forward this operation to the delegated file or not. If not,
/// then the file fault injection hook must have processed the request. For
/// instance, if this is a file read injection, then the hook must have filled
/// the fake read data for data corruption tests.
bool delegate{true};

FaultFileOperation(Type _type, const std::string& _path)
: type(_type), path(_path) {}
};

FOLLY_ALWAYS_INLINE std::ostream& operator<<(
std::ostream& o,
const FaultFileOperation::Type& type) {
return o << FaultFileOperation::typeString(type);
}

/// Fault injection parameters for file read API.
struct FaultFileReadOperation : FaultFileOperation {
const uint64_t offset;
const uint64_t length;
void* const buf;

FaultFileReadOperation(
const std::string& _path,
uint64_t _offset,
uint64_t _length,
void* _buf)
: FaultFileOperation(FaultFileOperation::Type::kRead, _path),
offset(_offset),
length(_length),
buf(_buf) {}
};

/// Fault injection parameters for file readv API.
struct FaultFileReadvOperation : FaultFileOperation {
const uint64_t offset;
const std::vector<folly::Range<char*>>& buffers;
uint64_t readBytes{0};

FaultFileReadvOperation(
const std::string& _path,
uint64_t _offset,
const std::vector<folly::Range<char*>>& _buffers)
: FaultFileOperation(FaultFileOperation::Type::kReadv, _path),
offset(_offset),
buffers(_buffers) {}
};

/// Fault injection parameters for file append API.
struct FaultFileAppendOperation : FaultFileOperation {
std::string_view* data;

FaultFileAppendOperation(
const std::string& _path,
const std::string_view& _data)
: FaultFileOperation(FaultFileOperation::Type::kAppend, _path),
data(const_cast<std::string_view*>(&_data)) {}
};

/// Fault injection parameters for file write API.
struct FaultFileWriteOperation : FaultFileOperation {
const std::vector<iovec>& iovecs;
int64_t offset;
int64_t length;

FaultFileWriteOperation(
const std::string& _path,
const std::vector<iovec>& _iovecs,
int64_t _offset,
int64_t _length)
: FaultFileOperation(FaultFileOperation::Type::kWrite, _path),
iovecs(_iovecs),
offset(_offset),
length(_length) {}
};

/// The fault injection hook on the file operation path.
using FileFaultInjectionHook = std::function<void(FaultFileOperation*)>;

class FaultyReadFile : public ReadFile {
public:
FaultyReadFile(
Expand Down
56 changes: 55 additions & 1 deletion velox/common/file/tests/FaultyFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,37 @@ void FaultyFileSystem::mkdir(
std::string_view path,
const DirectoryOptions& options) {
const auto delegatedDirPath = extractPath(path);
getFileSystem(delegatedDirPath, config_)->mkdir(delegatedDirPath, options);
auto delegatedFs = getFileSystem(delegatedDirPath, config_);

auto op =
FaultFileSystemMkdirOperation(std::string(delegatedDirPath), options);
maybeInjectFilesystemFault(&op);
if (!op.delegate) {
return;
}

delegatedFs->mkdir(delegatedDirPath, options);
}

void FaultyFileSystem::rmdir(std::string_view path) {
const auto delegatedDirPath = extractPath(path);
getFileSystem(delegatedDirPath, config_)->rmdir(delegatedDirPath);
}

void FaultyFileSystem::setFilesystemInjectionHook(
FileSystemFaultInjectionHook hook) {
std::lock_guard<std::mutex> l(mu_);
fsInjections_ = FileSystemInjections(std::move(hook));
}

void FaultyFileSystem::setFileSystemInjectionError(
std::exception_ptr exception,
std::unordered_set<FaultFileSystemOperation::Type> opTypes) {
std::lock_guard<std::mutex> l(mu_);
fsInjections_ =
FileSystemInjections(std::move(exception), std::move(opTypes));
}

void FaultyFileSystem::setFileInjectionHook(
FileFaultInjectionHook injectionHook) {
std::lock_guard<std::mutex> l(mu_);
Expand All @@ -144,11 +167,42 @@ void FaultyFileSystem::setFileInjectionDelay(
fileInjections_ = FileInjections(delayUs, std::move(opTypes));
}

void FaultyFileSystem::clearFileSystemInjections() {
std::lock_guard<std::mutex> l(mu_);
fsInjections_.reset();
}

void FaultyFileSystem::clearFileFaultInjections() {
std::lock_guard<std::mutex> l(mu_);
fileInjections_.reset();
}

void FaultyFileSystem::maybeInjectFilesystemFault(
FaultFileSystemOperation* op) {
{
std::lock_guard<std::mutex> l(mu_);
if (!fsInjections_.has_value()) {
return;
}

auto& injections = fsInjections_.value();
if (injections.filesystemInjectionHook != nullptr) {
injections.filesystemInjectionHook(op);
return;
}

if (!injections.opTypes.empty() &&
injections.opTypes.count(op->type) == 0) {
return;
}

if (injections.directoryException != nullptr) {
std::rethrow_exception(injections.directoryException);
}
}
return;
}

void FaultyFileSystem::maybeInjectFileFault(FaultFileOperation* op) {
FileInjections injections;
{
Expand Down
39 changes: 39 additions & 0 deletions velox/common/file/tests/FaultyFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ class FaultyFileSystem : public FileSystem {
executor_ = executor;
}

/// Sets the hook for filesystem fault injection.
void setFilesystemInjectionHook(FileSystemFaultInjectionHook hook);

/// Setups to inject 'error' for a particular set of filesystem operation
/// types. Only operations inside 'opTypes' will be injected with 'error'.
void setFileSystemInjectionError(
std::exception_ptr error,
std::unordered_set<FaultFileSystemOperation::Type> opTypes = {});

/// Setups hook for file fault injection.
void setFileInjectionHook(FileFaultInjectionHook hook);

Expand All @@ -101,7 +110,33 @@ class FaultyFileSystem : public FileSystem {
/// Clears the file fault injections.
void clearFileFaultInjections();

/// Clears the filesystem fault injections.
void clearFileSystemInjections();

private:
// Defines the per filesystem fault injection setup. Only one type of can be
// set at a time
struct FileSystemInjections {
// TODO: Support more flavors of fault injection
FileSystemFaultInjectionHook filesystemInjectionHook{nullptr};

std::exception_ptr directoryException{nullptr};

std::unordered_set<FaultFileSystemOperation::Type> opTypes{};

FileSystemInjections() = default;

explicit FileSystemInjections(
std::exception_ptr exception,
std::unordered_set<FaultFileSystemOperation::Type> _opTypes)
: directoryException(std::move(exception)),
opTypes(std::move(_opTypes)) {}

explicit FileSystemInjections(
FileSystemFaultInjectionHook _filesystemInjectionHook)
: filesystemInjectionHook(std::move(_filesystemInjectionHook)) {}
};

// Defines the file injection setup and only one type of injection can be set
// at a time.
struct FileInjections {
Expand Down Expand Up @@ -130,11 +165,15 @@ class FaultyFileSystem : public FileSystem {
opTypes(std::move(_opTypes)) {}
};

// Invoked to inject filesystem fault to 'op' if configured.
void maybeInjectFilesystemFault(FaultFileSystemOperation* op);

// Invoked to inject file fault to 'op' if configured.
void maybeInjectFileFault(FaultFileOperation* op);

mutable std::mutex mu_;
std::optional<FileInjections> fileInjections_;
std::optional<FileSystemInjections> fsInjections_;
folly::Executor* executor_;
};

Expand Down
Loading

0 comments on commit a9d2079

Please sign in to comment.