From a9d20795ccbb153cc1979fcc3f3d6d39c187c7b3 Mon Sep 17 00:00:00 2001 From: Yenda Li Date: Thu, 21 Nov 2024 14:07:03 -0800 Subject: [PATCH] feat: Support API for setting a spill directory callback in task (#11572) Summary: Add ability to set a spill directory callback in task. The callback will create the spill directory and return the directory created. We also add the ability for mkdir to decide whether or not to fail if the directory already exists. Differential Revision: D65964247 --- velox/common/file/FileSystems.cpp | 10 +- velox/common/file/FileSystems.h | 6 + velox/common/file/tests/FaultyFile.cpp | 13 -- velox/common/file/tests/FaultyFile.h | 98 +--------- velox/common/file/tests/FaultyFileSystem.cpp | 56 +++++- velox/common/file/tests/FaultyFileSystem.h | 39 ++++ .../file/tests/FaultyFileSystemOperations.h | 174 ++++++++++++++++++ velox/common/file/tests/FileTest.cpp | 24 +++ velox/exec/Driver.cpp | 2 +- velox/exec/Task.cpp | 13 +- velox/exec/Task.h | 17 ++ velox/exec/tests/TaskTest.cpp | 113 ++++++++++++ 12 files changed, 449 insertions(+), 116 deletions(-) create mode 100644 velox/common/file/tests/FaultyFileSystemOperations.h diff --git a/velox/common/file/FileSystems.cpp b/velox/common/file/FileSystems.cpp index 4f215a7ce5f1d..5b00e77d4a520 100644 --- a/velox/common/file/FileSystems.cpp +++ b/velox/common/file/FileSystems.cpp @@ -163,10 +163,14 @@ class LocalFileSystem : public FileSystem { return filePaths; } - void mkdir(std::string_view path, const DirectoryOptions& /*options*/) - override { + void mkdir(std::string_view path, const DirectoryOptions& options) override { std::error_code ec; - std::filesystem::create_directories(path, ec); + const bool created = std::filesystem::create_directories(path, ec); + + if (!created && options.failIfExists) { + VELOX_FAIL("Directory: {} already exists", path); + } + VELOX_CHECK_EQ( 0, ec.value(), diff --git a/velox/common/file/FileSystems.h b/velox/common/file/FileSystems.h index 189b0b6bf5935..fbe98fb4a4237 
100644 --- a/velox/common/file/FileSystems.h +++ b/velox/common/file/FileSystems.h @@ -68,6 +68,12 @@ struct FileOptions { /// Defines directory options struct DirectoryOptions : FileOptions { + /// Whether to throw an error if the directory already exists. + /// For POSIX systems, this is equivalent to handling EEXIST. + /// + /// NOTE: This is only applicable for mkdir + bool failIfExists{false}; + /// This is similar to kFileCreateConfig static constexpr folly::StringPiece kMakeDirectoryConfig{ "make-directory-config"}; diff --git a/velox/common/file/tests/FaultyFile.cpp b/velox/common/file/tests/FaultyFile.cpp index 7f21f5c30a3c8..a694afdb5302a 100644 --- a/velox/common/file/tests/FaultyFile.cpp +++ b/velox/common/file/tests/FaultyFile.cpp @@ -18,19 +18,6 @@ namespace facebook::velox::tests::utils { -std::string FaultFileOperation::typeString(Type type) { - switch (type) { - case Type::kReadv: - return "READV"; - case Type::kRead: - return "READ"; - default: - VELOX_UNSUPPORTED( - "Unknown file operation type: {}", static_cast(type)); - break; - } -} - FaultyReadFile::FaultyReadFile( const std::string& path, std::shared_ptr delegatedFile, diff --git a/velox/common/file/tests/FaultyFile.h b/velox/common/file/tests/FaultyFile.h index 4b09850d1ee12..dd54c6101d27d 100644 --- a/velox/common/file/tests/FaultyFile.h +++ b/velox/common/file/tests/FaultyFile.h @@ -17,106 +17,10 @@ #pragma once #include "velox/common/file/File.h" +#include "velox/common/file/tests/FaultyFileSystemOperations.h" namespace facebook::velox::tests::utils { -/// Defines the per-file operation fault injection. -struct FaultFileOperation { - enum class Type { - /// Injects faults for file read operations. - kRead, - kReadv, - kAppend, - kWrite, - /// TODO: add to support fault injections for the other file operation - /// types. - }; - static std::string typeString(Type type); - - const Type type; - - /// The delegated file path. 
- const std::string path; - - /// Indicates to forward this operation to the delegated file or not. If not, - /// then the file fault injection hook must have processed the request. For - /// instance, if this is a file read injection, then the hook must have filled - /// the fake read data for data corruption tests. - bool delegate{true}; - - FaultFileOperation(Type _type, const std::string& _path) - : type(_type), path(_path) {} -}; - -FOLLY_ALWAYS_INLINE std::ostream& operator<<( - std::ostream& o, - const FaultFileOperation::Type& type) { - return o << FaultFileOperation::typeString(type); -} - -/// Fault injection parameters for file read API. -struct FaultFileReadOperation : FaultFileOperation { - const uint64_t offset; - const uint64_t length; - void* const buf; - - FaultFileReadOperation( - const std::string& _path, - uint64_t _offset, - uint64_t _length, - void* _buf) - : FaultFileOperation(FaultFileOperation::Type::kRead, _path), - offset(_offset), - length(_length), - buf(_buf) {} -}; - -/// Fault injection parameters for file readv API. -struct FaultFileReadvOperation : FaultFileOperation { - const uint64_t offset; - const std::vector>& buffers; - uint64_t readBytes{0}; - - FaultFileReadvOperation( - const std::string& _path, - uint64_t _offset, - const std::vector>& _buffers) - : FaultFileOperation(FaultFileOperation::Type::kReadv, _path), - offset(_offset), - buffers(_buffers) {} -}; - -/// Fault injection parameters for file append API. -struct FaultFileAppendOperation : FaultFileOperation { - std::string_view* data; - - FaultFileAppendOperation( - const std::string& _path, - const std::string_view& _data) - : FaultFileOperation(FaultFileOperation::Type::kAppend, _path), - data(const_cast(&_data)) {} -}; - -/// Fault injection parameters for file write API. 
-struct FaultFileWriteOperation : FaultFileOperation { - const std::vector& iovecs; - int64_t offset; - int64_t length; - - FaultFileWriteOperation( - const std::string& _path, - const std::vector& _iovecs, - int64_t _offset, - int64_t _length) - : FaultFileOperation(FaultFileOperation::Type::kWrite, _path), - iovecs(_iovecs), - offset(_offset), - length(_length) {} -}; - -/// The fault injection hook on the file operation path. -using FileFaultInjectionHook = std::function; - class FaultyReadFile : public ReadFile { public: FaultyReadFile( diff --git a/velox/common/file/tests/FaultyFileSystem.cpp b/velox/common/file/tests/FaultyFileSystem.cpp index 6e346af61731c..bf1e68144e13b 100644 --- a/velox/common/file/tests/FaultyFileSystem.cpp +++ b/velox/common/file/tests/FaultyFileSystem.cpp @@ -116,7 +116,16 @@ void FaultyFileSystem::mkdir( std::string_view path, const DirectoryOptions& options) { const auto delegatedDirPath = extractPath(path); - getFileSystem(delegatedDirPath, config_)->mkdir(delegatedDirPath, options); + auto delegatedFs = getFileSystem(delegatedDirPath, config_); + + auto op = + FaultFileSystemMkdirOperation(std::string(delegatedDirPath), options); + maybeInjectFilesystemFault(&op); + if (!op.delegate) { + return; + } + + delegatedFs->mkdir(delegatedDirPath, options); } void FaultyFileSystem::rmdir(std::string_view path) { @@ -124,6 +133,20 @@ void FaultyFileSystem::rmdir(std::string_view path) { getFileSystem(delegatedDirPath, config_)->rmdir(delegatedDirPath); } +void FaultyFileSystem::setFilesystemInjectionHook( + FileSystemFaultInjectionHook hook) { + std::lock_guard l(mu_); + fsInjections_ = FileSystemInjections(std::move(hook)); +} + +void FaultyFileSystem::setFileSystemInjectionError( + std::exception_ptr exception, + std::unordered_set opTypes) { + std::lock_guard l(mu_); + fsInjections_ = + FileSystemInjections(std::move(exception), std::move(opTypes)); +} + void FaultyFileSystem::setFileInjectionHook( FileFaultInjectionHook injectionHook) { 
std::lock_guard l(mu_); @@ -144,11 +167,42 @@ void FaultyFileSystem::setFileInjectionDelay( fileInjections_ = FileInjections(delayUs, std::move(opTypes)); } +void FaultyFileSystem::clearFileSystemInjections() { + std::lock_guard l(mu_); + fsInjections_.reset(); +} + void FaultyFileSystem::clearFileFaultInjections() { std::lock_guard l(mu_); fileInjections_.reset(); } +void FaultyFileSystem::maybeInjectFilesystemFault( + FaultFileSystemOperation* op) { + { + std::lock_guard l(mu_); + if (!fsInjections_.has_value()) { + return; + } + + auto& injections = fsInjections_.value(); + if (injections.filesystemInjectionHook != nullptr) { + injections.filesystemInjectionHook(op); + return; + } + + if (!injections.opTypes.empty() && + injections.opTypes.count(op->type) == 0) { + return; + } + + if (injections.directoryException != nullptr) { + std::rethrow_exception(injections.directoryException); + } + } + return; +} + void FaultyFileSystem::maybeInjectFileFault(FaultFileOperation* op) { FileInjections injections; { diff --git a/velox/common/file/tests/FaultyFileSystem.h b/velox/common/file/tests/FaultyFileSystem.h index 72d76bc385388..b4ecf64f0c6fd 100644 --- a/velox/common/file/tests/FaultyFileSystem.h +++ b/velox/common/file/tests/FaultyFileSystem.h @@ -81,6 +81,15 @@ class FaultyFileSystem : public FileSystem { executor_ = executor; } + /// Sets the hook for filesystem fault injection. + void setFilesystemInjectionHook(FileSystemFaultInjectionHook hook); + + /// Setups to inject 'error' for a particular set of filesystem operation + /// types. Only operations inside 'opTypes' will be injected with 'error'. + void setFileSystemInjectionError( + std::exception_ptr error, + std::unordered_set opTypes = {}); + /// Setups hook for file fault injection. void setFileInjectionHook(FileFaultInjectionHook hook); @@ -101,7 +110,33 @@ class FaultyFileSystem : public FileSystem { /// Clears the file fault injections. 
void clearFileFaultInjections(); + /// Clears the filesystem fault injections. + void clearFileSystemInjections(); + private: + // Defines the per filesystem fault injection setup. Only one type of + // injection can be set at a time. + struct FileSystemInjections { + // TODO: Support more flavors of fault injection + FileSystemFaultInjectionHook filesystemInjectionHook{nullptr}; + + std::exception_ptr directoryException{nullptr}; + + std::unordered_set opTypes{}; + + FileSystemInjections() = default; + + explicit FileSystemInjections( + std::exception_ptr exception, + std::unordered_set _opTypes) + : directoryException(std::move(exception)), + opTypes(std::move(_opTypes)) {} + + explicit FileSystemInjections( + FileSystemFaultInjectionHook _filesystemInjectionHook) + : filesystemInjectionHook(std::move(_filesystemInjectionHook)) {} + }; + // Defines the file injection setup and only one type of injection can be set // at a time. struct FileInjections { @@ -130,11 +165,15 @@ class FaultyFileSystem : public FileSystem { opTypes(std::move(_opTypes)) {} }; + // Invoked to inject filesystem fault to 'op' if configured. + void maybeInjectFilesystemFault(FaultFileSystemOperation* op); + // Invoked to inject file fault to 'op' if configured. void maybeInjectFileFault(FaultFileOperation* op); mutable std::mutex mu_; std::optional fileInjections_; + std::optional fsInjections_; folly::Executor* executor_; }; diff --git a/velox/common/file/tests/FaultyFileSystemOperations.h b/velox/common/file/tests/FaultyFileSystemOperations.h new file mode 100644 index 0000000000000..2acaea22748d1 --- /dev/null +++ b/velox/common/file/tests/FaultyFileSystemOperations.h @@ -0,0 +1,174 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/file/FileSystems.h" + +namespace facebook::velox::tests::utils { +using namespace filesystems; + +/// Defines the base class for file and filesystem operations +struct BaseFaultOperation { + BaseFaultOperation(const std::string& _path) : path(_path) {} + /// The delegated file path. + const std::string path; + + /// Indicates to forward this operation to the delegated file or not. If not, + /// then the file fault injection hook must have processed the request. For + /// instance, if this is a file read injection, then the hook must have filled + /// the fake read data for data corruption tests. + bool delegate{true}; +}; + +/// Defines the per-file operation fault injection. +struct FaultFileOperation : public BaseFaultOperation { + enum class Type { + /// Injects faults for file read operations. + kRead, + kReadv, + kWrite, + kAppend, + /// TODO: add to support fault injections for the other file operation + /// types. + }; + static std::string typeString(FaultFileOperation::Type type) { + switch (type) { + case FaultFileOperation::Type::kReadv: + return "READV"; + case FaultFileOperation::Type::kRead: + return "READ"; + default: + VELOX_UNSUPPORTED( + "Unknown file operation type: {}", static_cast(type)); + break; + } + } + + FaultFileOperation(Type _type, const std::string& _path) + : BaseFaultOperation(_path), type(_type) {} + + const Type type; +}; + +/// Defines the per-filesystem operation fault injection. 
+struct FaultFileSystemOperation : public BaseFaultOperation { + enum class Type { + kMkdir = 0, + /// TODO: Add support for other filesystem operations. + }; + FaultFileSystemOperation(Type _type, const std::string& _path) + : BaseFaultOperation(_path), type(_type) {} + + static std::string typeString(FaultFileSystemOperation::Type type) { + switch (type) { + case FaultFileSystemOperation::Type::kMkdir: + return "mkdir"; + default: + VELOX_UNSUPPORTED( + "Unknown filesystem operation type: {}", static_cast(type)); + } + } + + const Type type; +}; + +/// The fault injection hook on the file operation path. +using FileFaultInjectionHook = std::function; + +/// The fault injection hook on the filesystem operation path. +using FileSystemFaultInjectionHook = + std::function; + +FOLLY_ALWAYS_INLINE std::ostream& operator<<( + std::ostream& o, + const FaultFileOperation::Type& type) { + return o << FaultFileOperation::typeString(type); +} + +/// Fault injection parameters for file read API. +struct FaultFileReadOperation : FaultFileOperation { + const uint64_t offset; + const uint64_t length; + void* const buf; + + FaultFileReadOperation( + const std::string& _path, + uint64_t _offset, + uint64_t _length, + void* _buf) + : FaultFileOperation(FaultFileOperation::Type::kRead, _path), + offset(_offset), + length(_length), + buf(_buf) {} +}; + +/// Fault injection parameters for file readv API. +struct FaultFileReadvOperation : FaultFileOperation { + const uint64_t offset; + const std::vector>& buffers; + uint64_t readBytes{0}; + + FaultFileReadvOperation( + const std::string& _path, + uint64_t _offset, + const std::vector>& _buffers) + : FaultFileOperation(FaultFileOperation::Type::kReadv, _path), + offset(_offset), + buffers(_buffers) {} +}; + +/// Fault injection parameters for file append API. 
+struct FaultFileAppendOperation : FaultFileOperation { + std::string_view* data; + + FaultFileAppendOperation( + const std::string& _path, + const std::string_view& _data) + : FaultFileOperation(FaultFileOperation::Type::kAppend, _path), + data(const_cast(&_data)) {} +}; + +/// Fault injection parameters for file write API. +struct FaultFileWriteOperation : FaultFileOperation { + const std::vector& iovecs; + int64_t offset; + int64_t length; + + FaultFileWriteOperation( + const std::string& _path, + const std::vector& _iovecs, + int64_t _offset, + int64_t _length) + : FaultFileOperation(FaultFileOperation::Type::kWrite, _path), + iovecs(_iovecs), + offset(_offset), + length(_length) {} +}; + +/// Fault injection parameters for file system mkdir API. +struct FaultFileSystemMkdirOperation : FaultFileSystemOperation { + DirectoryOptions options; + + FaultFileSystemMkdirOperation( + const std::string& _path, + const DirectoryOptions& _options) + : FaultFileSystemOperation(Type::kMkdir, _path), options(_options) {} +}; +} // namespace facebook::velox::tests::utils diff --git a/velox/common/file/tests/FileTest.cpp b/velox/common/file/tests/FileTest.cpp index 5706a3a527175..66c424e8bb204 100644 --- a/velox/common/file/tests/FileTest.cpp +++ b/velox/common/file/tests/FileTest.cpp @@ -309,6 +309,30 @@ TEST_P(LocalFileTest, readFileDestructor) { } } +TEST_P(LocalFileTest, mkdirFailIfPresent) { + auto tempFolder = exec::test::TempDirectoryPath::create(useFaultyFs_); + std::string path = tempFolder->getPath(); + auto localFs = filesystems::getFileSystem(path, nullptr); + + path += "/level1/level2/level3"; + EXPECT_FALSE(localFs->exists(path)); + EXPECT_NO_THROW(localFs->mkdir(path)); + EXPECT_TRUE(localFs->exists(path)); + + // Except that if we try to make the directory again, + // it will not fail. 
+ EXPECT_NO_THROW(localFs->mkdir(path)); + + // We fail if the directory is already present + VELOX_ASSERT_THROW( + localFs->mkdir( + path, + { + .failIfExists = true, + }), + fmt::format("Directory: {} already exists", localFs->extractPath(path))); +} + TEST_P(LocalFileTest, mkdir) { auto tempFolder = exec::test::TempDirectoryPath::create(useFaultyFs_); diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 9f2b8adfa7a97..6fc28f822f8b4 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -112,7 +112,7 @@ std::optional DriverCtx::makeSpillConfig( if (!queryConfig.spillEnabled()) { return std::nullopt; } - if (task->spillDirectory().empty()) { + if (task->spillDirectory().empty() && !task->hasCreateSpillDirectoryCb()) { return std::nullopt; } common::GetSpillDirectoryPathCB getSpillDirPathCb = diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 0bce65f91d14e..80056beeefa2c 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -429,7 +429,9 @@ bool Task::allNodesReceivedNoMoreSplitsMessageLocked() const { } const std::string& Task::getOrCreateSpillDirectory() { - VELOX_CHECK(!spillDirectory_.empty(), "Spill directory not set"); + VELOX_CHECK( + !spillDirectory_.empty() || spillDirectoryCallback_, + "Spill directory or spill directory callback must be set "); if (spillDirectoryCreated_) { return spillDirectory_; } @@ -438,7 +440,16 @@ const std::string& Task::getOrCreateSpillDirectory() { if (spillDirectoryCreated_) { return spillDirectory_; } + try { + // If callback is provided, we shall execute the callback instead + // of calling mkdir on the directory. 
+ if (spillDirectoryCallback_) { + spillDirectory_ = spillDirectoryCallback_(); + spillDirectoryCreated_ = true; + return spillDirectory_; + } + auto fileSystem = filesystems::getFileSystem(spillDirectory_, nullptr); fileSystem->mkdir(spillDirectory_); } catch (const std::exception& e) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 7f421952ce51c..8bad8339b6691 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -103,6 +103,12 @@ class Task : public std::enable_shared_from_this { spillDirectoryCreated_ = alreadyCreated; } + void setCreateSpillDirectoryCb( + std::function spillDirectoryCallback) { + VELOX_CHECK_NULL(spillDirectoryCallback_); + spillDirectoryCallback_ = std::move(spillDirectoryCallback); + } + std::string toString() const; folly::dynamic toJson() const; @@ -644,6 +650,13 @@ class Task : public std::enable_shared_from_this { return spillDirectory_; } + bool hasCreateSpillDirectoryCb() const { + if (spillDirectoryCallback_) { + return true; + } + return false; + } + /// Returns the spill directory path. Ensures that the spill directory is /// created before returning. Is thread safe. Returns an empty string if /// either the spill directory is not specified during task creation or the @@ -1168,6 +1181,10 @@ class Task : public std::enable_shared_from_this { std::vector resumePromises_; // Base spill directory for this task. std::string spillDirectory_; + // Spill directory callback for this task. This callback will be used to + // create the spill directory for this task. This callback returns + // a path that will be stored into spillDirectory_. + std::function spillDirectoryCallback_; // Mutex to ensure only the first caller thread of 'getOrCreateSpillDirectory' // creates the directory. 
diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 474c1e4ca5032..919332b57d30e 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -17,6 +17,7 @@ #include "velox/exec/Task.h" #include "folly/experimental/EventCount.h" #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/common/future/VeloxPromise.h" #include "velox/common/memory/MemoryArbitrator.h" #include "velox/common/memory/SharedArbitrator.h" @@ -1701,6 +1702,51 @@ TEST_F(TaskTest, driverCreationMemoryAllocationCheck) { } } +TEST_F(TaskTest, spillDirectoryLifecycleManagementWithCallback) { + // Marks the spill directory as not already created and ensures that the Task + // handles creating it on first use and eventually deleting it on destruction. + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row % 300; }), + makeFlatVector(1'000, [](auto row) { return row; }), + }); + core::PlanNodeId aggrNodeId; + const auto plan = PlanBuilder() + .values({data}) + .singleAggregation({"c0"}, {"sum(c1)"}, {}) + .capturePlanNodeId(aggrNodeId) + .planNode(); + CursorParameters params; + params.planNode = plan; + params.queryCtx = core::QueryCtx::create(driverExecutor_.get()); + params.queryCtx->testingOverrideConfigUnsafe( + {{core::QueryConfig::kSpillEnabled, "true"}, + {core::QueryConfig::kAggregationSpillEnabled, "true"}}); + params.maxDrivers = 1; + + auto cursor = TaskCursor::create(params); + + std::shared_ptr task = cursor->task(); + auto rootTempDir = exec::test::TempDirectoryPath::create(); + auto tmpDirectoryPath = + rootTempDir->getPath() + "/spillDirectoryLifecycleManagement"; + task->setCreateSpillDirectoryCb([tmpDirectoryPath]() { + auto fs = filesystems::getFileSystem(tmpDirectoryPath, nullptr); + fs->mkdir(tmpDirectoryPath); + return tmpDirectoryPath; + }); + + TestScopedSpillInjection scopedSpillInjection(100); + while (cursor->moveNext()) { + } + 
ASSERT_TRUE(waitForTaskCompletion(task.get(), 5'000'000)); + EXPECT_EQ(exec::TaskState::kFinished, task->state()); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& stats = taskStats.at(aggrNodeId); + ASSERT_GT(stats.spilledRows, 0); + cursor.reset(); // ensure 'task' has no other shared pointer. + OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); +} + TEST_F(TaskTest, spillDirectoryLifecycleManagement) { // Marks the spill directory as not already created and ensures that the Task // handles creating it on first use and eventually deleting it on destruction. @@ -1798,6 +1844,73 @@ TEST_F(TaskTest, spillDirNotCreated) { ASSERT_FALSE(fs->exists(tmpDirectoryPath)); } +TEST_F(TaskTest, setCreateSpillDirectoryCbTest) { + auto rootTmpDir = exec::test::TempDirectoryPath::create(); + const std::string mockDirectory1 = fmt::format( + "{}{}", tests::utils::FaultyFileSystem::scheme(), rootTmpDir->getPath()); + + const std::string mockDirectory2 = fmt::format("{}/test_dir", mockDirectory1); + auto fs = std::dynamic_pointer_cast( + filesystems::getFileSystem(mockDirectory1, nullptr)); + + auto plan = PlanBuilder() + .tableScan(ROW({"c0"}, {BIGINT()})) + .project({"c0 % 10"}) + .partitionedOutput({}, 1, std::vector{"p0"}) + .planFragment(); + auto task = Task::create( + "single.execution.task.0", + plan, + 0, + core::QueryCtx::create(), + Task::ExecutionMode::kSerial); + EXPECT_FALSE(task->hasCreateSpillDirectoryCb()); + + task->setCreateSpillDirectoryCb([mockDirectory1, mockDirectory2]() { + auto fs = filesystems::getFileSystem(mockDirectory1, nullptr); + filesystems::DirectoryOptions options; + options.values.emplace( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(), + "dummy.config=123"); + fs->mkdir(mockDirectory1, options); + fs->mkdir(mockDirectory2); + return mockDirectory2; + }); + + EXPECT_TRUE(task->hasCreateSpillDirectoryCb()); + + fs->setFileSystemInjectionError( + std::make_exception_ptr(std::runtime_error("test exception")), 
+ {tests::utils::FaultFileSystemOperation::Type::kMkdir}); + // Test exception case + VELOX_ASSERT_THROW(task->getOrCreateSpillDirectory(), "test exception"); + fs->clearFileSystemInjections(); + + // Test success case + bool parentDirectoryCreated = false; + bool spillDirectoryCreated = false; + tests::utils::FileSystemFaultInjectionHook hook = [&](auto* op) { + auto mkdirOp = + static_cast(op); + if (mkdirOp->path == rootTmpDir->getPath()) { + parentDirectoryCreated = true; + auto it = mkdirOp->options.values.find( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString()); + EXPECT_TRUE(it != mkdirOp->options.values.end()); + EXPECT_EQ(it->second, "dummy.config=123"); + } + if (mkdirOp->path == fmt::format("{}/test_dir", rootTmpDir->getPath())) { + spillDirectoryCreated = true; + } + return; + }; + + fs->setFilesystemInjectionHook(hook); + EXPECT_EQ(task->getOrCreateSpillDirectory(), mockDirectory2); + EXPECT_TRUE(parentDirectoryCreated); + EXPECT_TRUE(spillDirectoryCreated); +} + DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) { auto probeVector = makeRowVector( {"t_c0"}, {makeFlatVector(10, [](auto row) { return row; })});