From 386f2cbe2c163ef5f96c8136c0edcec2b9dd552a Mon Sep 17 00:00:00 2001 From: Yenda Li Date: Mon, 18 Nov 2024 11:25:04 -0800 Subject: [PATCH] feature(custom options to spill directories): Add ability to make parent spill directory with custom options Summary: Add ability to create the parent spill direction with custom options. We also add the ability for mkdir to decide whether or not to fail if a directory was created Differential Revision: D65964247 --- velox/common/file/FileSystems.cpp | 9 +- velox/common/file/FileSystems.h | 6 ++ velox/core/QueryConfig.h | 10 ++ velox/exec/Task.cpp | 43 +++++++++ velox/exec/Task.h | 6 ++ velox/exec/tests/TaskTest.cpp | 120 ++++++++++++++++++++++++ velox/exec/tests/utils/MockFilesystem.h | 92 ++++++++++++++++++ 7 files changed, 284 insertions(+), 2 deletions(-) create mode 100644 velox/exec/tests/utils/MockFilesystem.h diff --git a/velox/common/file/FileSystems.cpp b/velox/common/file/FileSystems.cpp index 4f215a7ce5f1d..f6dfc6f65327e 100644 --- a/velox/common/file/FileSystems.cpp +++ b/velox/common/file/FileSystems.cpp @@ -19,6 +19,7 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/file/File.h" +#include #include #include @@ -163,10 +164,14 @@ class LocalFileSystem : public FileSystem { return filePaths; } - void mkdir(std::string_view path, const DirectoryOptions& /*options*/) - override { + void mkdir(std::string_view path, const DirectoryOptions& options) override { std::error_code ec; std::filesystem::create_directories(path, ec); + + if (ec.value() == EEXIST && !options.failIfExists) { + return; + } + VELOX_CHECK_EQ( 0, ec.value(), diff --git a/velox/common/file/FileSystems.h b/velox/common/file/FileSystems.h index 189b0b6bf5935..fbe98fb4a4237 100644 --- a/velox/common/file/FileSystems.h +++ b/velox/common/file/FileSystems.h @@ -68,6 +68,12 @@ struct FileOptions { /// Defines directory options struct DirectoryOptions : FileOptions { + /// Whether to throw an error if the directory already exists. + /// For POSIX systems, this is equivalent to handling EEXIST. + /// + /// NOTE: This is only applicable for mkdir + bool failIfExists{false}; + /// This is similar to kFileCreateConfig static constexpr folly::StringPiece kMakeDirectoryConfig{ "make-directory-config"}; diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 7041126e21535..1dbbbad627a09 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -257,6 +257,12 @@ class QueryConfig { static constexpr const char* kSpillFileCreateConfig = "spill_file_create_config"; + /// Config used to create spill directory. This config is provided to + /// underlying file system and the config is free form. The form should be + /// defined by the underlying file system. + static constexpr const char* kSpillDirectoryCreateConfig = + "spill_directory_create_config"; + /// Default offset spill start partition bit. It is used with /// 'kJoinSpillPartitionBits' or 'kAggregationSpillPartitionBits' together to /// calculate the spilling partition number for join spill or aggregation @@ -659,6 +665,10 @@ class QueryConfig { return get(kSpillFileCreateConfig, ""); } + std::string spillDirectoryCreateConfig() const { + return get(kSpillDirectoryCreateConfig, ""); + } + int32_t minSpillableReservationPct() const { constexpr int32_t kDefaultPct = 5; return get(kMinSpillableReservationPct, kDefaultPct); diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 0bce65f91d14e..b25cc93c03466 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -440,6 +440,21 @@ const std::string& Task::getOrCreateSpillDirectory() { } try { auto fileSystem = filesystems::getFileSystem(spillDirectory_, nullptr); + + // If the spill directory config is set, first create the parent directory, + // and then the task directory so that the parent directory can have custom + // options like TTL applied to it. + const auto& directoryConfig = + queryCtx_->queryConfig().spillDirectoryCreateConfig(); + if (!directoryConfig.empty()) { + filesystems::DirectoryOptions options; + options.values.emplace( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(), + directoryConfig); + fileSystem->mkdir( + Task::extractParentSpillDirectory(spillDirectory_), options); + } + fileSystem->mkdir(spillDirectory_); } catch (const std::exception& e) { VELOX_FAIL( @@ -3090,4 +3105,32 @@ void Task::MemoryReclaimer::abort( << "Timeout waiting for task to complete during query memory aborting."; } } + +/*static*/ +std::string Task::extractParentSpillDirectory( + const std::string& spillDirectory) { + if (spillDirectory.empty() || spillDirectory.back() != '/') { + VELOX_FAIL("Invalid spill directory provided: {}", spillDirectory); + } + + // Find the position of the last '/' + const auto lastSlash = spillDirectory.find_last_of('/'); + + // Find the second-to-last '/' + const auto secondLastSlash = spillDirectory.find_last_of('/', lastSlash - 1); + + // If there's no second '/' (meaning the path has only one directory), this is + // considered an invalid path + if (secondLastSlash == std::string::npos) { + VELOX_FAIL("Invalid spill directory provided: {}", spillDirectory); + } + + const auto parentSpillDirectory = spillDirectory.substr(0, secondLastSlash); + // If the parent directory is empty or the path ends in '/', this is + // considered an invalid path + if (parentSpillDirectory.empty() || parentSpillDirectory.back() == '/') { + VELOX_FAIL("Invalid spill directory provided: {}", spillDirectory); + } + return fmt::format("{}/", parentSpillDirectory); +} } // namespace facebook::velox::exec diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 7f421952ce51c..21bea76a3bf90 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -986,6 +986,12 @@ class Task : public std::enable_shared_from_this { // trace enabled. void maybeInitTrace(); + // Extracts the parent spill directory from the spill directory. This is used + // if we want to set custom options like TTL on the parent of the spill + // directory. + static std::string extractParentSpillDirectory( + const std::string& spillDirectory); + // Universally unique identifier of the task. Used to identify the task when // calling TaskListener. const std::string uuid_; diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 474c1e4ca5032..fb12a64a26b78 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -29,6 +29,7 @@ #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/Cursor.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/MockFilesystem.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/QueryAssertions.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -1798,6 +1799,125 @@ TEST_F(TaskTest, spillDirNotCreated) { ASSERT_FALSE(fs->exists(tmpDirectoryPath)); } +MATCHER_P(SpillParentDirectoryConfigMatcher, makeDirectoryConfig, "") { + return arg.values.contains( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString()) && + arg.values.at( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString()) == + makeDirectoryConfig.values.at( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString()); +} +TEST_F(TaskTest, getOrCreateSpillDirectoryTest) { + // Register the mockFileSystem for testing purposes. + MockFilesystem::registerMockFileSystem(); + auto mockFilesystem = std::dynamic_pointer_cast( + filesystems::getFileSystem("mock://", nullptr)); + + // This is scoped so that the destruction of Task can be called + // when the scope ends. + { + auto plan = PlanBuilder() + .tableScan(ROW({"c0"}, {BIGINT()})) + .project({"c0 % 10"}) + .partitionedOutput({}, 1, std::vector{"p0"}) + .planFragment(); + auto task = Task::create( + "single.execution.task.0", + plan, + 0, + core::QueryCtx::create(), + Task::ExecutionMode::kSerial); + constexpr auto mockSpillDirectory = + "mock://dir1/dir2/mock_parent/mock_spill_directory/"; + constexpr auto mockParentDirectory = "mock://dir1/dir2/mock_parent/"; + task->setSpillDirectory(mockSpillDirectory, false); + + // Give the parent spill directory a fake config for testing purposes. + constexpr auto fakeDirectoryConfig = "mock_fs.directory_ttl_ms=259200000"; + task->queryCtx()->testingOverrideConfigUnsafe( + {{core::QueryConfig::kSpillDirectoryCreateConfig, + fakeDirectoryConfig}}); + + // Check if the spill directory is created and validate directory config. + filesystems::DirectoryOptions expectedOptions; + expectedOptions.values.emplace( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(), + fakeDirectoryConfig); + // Check that the config passed into mkdir matches + EXPECT_CALL( + *mockFilesystem, + mkdir( + mockParentDirectory, + SpillParentDirectoryConfigMatcher(expectedOptions))) + .Times(1); + + // Check that options without any values is called + // on the spill directory itself + filesystems::DirectoryOptions spillDirectoryOptions; + EXPECT_CALL(*mockFilesystem, mkdir(mockSpillDirectory, testing::_)) + .Times(1); + + // Called once when the task is destructed. + EXPECT_CALL(*mockFilesystem, rmdir(testing::_)).Times(1); + auto spillDirectory = task->getOrCreateSpillDirectory(); + } + + // Need to forcefully destruct this singleton so that the assertions + // on the mocks can be called. + auto fs = mockFilesystem.get(); + fs->~MockFilesystem(); +} + +TEST_F(TaskTest, extractParentSpillDirectoryTest) { + // Register the mockFileSystem for testing purposes. + MockFilesystem::registerMockFileSystem(); + auto mockFilesystem = std::dynamic_pointer_cast( + filesystems::getFileSystem("mock://", nullptr)); + + // This is scoped so that the destruction of Task can be called + // when the scope ends. + { + auto plan = PlanBuilder() + .tableScan(ROW({"c0"}, {BIGINT()})) + .project({"c0 % 10"}) + .partitionedOutput({}, 1, std::vector{"p0"}) + .planFragment(); + auto task = Task::create( + "single.execution.task.0", + plan, + 0, + core::QueryCtx::create(), + Task::ExecutionMode::kSerial); + + // Give the parent spill directory a fake config for testing purposes. + constexpr auto fakeDirectoryConfig = "mock_fs.directory_ttl_ms=259200000"; + task->queryCtx()->testingOverrideConfigUnsafe( + {{core::QueryConfig::kSpillDirectoryCreateConfig, + fakeDirectoryConfig}}); + + // Path does not end in '/' + auto mockSpillDirectory = "mock://a"; + task->setSpillDirectory(mockSpillDirectory, false); + VELOX_ASSERT_THROW( + task->getOrCreateSpillDirectory(), + fmt::format( + "Invalid spill directory provided: {}", mockSpillDirectory)); + + // Path has no true parent directory and is an empty directory + mockSpillDirectory = "mock://a/"; + task->setSpillDirectory(mockSpillDirectory, false); + VELOX_ASSERT_THROW( + task->getOrCreateSpillDirectory(), + fmt::format( + "Invalid spill directory provided: {}", mockSpillDirectory)); + } + + // Need to forcefully destruct this singleton so that the assertions + // on the mocks can be called. + auto fs = mockFilesystem.get(); + fs->~MockFilesystem(); +} + DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) { auto probeVector = makeRowVector( {"t_c0"}, {makeFlatVector(10, [](auto row) { return row; })}); diff --git a/velox/exec/tests/utils/MockFilesystem.h b/velox/exec/tests/utils/MockFilesystem.h new file mode 100644 index 0000000000000..96909dc620e2d --- /dev/null +++ b/velox/exec/tests/utils/MockFilesystem.h @@ -0,0 +1,92 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/common/file/FileSystems.h" + +namespace facebook::velox::exec::test { + +class MockFilesystem : public filesystems::FileSystem { + public: + explicit MockFilesystem() : filesystems::FileSystem(nullptr){}; + + ~MockFilesystem() override = default; + + MOCK_METHOD(std::string, name, (), (const, override)); + + MOCK_METHOD(std::string_view, extractPath, (std::string_view), (override)); + + MOCK_METHOD( + std::unique_ptr, + openFileForRead, + (std::string_view, const filesystems::FileOptions&), + (override)); + + MOCK_METHOD( + std::unique_ptr, + openFileForWrite, + (std::string_view, const filesystems::FileOptions&), + (override)); + + MOCK_METHOD(void, remove, (std::string_view), (override)); + + MOCK_METHOD( + void, + rename, + (std::string_view, std::string_view, bool), + (override)); + + MOCK_METHOD(bool, exists, (std::string_view), (override)); + + MOCK_METHOD(std::vector, list, (std::string_view), (override)); + + MOCK_METHOD( + void, + mkdir, + (std::string_view, const filesystems::DirectoryOptions&), + (override)); + + MOCK_METHOD(void, rmdir, (std::string_view), (override)); + + static void registerMockFileSystem() { + static std::string_view kMockFileSystemPrefix = "mock://"; + std::function filenameMatcher = + [](std::string_view filename) { + return filename.find(kMockFileSystemPrefix) == 0; + }; + + std::function( + std::shared_ptr, std::string_view)> + fileSystemGenerator = + [](std::shared_ptr, + std::string_view) -> std::shared_ptr { + static std::shared_ptr mockFilesystem; + static folly::once_flag fsInstantiationFlag; + folly::call_once(fsInstantiationFlag, [&]() { + mockFilesystem = std::make_shared(); + }); + return mockFilesystem; + }; + + facebook::velox::filesystems::registerFileSystem( + std::move(filenameMatcher), fileSystemGenerator); + } +}; + +} // namespace facebook::velox::exec::test