diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index ea5ef04b4b7e0..7defb34a9bb0f 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -238,10 +238,11 @@ LocalWriteFile::LocalWriteFile( { if (shouldThrowOnFileAlreadyExists) { FILE* exists = fopen(buf.get(), "rb"); - VELOX_CHECK( - !exists, - "Failure in LocalWriteFile: path '{}' already exists.", - path); + if (exists != nullptr) { + LOG(ERROR) << "bad"; + } + VELOX_CHECK_NULL( + exists, "Failure in LocalWriteFile: path '{}' already exists.", path); } } auto* file = fopen(buf.get(), "ab"); diff --git a/velox/common/file/File.h b/velox/common/file/File.h index 2ce64cad3b989..ef236659dde93 100644 --- a/velox/common/file/File.h +++ b/velox/common/file/File.h @@ -225,14 +225,14 @@ class InMemoryWriteFile final : public WriteFile { std::string* file_; }; -// Current implementation for the local version is quite simple (e.g. no -// internal arenaing), as local disk writes are expected to be cheap. Local -// files match against any filepath starting with '/'. - +/// Current implementation for the local version is quite simple (e.g. no +/// internal arenaing), as local disk writes are expected to be cheap. Local +/// files match against any filepath starting with '/'. class LocalReadFile final : public ReadFile { public: explicit LocalReadFile(std::string_view path); + /// TODO: deprecate this after creating local file all through velox fs interface. explicit LocalReadFile(int32_t fd); ~LocalReadFile(); diff --git a/velox/common/file/tests/CMakeLists.txt b/velox/common/file/tests/CMakeLists.txt index 446ef6a859e18..37cfa9c6ceb00 100644 --- a/velox/common/file/tests/CMakeLists.txt +++ b/velox/common/file/tests/CMakeLists.txt @@ -11,9 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +add_subdirectory(utils) add_library(velox_file_test_utils TestUtils.cpp) -target_link_libraries(velox_file_test_utils PUBLIC velox_file) +target_link_libraries(velox_file_test_utils PUBLIC velox_file velox_file_test_lib) add_executable(velox_file_test FileTest.cpp UtilsTest.cpp) add_test(velox_file_test velox_file_test) diff --git a/velox/common/file/tests/FileTest.cpp b/velox/common/file/tests/FileTest.cpp index 9f837fc94a23d..238307f6ac22e 100644 --- a/velox/common/file/tests/FileTest.cpp +++ b/velox/common/file/tests/FileTest.cpp @@ -19,6 +19,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" +#include "velox/common/file/tests/utils/FaultyFileSystem.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/exec/tests/utils/TempFilePath.h" @@ -26,6 +27,7 @@ using namespace facebook::velox; using facebook::velox::common::Region; +using namespace facebook::velox::tests::utils; constexpr int kOneMB = 1 << 20; @@ -126,45 +128,58 @@ TEST(InMemoryFile, preadv) { EXPECT_EQ(expected, values); } -TEST(LocalFile, writeAndRead) { +class LocalFileTest : public ::testing::TestWithParam { + protected: + LocalFileTest() : useFaultyFs_(GetParam()) {} + + static void SetUpTestCase() { + filesystems::registerLocalFileSystem(); + tests::utils::registerFaultyFileSystem(); + } + + const bool useFaultyFs_; +}; + +TEST_P(LocalFileTest, writeAndRead) { for (bool useIOBuf : {true, false}) { - auto tempFile = ::exec::test::TempFilePath::create(); - const auto& filename = tempFile->path.c_str(); - remove(filename); + SCOPED_TRACE(fmt::format("useIOBuf: {}", useIOBuf)); + + auto tempFile = exec::test::TempFilePath::create(useFaultyFs_); + const auto& filename = tempFile->path(); + auto fs = filesystems::getFileSystem(filename, {}); + fs->remove(filename); { - LocalWriteFile writeFile(filename); - writeData(&writeFile, useIOBuf); - writeFile.close(); - ASSERT_EQ(writeFile.size(), 15 + kOneMB); + auto writeFile = fs->openFileForWrite(filename); + writeData(writeFile.get(), useIOBuf); + writeFile->close(); + ASSERT_EQ(writeFile->size(), 15 + kOneMB); } - LocalReadFile readFile(filename); - readData(&readFile); + auto readFile = fs->openFileForRead(filename); + readData(readFile.get()); } } -TEST(LocalFile, viaRegistry) { - filesystems::registerLocalFileSystem(); - auto tempFile = ::exec::test::TempFilePath::create(); - const auto& filename = tempFile->path.c_str(); - remove(filename); - auto lfs = filesystems::getFileSystem(filename, nullptr); +TEST_P(LocalFileTest, viaRegistry) { + auto tempFile = exec::test::TempFilePath::create(useFaultyFs_); + const auto& filename = tempFile->path(); + auto fs = filesystems::getFileSystem(filename, {}); + fs->remove(filename); { - auto writeFile = lfs->openFileForWrite(filename); + auto writeFile = fs->openFileForWrite(filename); writeFile->append("snarf"); } - auto readFile = lfs->openFileForRead(filename); + auto readFile = fs->openFileForRead(filename); ASSERT_EQ(readFile->size(), 5); char buffer1[5]; ASSERT_EQ(readFile->pread(0, 5, &buffer1), "snarf"); - lfs->remove(filename); + fs->remove(filename); } -TEST(LocalFile, rename) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); - auto a = fmt::format("{}/a", tempFolder->path); - auto b = fmt::format("{}/b", tempFolder->path); - auto newA = fmt::format("{}/newA", tempFolder->path); +TEST_P(LocalFileTest, rename) { + const auto tempFolder = ::exec::test::TempDirectoryPath::create(useFaultyFs_); + const auto a = fmt::format("{}/a", tempFolder->path()); + const auto b = fmt::format("{}/b", tempFolder->path()); + const auto newA = fmt::format("{}/newA", tempFolder->path()); const std::string data("aaaaa"); auto localFs = filesystems::getFileSystem(a, nullptr); { @@ -176,7 +191,7 @@ TEST(LocalFile, rename) { ASSERT_TRUE(localFs->exists(a)); ASSERT_TRUE(localFs->exists(b)); ASSERT_FALSE(localFs->exists(newA)); - EXPECT_THROW(localFs->rename(a, b), VeloxUserError); + VELOX_ASSERT_USER_THROW(localFs->rename(a, b), ""); localFs->rename(a, newA); ASSERT_FALSE(localFs->exists(a)); ASSERT_TRUE(localFs->exists(b)); @@ -187,11 +202,10 @@ TEST(LocalFile, rename) { ASSERT_EQ(readFile->pread(0, 5, &buffer), data); } -TEST(LocalFile, exists) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); - auto a = fmt::format("{}/a", tempFolder->path); - auto b = fmt::format("{}/b", tempFolder->path); +TEST_P(LocalFileTest, exists) { + auto tempFolder = ::exec::test::TempDirectoryPath::create(useFaultyFs_); + auto a = fmt::format("{}/a", tempFolder->path()); + auto b = fmt::format("{}/b", tempFolder->path()); auto localFs = filesystems::getFileSystem(a, nullptr); { auto writeFile = localFs->openFileForWrite(a); @@ -207,47 +221,50 @@ TEST(LocalFile, exists) { ASSERT_FALSE(localFs->exists(b)); } -TEST(LocalFile, list) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); - auto a = fmt::format("{}/1", tempFolder->path); - auto b = fmt::format("{}/2", tempFolder->path); +TEST_P(LocalFileTest, list) { + const auto tempFolder = ::exec::test::TempDirectoryPath::create(useFaultyFs_); + const auto a = fmt::format("{}/1", tempFolder->path()); + const auto b = fmt::format("{}/2", tempFolder->path()); auto localFs = filesystems::getFileSystem(a, nullptr); { auto writeFile = localFs->openFileForWrite(a); writeFile = localFs->openFileForWrite(b); } - auto files = localFs->list(std::string_view(tempFolder->path)); + auto files = localFs->list(std::string_view(tempFolder->path())); std::sort(files.begin(), files.end()); ASSERT_EQ(files, std::vector({a, b})); localFs->remove(a); ASSERT_EQ( - localFs->list(std::string_view(tempFolder->path)), + localFs->list(std::string_view(tempFolder->path())), std::vector({b})); localFs->remove(b); - ASSERT_TRUE(localFs->list(std::string_view(tempFolder->path)).empty()); + ASSERT_TRUE(localFs->list(std::string_view(tempFolder->path())).empty()); } -TEST(LocalFile, readFileDestructor) { - auto tempFile = ::exec::test::TempFilePath::create(); - const auto& filename = tempFile->path.c_str(); - remove(filename); +TEST_P(LocalFileTest, readFileDestructor) { + if (useFaultyFs_) { + return; + } + auto tempFile = exec::test::TempFilePath::create(useFaultyFs_); + const auto& filename = tempFile->path(); + auto fs = filesystems::getFileSystem(filename, {}); + fs->remove(filename); { - LocalWriteFile writeFile(filename); - writeData(&writeFile); + auto writeFile = fs->openFileForWrite(filename); + writeData(writeFile.get()); } { - LocalReadFile readFile(filename); - readData(&readFile); + auto readFile = fs->openFileForRead(filename); + readData(readFile.get()); } int32_t readFd; { - std::unique_ptr buf(new char[tempFile->path.size() + 1]); - buf[tempFile->path.size()] = 0; - memcpy(buf.get(), tempFile->path.data(), tempFile->path.size()); - readFd = open(buf.get(), O_RDONLY); + std::unique_ptr buf(new char[tempFile->path().size() + 1]); + buf[tempFile->path().size()] = 0; + ::memcpy(buf.get(), tempFile->path().c_str(), tempFile->path().size()); + readFd = ::open(buf.get(), O_RDONLY); } { LocalReadFile readFile(readFd); @@ -260,11 +277,10 @@ TEST(LocalFile, readFileDestructor) { } } -TEST(LocalFile, mkdir) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); +TEST_P(LocalFileTest, mkdir) { + auto tempFolder = exec::test::TempDirectoryPath::create(useFaultyFs_); - std::string path = tempFolder->path; + std::string path = tempFolder->path(); auto localFs = filesystems::getFileSystem(path, nullptr); // Create 3 levels of directories and ensure they exist. @@ -286,11 +302,10 @@ TEST(LocalFile, mkdir) { EXPECT_TRUE(localFs->exists(path)); } -TEST(LocalFile, rmdir) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); +TEST_P(LocalFileTest, rmdir) { + auto tempFolder = exec::test::TempDirectoryPath::create(useFaultyFs_); - std::string path = tempFolder->path; + std::string path = tempFolder->path(); auto localFs = filesystems::getFileSystem(path, nullptr); // Create 3 levels of directories and ensure they exist. @@ -309,24 +324,283 @@ TEST(LocalFile, rmdir) { EXPECT_TRUE(localFs->exists(path)); // Now delete the whole temp folder and ensure it is gone. - EXPECT_NO_THROW(localFs->rmdir(tempFolder->path)); - EXPECT_FALSE(localFs->exists(tempFolder->path)); + EXPECT_NO_THROW(localFs->rmdir(tempFolder->path())); + EXPECT_FALSE(localFs->exists(tempFolder->path())); // Delete a non-existing directory. path += "/does_not_exist/subdir"; EXPECT_FALSE(localFs->exists(path)); // The function does not throw, but will return zero files and folders // deleted, which is not an error. - EXPECT_NO_THROW(localFs->rmdir(tempFolder->path)); + EXPECT_NO_THROW(localFs->rmdir(tempFolder->path())); } -TEST(LocalFile, fileNotFound) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); - auto path = fmt::format("{}/file", tempFolder->path); +TEST_P(LocalFileTest, fileNotFound) { + auto tempFolder = exec::test::TempDirectoryPath::create(useFaultyFs_); + auto path = fmt::format("{}/file", tempFolder->path()); auto localFs = filesystems::getFileSystem(path, nullptr); VELOX_ASSERT_RUNTIME_THROW_CODE( localFs->openFileForRead(path), error_code::kFileNotFound, "No such file or directory"); } + +INSTANTIATE_TEST_SUITE_P( + LocalFileTestSuite, + LocalFileTest, + ::testing::Values(false, true)); + +class FaultyFsTest : public ::testing::Test { + protected: + FaultyFsTest() {} + + static void SetUpTestCase() { + filesystems::registerLocalFileSystem(); + tests::utils::registerFaultyFileSystem(); + } + + void SetUp() { + dir_ = exec::test::TempDirectoryPath::create(true); + fs_ = std::dynamic_pointer_cast( + filesystems::getFileSystem(dir_->path(), {})); + VELOX_CHECK_NOT_NULL(fs_); + filePath_ = fmt::format("{}/faultyTestFile", dir_->path()); + const int bufSize = 1024; + buffer_.resize(bufSize); + for (int i = 0; i < bufSize; ++i) { + buffer_[i] = i % 256; + } + { + auto writeFile = fs_->openFileForWrite(filePath_, {}); + writeData(writeFile.get()); + } + auto readFile = fs_->openFileForRead(filePath_, {}); + readData(readFile.get(), true); + try { + VELOX_FAIL("InjectedFaultFileError"); + } catch (VeloxRuntimeError&) { + fileError_ = std::current_exception(); + } + } + + void TearDown() { + fs_->clearFileFaultInjections(); + } + + void writeData(WriteFile* file) { + file->append(std::string_view(buffer_)); + file->flush(); + } + + void readData(ReadFile* file, bool useReadv = false) { + char readBuf[buffer_.size()]; + if (!useReadv) { + file->pread(0, buffer_.size(), readBuf); + } else { + std::vector> buffers; + buffers.push_back(folly::Range(readBuf, buffer_.size())); + file->preadv(0, buffers); + } + for (int i = 0; i < buffer_.size(); ++i) { + if (buffer_[i] != readBuf[i]) { + VELOX_FAIL("Data Mismatch"); + } + } + } + + std::shared_ptr dir_; + std::string filePath_; + std::shared_ptr fs_; + std::string buffer_; + std::exception_ptr fileError_; +}; + +TEST_F(FaultyFsTest, fileReadErrorInjection) { + // Set read error. + fs_->setFileInjectionError(fileError_, {FaultFileOperation::Type::kRead}); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + VELOX_ASSERT_THROW( + readData(readFile.get(), false), "InjectedFaultFileError"); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + readData(readFile.get(), true); + } + + // Set readv error + fs_->setFileInjectionError(fileError_, {FaultFileOperation::Type::kReadv}); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + VELOX_ASSERT_THROW( + readData(readFile.get(), true), "InjectedFaultFileError"); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + readData(readFile.get(), false); + } + + // Set error for all kinds of operations. + fs_->setFileInjectionError(fileError_); + auto readFile = fs_->openFileForRead(filePath_, {}); + VELOX_ASSERT_THROW(readData(readFile.get(), true), "InjectedFaultFileError"); + VELOX_ASSERT_THROW(readData(readFile.get(), false), "InjectedFaultFileError"); + fs_->remove(filePath_); + // Check there is no interference on write as we don't support it for now. + auto writeFile = fs_->openFileForWrite(filePath_, {}); + writeData(writeFile.get()); +} + +TEST_F(FaultyFsTest, fileReadDelayInjection) { + // Set 3 seconds delay. + const uint64_t injectDelay{2'000'000}; + fs_->setFileInjectionDelay(injectDelay, {FaultFileOperation::Type::kRead}); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), false); + } + ASSERT_GE(readDurationUs, injectDelay); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), true); + } + ASSERT_LT(readDurationUs, injectDelay); + } + + // Set readv error + fs_->setFileInjectionDelay(injectDelay, {FaultFileOperation::Type::kReadv}); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), true); + } + ASSERT_GE(readDurationUs, injectDelay); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), false); + } + ASSERT_LT(readDurationUs, injectDelay); + } + + // Set error for all kinds of operations. + fs_->setFileInjectionDelay(injectDelay); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), false); + } + ASSERT_GE(readDurationUs, injectDelay); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), false); + } + ASSERT_GE(readDurationUs, injectDelay); + } + + fs_->remove(filePath_); + // Check there is no interference on write as we don't support it for now. + auto writeFile = fs_->openFileForWrite(filePath_, {}); + uint64_t writeDurationUs{0}; + { + MicrosecondTimer writeTimer(&writeDurationUs); + writeData(writeFile.get()); + } + ASSERT_LT(writeDurationUs, injectDelay); +} + +TEST_F(FaultyFsTest, fileReadFaultHookInjection) { + const std::string path1 = fmt::format("{}/hookFile1", dir_->path()); + { + auto writeFile = fs_->openFileForWrite(path1, {}); + writeData(writeFile.get()); + auto readFile = fs_->openFileForRead(path1, {}); + readData(readFile.get()); + } + const std::string path2 = fmt::format("{}/hookFile2", dir_->path()); + { + auto writeFile = fs_->openFileForWrite(path2, {}); + writeData(writeFile.get()); + auto readFile = fs_->openFileForRead(path2, {}); + readData(readFile.get()); + } + // Set read error. + fs_->setFileInjectionHook([&](FaultFileOperation* op) { + // Only inject error for readv. + if (op->type != FaultFileOperation::Type::kReadv) { + return; + } + // Only inject error for path2. + if (op->path != path2) { + return; + } + VELOX_FAIL("inject hook read failure"); + }); + { + auto readFile = fs_->openFileForRead(path1, {}); + readData(readFile.get(), false); + readData(readFile.get(), true); + } + { + auto readFile = fs_->openFileForRead(path2, {}); + // Verify only throw for readv. + readData(readFile.get(), false); + VELOX_ASSERT_THROW( + readData(readFile.get(), true), "inject hook read failure"); + } + + // Set to return fake data. + fs_->setFileInjectionHook([&](FaultFileOperation* op) { + // Only inject error for path1. + if (op->path != path1) { + return; + } + // Only inject error for read. + if (op->type != FaultFileOperation::Type::kRead) { + return; + } + auto* readOp = static_cast(op); + char* readBuf = static_cast(readOp->buf); + for (int i = 0; i < readOp->length; ++i) { + readBuf[i] = 0; + } + readOp->delegate = false; + }); + + { + auto readFile = fs_->openFileForRead(path2, {}); + readData(readFile.get(), false); + readData(readFile.get(), true); + } + { + auto readFile = fs_->openFileForRead(path1, {}); + // Verify only throw for read. + readData(readFile.get(), true); + VELOX_ASSERT_THROW( + readData(readFile.get(), false), "Data Mismatch"); + } +} \ No newline at end of file diff --git a/velox/common/file/tests/utils/CMakeLists.txt b/velox/common/file/tests/utils/CMakeLists.txt new file mode 100644 index 0000000000000..56bdede2f417e --- /dev/null +++ b/velox/common/file/tests/utils/CMakeLists.txt @@ -0,0 +1,22 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_library( + velox_file_test_lib + FaultyFile.cpp + FaultyFileSystem.cpp) + +target_link_libraries( + velox_file_test_lib + velox_file) diff --git a/velox/common/file/tests/utils/FaultyFile.cpp b/velox/common/file/tests/utils/FaultyFile.cpp new file mode 100644 index 0000000000000..8c74d768d97fb --- /dev/null +++ b/velox/common/file/tests/utils/FaultyFile.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/tests/utils/FaultyFile.h" + +namespace facebook::velox::tests::utils { + +FaultyReadFile::FaultyReadFile( + const std::string& path, + std::shared_ptr delegatedFile, + FileFaultInjectionHook injectionHook) + : path_(path), + delegatedFile_(std::move(delegatedFile)), + injectionHook_(std::move(injectionHook)) { + VELOX_CHECK_NOT_NULL(delegatedFile_); +} + +std::string_view +FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { + if (injectionHook_ != nullptr) { + FaultFileReadOperation op(path_, offset, length, buf); + injectionHook_(&op); + if (!op.delegate) { + return op.injectedReadBuf; + } + } + return delegatedFile_->pread(offset, length, buf); +} + +uint64_t FaultyReadFile::preadv( + uint64_t offset, + const std::vector>& buffers) const { + if (injectionHook_ != nullptr) { + FaultFileReadvOperation op(path_, offset, buffers); + injectionHook_(&op); + if (!op.delegate) { + return op.readBytes; + } + } + return delegatedFile_->preadv(offset, buffers); +} + +FaultyWriteFile::FaultyWriteFile( + std::shared_ptr delegatedFile, + FileFaultInjectionHook injectionHook) + : delegatedFile_(std::move(delegatedFile)), + injectionHook_(std::move(injectionHook)) { + VELOX_CHECK_NOT_NULL(delegatedFile_); +} +void FaultyWriteFile::append(std::string_view data) { + delegatedFile_->append(data); +} + +void FaultyWriteFile::append(std::unique_ptr data) { + delegatedFile_->append(std::move(data)); +} + +void FaultyWriteFile::flush() { + delegatedFile_->flush(); +} + +void FaultyWriteFile::close() { + delegatedFile_->close(); +} +} // namespace facebook::velox::tests::utils diff --git a/velox/common/file/tests/utils/FaultyFile.h b/velox/common/file/tests/utils/FaultyFile.h new file mode 100644 index 0000000000000..d744583fdaa86 --- /dev/null +++ b/velox/common/file/tests/utils/FaultyFile.h @@ -0,0 +1,147 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/file/File.h" + +namespace facebook::velox::tests::utils { + +/// Defines the per-file operation fault injection. +struct FaultFileOperation { + enum class Type { + /// Injects fault for file read operations. + kRead, + kReadv, + /// TODO: add to support fault injections for the other operator types. + }; + + const Type type; + const std::string path; + + /// Indicates to forward this operation to the delegate file or not. If not, + /// then the file fault injection hook must have processed the request. For + /// instance, if this is a file read injection, then the hook must have filled + /// the fake data for read. + bool delegate{true}; + + FaultFileOperation(Type _type, const std::string& _path) + : type(_type), path(_path) {} +}; + +/// Defines fault injection parameters for file read. +struct FaultFileReadOperation : FaultFileOperation { + const uint64_t offset; + const uint64_t length; + void* const buf; + std::string_view injectedReadBuf; + + FaultFileReadOperation( + const std::string& _path, + uint64_t _offset, + uint64_t _length, + void* _buf) + : FaultFileOperation(FaultFileOperation::Type::kRead, _path), + offset(_offset), + length(_length), + buf(_buf) {} +}; + +/// Defines fault injection parameters for file readv. +struct FaultFileReadvOperation : FaultFileOperation { + const uint64_t offset; + const std::vector>& buffers; + uint64_t readBytes{0}; + + FaultFileReadvOperation( + const std::string& _path, + uint64_t _offset, + const std::vector>& _buffers) + : FaultFileOperation(FaultFileOperation::Type::kReadv, _path), + offset(_offset), + buffers(_buffers) {} +}; + +using FileFaultInjectionHook = std::function; + +class FaultyReadFile : public ReadFile { + public: + FaultyReadFile( + const std::string& path, + std::shared_ptr delegatedFile, + FileFaultInjectionHook injectionHook); + + ~FaultyReadFile() override{}; + + uint64_t size() const override { + return delegatedFile_->size(); + } + + std::string_view pread(uint64_t offset, uint64_t length, void* buf) + const override; + + uint64_t preadv( + uint64_t offset, + const std::vector>& buffers) const override; + + uint64_t memoryUsage() const override { + return delegatedFile_->memoryUsage(); + } + + bool shouldCoalesce() const override { + return delegatedFile_->shouldCoalesce(); + } + + std::string getName() const override { + return delegatedFile_->getName(); + } + + uint64_t getNaturalReadSize() const override { + return delegatedFile_->getNaturalReadSize(); + } + + private: + const std::string path_; + const std::shared_ptr delegatedFile_; + const FileFaultInjectionHook injectionHook_; +}; + +class FaultyWriteFile : public WriteFile { + public: + FaultyWriteFile( + std::shared_ptr delegatedFile, + FileFaultInjectionHook injectionHook); + + ~FaultyWriteFile() override{}; + + void append(std::string_view data) override; + + void append(std::unique_ptr data) override; + + void flush() override; + + void close() override; + + uint64_t size() const final { + return delegatedFile_->size(); + } + + private: + const std::shared_ptr delegatedFile_; + const FileFaultInjectionHook injectionHook_; +}; + +} // namespace facebook::velox::tests::utils diff --git a/velox/common/file/tests/utils/FaultyFileSystem.cpp b/velox/common/file/tests/utils/FaultyFileSystem.cpp new file mode 100644 index 0000000000000..0499c67647667 --- /dev/null +++ b/velox/common/file/tests/utils/FaultyFileSystem.cpp @@ -0,0 +1,186 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/tests/utils/FaultyFileSystem.h" +#include + +#include + +namespace facebook::velox::tests::utils { +namespace { +inline std::string extractPath(std::string_view path) { + VELOX_CHECK_EQ(path.find(FaultyFileSystem::scheme()), 0); + return std::string(path.substr(FaultyFileSystem::scheme().length())); +} + +inline std::string faultyPath(const std::string& path) { + return fmt::format("{}{}", FaultyFileSystem::scheme(), path); +} + +std::function schemeMatcher() { + // Note: presto behavior is to prefix local paths with 'file:'. + // Check for that prefix and prune to absolute regular paths as needed. + return [](std::string_view filePath) { + return filePath.find(FaultyFileSystem::scheme()) == 0; + }; +} + +folly::once_flag faultFilesystemInitOnceFlag; + +std::function(std::shared_ptr, std::string_view)> +fileSystemGenerator() { + return + [](std::shared_ptr properties, std::string_view filePath) { + // One instance of faulty FileSystem is sufficient. Initializes on first + // access and reuse after that. + static std::shared_ptr lfs; + folly::call_once(faultFilesystemInitOnceFlag, [&properties]() { + lfs = std::make_shared(properties); + }); + return lfs; + }; +} +} // namespace + +std::unique_ptr FaultyFileSystem::openFileForRead( + std::string_view path, + const FileOptions& options) { + const std::string delegatedPath = extractPath(path); + auto delegatedFile = getFileSystem(delegatedPath, config_) + ->openFileForRead(delegatedPath, options); + return std::make_unique( + std::string(path), std::move(delegatedFile), [&](FaultFileOperation* op) { + maybeInjectFileFault(op); + }); +} + +std::unique_ptr FaultyFileSystem::openFileForWrite( + std::string_view path, + const FileOptions& options) { + const std::string delegatedPath = extractPath(path); + auto delegatedFile = getFileSystem(delegatedPath, config_) + ->openFileForWrite(delegatedPath, options); + return std::make_unique( + std::move(delegatedFile), + [&](FaultFileOperation* op) { maybeInjectFileFault(op); }); +} + +void FaultyFileSystem::remove(std::string_view path) { + const std::string delegatedPath = extractPath(path); + getFileSystem(delegatedPath, config_)->remove(delegatedPath); +} + +void FaultyFileSystem::rename( + std::string_view oldPath, + std::string_view newPath, + bool overwrite) { + const auto delegatedOldPath = extractPath(oldPath); + const auto delegatedNewPath = extractPath(newPath); + getFileSystem(delegatedOldPath, config_) + ->rename(delegatedOldPath, delegatedNewPath, overwrite); +} + +bool FaultyFileSystem::exists(std::string_view path) { + const auto delegatedPath = extractPath(path); + return getFileSystem(delegatedPath, config_)->exists(delegatedPath); +} + +std::vector FaultyFileSystem::list(std::string_view path) { + const auto delegatedDirPath = extractPath(path); + const auto delegatedFiles = + getFileSystem(delegatedDirPath, config_)->list(delegatedDirPath); + std::vector files; + files.reserve(delegatedFiles.size()); + for (const auto& delegatedFile : delegatedFiles) { + files.push_back(faultyPath(delegatedFile)); + } + return files; +} + +void FaultyFileSystem::mkdir(std::string_view path) { + const auto delegatedDirPath = extractPath(path); + getFileSystem(delegatedDirPath, config_)->mkdir(delegatedDirPath); +} + +void FaultyFileSystem::rmdir(std::string_view path) { + const auto delegatedDirPath = extractPath(path); + getFileSystem(delegatedDirPath, config_)->rmdir(delegatedDirPath); +} + +void FaultyFileSystem::setFileInjectionHook( + FileFaultInjectionHook injectionHook) { + std::lock_guard l(mu_); + fileInjections_ = FileInjections(std::move(injectionHook)); +} + +void FaultyFileSystem::setFileInjectionError( + std::exception_ptr error, + std::unordered_set opTypes) { + std::lock_guard l(mu_); + fileInjections_ = FileInjections(std::move(error), std::move(opTypes)); +} + +void FaultyFileSystem::setFileInjectionDelay( + uint64_t delayUs, + std::unordered_set opTypes) { + std::lock_guard l(mu_); + fileInjections_ = FileInjections(delayUs, std::move(opTypes)); +} + +void FaultyFileSystem::clearFileFaultInjections() { + std::lock_guard l(mu_); + fileInjections_.reset(); +} + +void FaultyFileSystem::maybeInjectFileFault(FaultFileOperation* op) { + FileInjections injections; + { + std::lock_guard l(mu_); + if (!fileInjections_.has_value()) { + return; + } + injections = fileInjections_.value(); + } + + if (injections.fileInjectionHook != nullptr) { + injections.fileInjectionHook(op); + return; + } + + if (!injections.opTypes.empty() && injections.opTypes.count(op->type) == 0) { + return; + } + + if (injections.fileException != nullptr) { + std::rethrow_exception(injections.fileException); + } + + if (injections.fileDelayUs != 0) { + std::this_thread::sleep_for( + std::chrono::microseconds(injections.fileDelayUs)); + } +} + +void registerFaultyFileSystem() { + registerFileSystem(schemeMatcher(), fileSystemGenerator()); +} + +std::shared_ptr faultyFileSystem() { + return std::dynamic_pointer_cast( + getFileSystem(FaultyFileSystem::scheme(), {})); +} +} // namespace facebook::velox::tests::utils diff --git a/velox/common/file/tests/utils/FaultyFileSystem.h b/velox/common/file/tests/utils/FaultyFileSystem.h new file mode 100644 index 0000000000000..9b4b75d03b7d0 --- /dev/null +++ b/velox/common/file/tests/utils/FaultyFileSystem.h @@ -0,0 +1,122 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/file/FileSystems.h" + +#include +#include +#include +#include "velox/common/file/tests/utils/FaultyFile.h" + +namespace facebook::velox::tests::utils { + +using namespace filesystems; + +/// Implements fault filesystem which is a wrapper on top of a real file system. +/// It is used by unit test for io fault injection. By default, it delegates the +/// the file operation to the file system underneath. +class FaultyFileSystem : public FileSystem { + public: + explicit FaultyFileSystem(std::shared_ptr config) + : FileSystem(std::move(config)) {} + + ~FaultyFileSystem() override {} + + static inline std::string scheme() { + return "faulty:"; + } + + std::string name() const override { + return "Faulty FS"; + } + + std::unique_ptr openFileForRead( + std::string_view path, + const FileOptions& options) override; + + std::unique_ptr openFileForWrite( + std::string_view path, + const FileOptions& options) override; + + void remove(std::string_view path) override; + + void rename( + std::string_view oldPath, + std::string_view newPath, + bool overwrite) override; + + bool exists(std::string_view path) override; + + std::vector list(std::string_view path) override; + + void mkdir(std::string_view path) override; + + void rmdir(std::string_view path) override; + + /// Setups hook for file fault injection. + void setFileInjectionHook(FileFaultInjectionHook hook); + + void setFileInjectionError( + std::exception_ptr error, + std::unordered_set opTypes = {}); + + void setFileInjectionDelay( + uint64_t delayUs, + std::unordered_set opTypes = {}); + + /// Clear hooks for file fault injections. + void clearFileFaultInjections(); + + private: + struct FileInjections { + FileFaultInjectionHook fileInjectionHook{nullptr}; + + std::exception_ptr fileException{nullptr}; + + uint64_t fileDelayUs{0}; + + std::unordered_set opTypes{}; + + FileInjections() = default; + + explicit FileInjections(FileFaultInjectionHook _fileInjectionHook) + : fileInjectionHook(std::move(_fileInjectionHook)) {} + + FileInjections( + uint64_t _fileDelayUs, + std::unordered_set _opTypes) + : fileDelayUs(_fileDelayUs), opTypes(std::move(_opTypes)) {} + + FileInjections( + std::exception_ptr _fileException, + std::unordered_set _opTypes) + : fileException(std::move(_fileException)), + opTypes(std::move(_opTypes)) {} + }; + + void maybeInjectFileFault(FaultFileOperation* op); + + mutable std::mutex mu_; + std::optional fileInjections_; +}; + +/// Registers the faulty filesystem. +void registerFaultyFileSystem(); + +/// Gets the fault filesystem instance. +std::shared_ptr faultyFileSystem(); +} // namespace facebook::velox::tests::utils diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp index 58c5d0af4b25e..d5311e7dc66ec 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp +++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp @@ -94,7 +94,7 @@ QueryTestResult runHashJoinTask( if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) .config(core::QueryConfig::kSpillStartPartitionBit, "29") @@ -137,7 +137,7 @@ QueryTestResult runAggregateTask( const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") .queryCtx(queryCtx) @@ -179,7 +179,7 @@ QueryTestResult runOrderByTask( if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kOrderBySpillEnabled, "true") .queryCtx(queryCtx) @@ -221,7 +221,7 @@ QueryTestResult runRowNumberTask( if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kRowNumberSpillEnabled, "true") .queryCtx(queryCtx) @@ -264,7 +264,7 @@ QueryTestResult runTopNTask( const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") .queryCtx(queryCtx) @@ -306,12 +306,12 @@ QueryTestResult runWriteTask( const RowVectorPtr& expectedResult) { QueryTestResult result; const auto outputDirectory = TempDirectoryPath::create(); - auto plan = writePlan(vectors, outputDirectory->path, result.planNodeId); + auto plan = writePlan(vectors, outputDirectory->path(), result.planNodeId); if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "false") .config(core::QueryConfig::kWriterSpillEnabled, "true") diff --git a/velox/exec/tests/utils/TempDirectoryPath.cpp b/velox/exec/tests/utils/TempDirectoryPath.cpp index b34815a0cd5a9..662c18aa5fe59 100644 --- a/velox/exec/tests/utils/TempDirectoryPath.cpp +++ b/velox/exec/tests/utils/TempDirectoryPath.cpp @@ -20,21 +20,28 @@ namespace facebook::velox::exec::test { -std::shared_ptr TempDirectoryPath::create() { - struct SharedTempDirectoryPath : public TempDirectoryPath { - SharedTempDirectoryPath() : TempDirectoryPath() {} - }; - return std::make_shared(); +std::shared_ptr TempDirectoryPath::create(bool injectFault) { + auto* tempDirPathPtr = new TempDirectoryPath(injectFault); + return std::shared_ptr(tempDirPathPtr); } TempDirectoryPath::~TempDirectoryPath() { - LOG(INFO) << "TempDirectoryPath:: removing all files from " << path; + LOG(INFO) << "TempDirectoryPath:: removing all files from " << path_; try { - boost::filesystem::remove_all(path.c_str()); + boost::filesystem::remove_all(path_.c_str()); } catch (...) { LOG(WARNING) << "TempDirectoryPath:: destructor failed while calling boost::filesystem::remove_all"; } } +std::string TempDirectoryPath::createTempDirectory() { + char path[] = "/tmp/velox_test_XXXXXX"; + const char* tempDirectoryPath = ::mkdtemp(path); + if (tempDirectoryPath == nullptr) { + VELOX_FAIL("Cannot open temp directory"); + } + return tempDirectoryPath; +} + } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/TempDirectoryPath.h b/velox/exec/tests/utils/TempDirectoryPath.h index 31d8d5dbea161..b82d636982cab 100644 --- a/velox/exec/tests/utils/TempDirectoryPath.h +++ b/velox/exec/tests/utils/TempDirectoryPath.h @@ -24,27 +24,35 @@ namespace facebook::velox::exec::test { -// It manages the lifetime of a temporary directory. +/// Manages the lifetime of a temporary directory. class TempDirectoryPath { public: - static std::shared_ptr create(); + static std::shared_ptr create( + bool enableFaultInjection = false); virtual ~TempDirectoryPath(); - const std::string path; - TempDirectoryPath(const TempDirectoryPath&) = delete; TempDirectoryPath& operator=(const TempDirectoryPath&) = delete; - TempDirectoryPath() : path(createTempDirectory()) {} + std::string path() const { + return enableFaultInjection_ ? fmt::format("faulty:{}", path_) : path_; + } - static std::string createTempDirectory() { - char path[] = "/tmp/velox_test_XXXXXX"; - const char* tempDirectoryPath = mkdtemp(path); - if (tempDirectoryPath == nullptr) { - throw std::logic_error("Cannot open temp directory"); - } - return tempDirectoryPath; + /// Returns the delegated file path if fault injection is enabled. + std::string rawPath() const { + return path_; } + + private: + static std::string createTempDirectory(); + + explicit TempDirectoryPath(bool enableFaultInjection) + : path_(createTempDirectory()), + enableFaultInjection_(enableFaultInjection) {} + + const std::string path_; + + bool enableFaultInjection_{false}; }; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/TempFilePath.cpp b/velox/exec/tests/utils/TempFilePath.cpp index 7c5cbe7370c18..46c28ff72bc26 100644 --- a/velox/exec/tests/utils/TempFilePath.cpp +++ b/velox/exec/tests/utils/TempFilePath.cpp @@ -18,11 +18,22 @@ namespace facebook::velox::exec::test { -std::shared_ptr TempFilePath::create() { - struct SharedTempFilePath : public TempFilePath { - SharedTempFilePath() : TempFilePath() {} - }; - return std::make_shared(); +TempFilePath::~TempFilePath() { + ::unlink(path_.c_str()); + ::close(fd_); } +std::shared_ptr TempFilePath::create(bool enableFaultInjection) { + auto* tempFilePathPtr = new TempFilePath(enableFaultInjection); + return std::shared_ptr(tempFilePathPtr); +} + +std::string TempFilePath::createTempFile(TempFilePath* tempFilePath) { + char path[] = "/tmp/velox_test_XXXXXX"; + tempFilePath->fd_ = ::mkstemp(path); + if (tempFilePath->fd_ == -1) { + VELOX_FAIL("Cannot open temp file: {}", folly::errnoStr(errno)); + } + return path; +} } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/TempFilePath.h b/velox/exec/tests/utils/TempFilePath.h index d993795f1e3a0..9a2eec443118e 100644 --- a/velox/exec/tests/utils/TempFilePath.h +++ b/velox/exec/tests/utils/TempFilePath.h @@ -26,23 +26,19 @@ namespace facebook::velox::exec::test { -// It manages the lifetime of a temporary file. +/// Manages the lifetime of a temporary file. class TempFilePath { public: - static std::shared_ptr create(); + static std::shared_ptr create( + bool enableFaultInjection = false); - virtual ~TempFilePath() { - unlink(path.c_str()); - close(fd); - } - - const std::string path; + ~TempFilePath(); TempFilePath(const TempFilePath&) = delete; TempFilePath& operator=(const TempFilePath&) = delete; void append(std::string data) { - std::ofstream file(path, std::ios_base::app); + std::ofstream file(path_, std::ios_base::app); file << data; file.flush(); file.close(); @@ -50,31 +46,33 @@ class TempFilePath { const int64_t fileSize() { struct stat st; - stat(path.data(), &st); + ::stat(path_.data(), &st); return st.st_size; } - const int64_t fileModifiedTime() { + int64_t fileModifiedTime() { struct stat st; - stat(path.data(), &st); + ::stat(path_.data(), &st); return st.st_mtime; } + std::string path() const { + return enableFaultInjection_ ? fmt::format("faulty:{}", path_) : path_; + } + private: - int fd; + static std::string createTempFile(TempFilePath* tempFilePath); - TempFilePath() : path(createTempFile(this)) { - VELOX_CHECK_NE(fd, -1); + TempFilePath(bool enableFaultInjection) + : enableFaultInjection_(enableFaultInjection), + path_(createTempFile(this)) { + VELOX_CHECK_NE(fd_, -1); } - static std::string createTempFile(TempFilePath* tempFilePath) { - char path[] = "/tmp/velox_test_XXXXXX"; - tempFilePath->fd = mkstemp(path); - if (tempFilePath->fd == -1) { - throw std::logic_error("Cannot open temp file"); - } - return path; - } + const bool enableFaultInjection_; + const std::string path_; + + int fd_; }; } // namespace facebook::velox::exec::test