Skip to content

Commit

Permalink
feature(custom options to spill directories): Add ability to make par…
Browse files Browse the repository at this point in the history
…ent spill directory with custom options

Summary:
Add ability to create the parent spill direction with custom options.

We also add the ability for mkdir to decide whether or not to fail if a directory was created

Differential Revision: D65964247
  • Loading branch information
yuandagits authored and facebook-github-bot committed Nov 18, 2024
1 parent 3eb2fa4 commit 386f2cb
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 2 deletions.
9 changes: 7 additions & 2 deletions velox/common/file/FileSystems.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "velox/common/base/Exceptions.h"
#include "velox/common/file/File.h"

#include <errno.h>
#include <cstdio>
#include <filesystem>

Expand Down Expand Up @@ -163,10 +164,14 @@ class LocalFileSystem : public FileSystem {
return filePaths;
}

void mkdir(std::string_view path, const DirectoryOptions& /*options*/)
override {
void mkdir(std::string_view path, const DirectoryOptions& options) override {
std::error_code ec;
std::filesystem::create_directories(path, ec);

if (ec.value() == EEXIST && !options.failIfExists) {
return;
}

VELOX_CHECK_EQ(
0,
ec.value(),
Expand Down
6 changes: 6 additions & 0 deletions velox/common/file/FileSystems.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ struct FileOptions {

/// Defines directory options
struct DirectoryOptions : FileOptions {
/// Whether to throw an error if the directory already exists.
/// For POSIX systems, this is equivalent to handling EEXIST.
///
/// NOTE: This is only applicable for mkdir
bool failIfExists{false};

/// This is similar to kFileCreateConfig
static constexpr folly::StringPiece kMakeDirectoryConfig{
"make-directory-config"};
Expand Down
10 changes: 10 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ class QueryConfig {
static constexpr const char* kSpillFileCreateConfig =
"spill_file_create_config";

/// Config used to create spill directory. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr const char* kSpillDirectoryCreateConfig =
"spill_directory_create_config";

/// Default offset spill start partition bit. It is used with
/// 'kJoinSpillPartitionBits' or 'kAggregationSpillPartitionBits' together to
/// calculate the spilling partition number for join spill or aggregation
Expand Down Expand Up @@ -659,6 +665,10 @@ class QueryConfig {
return get<std::string>(kSpillFileCreateConfig, "");
}

std::string spillDirectoryCreateConfig() const {
return get<std::string>(kSpillDirectoryCreateConfig, "");
}

int32_t minSpillableReservationPct() const {
constexpr int32_t kDefaultPct = 5;
return get<int32_t>(kMinSpillableReservationPct, kDefaultPct);
Expand Down
43 changes: 43 additions & 0 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,21 @@ const std::string& Task::getOrCreateSpillDirectory() {
}
try {
auto fileSystem = filesystems::getFileSystem(spillDirectory_, nullptr);

// If the spill directory config is set, first create the parent directory,
// and then the task directory so that the parent directory can have custom
// options like TTL applied to it.
const auto& directoryConfig =
queryCtx_->queryConfig().spillDirectoryCreateConfig();
if (!directoryConfig.empty()) {
filesystems::DirectoryOptions options;
options.values.emplace(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
directoryConfig);
fileSystem->mkdir(
Task::extractParentSpillDirectory(spillDirectory_), options);
}

fileSystem->mkdir(spillDirectory_);
} catch (const std::exception& e) {
VELOX_FAIL(
Expand Down Expand Up @@ -3090,4 +3105,32 @@ void Task::MemoryReclaimer::abort(
<< "Timeout waiting for task to complete during query memory aborting.";
}
}

/*static*/
std::string Task::extractParentSpillDirectory(
const std::string& spillDirectory) {
if (spillDirectory.empty() || spillDirectory.back() != '/') {
VELOX_FAIL("Invalid spill directory provided: {}", spillDirectory);
}

// Find the position of the last '/'
const auto lastSlash = spillDirectory.find_last_of('/');

// Find the second-to-last '/'
const auto secondLastSlash = spillDirectory.find_last_of('/', lastSlash - 1);

// If there's no second '/' (meaning the path has only one directory), this is
// considered an invalid path
if (secondLastSlash == std::string::npos) {
VELOX_FAIL("Invalid spill directory provided: {}", spillDirectory);
}

const auto parentSpillDirectory = spillDirectory.substr(0, secondLastSlash);
// If the parent directory is empty or the path ends in '/', this is
// considered an invalid path
if (parentSpillDirectory.empty() || parentSpillDirectory.back() == '/') {
VELOX_FAIL("Invalid spill directory provided: {}", spillDirectory);
}
return fmt::format("{}/", parentSpillDirectory);
}
} // namespace facebook::velox::exec
6 changes: 6 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,12 @@ class Task : public std::enable_shared_from_this<Task> {
// trace enabled.
void maybeInitTrace();

// Extracts the parent spill directory from the spill directory. This is used
// if we want to set custom options like TTL on the parent of the spill
// directory.
static std::string extractParentSpillDirectory(
const std::string& spillDirectory);

// Universally unique identifier of the task. Used to identify the task when
// calling TaskListener.
const std::string uuid_;
Expand Down
120 changes: 120 additions & 0 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/MockFilesystem.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
Expand Down Expand Up @@ -1798,6 +1799,125 @@ TEST_F(TaskTest, spillDirNotCreated) {
ASSERT_FALSE(fs->exists(tmpDirectoryPath));
}

MATCHER_P(SpillParentDirectoryConfigMatcher, makeDirectoryConfig, "") {
return arg.values.contains(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString()) &&
arg.values.at(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString()) ==
makeDirectoryConfig.values.at(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString());
}
TEST_F(TaskTest, getOrCreateSpillDirectoryTest) {
// Register the mockFileSystem for testing purposes.
MockFilesystem::registerMockFileSystem();
auto mockFilesystem = std::dynamic_pointer_cast<MockFilesystem>(
filesystems::getFileSystem("mock://", nullptr));

// This is scoped so that the destruction of Task can be called
// when the scope ends.
{
auto plan = PlanBuilder()
.tableScan(ROW({"c0"}, {BIGINT()}))
.project({"c0 % 10"})
.partitionedOutput({}, 1, std::vector<std::string>{"p0"})
.planFragment();
auto task = Task::create(
"single.execution.task.0",
plan,
0,
core::QueryCtx::create(),
Task::ExecutionMode::kSerial);
constexpr auto mockSpillDirectory =
"mock://dir1/dir2/mock_parent/mock_spill_directory/";
constexpr auto mockParentDirectory = "mock://dir1/dir2/mock_parent/";
task->setSpillDirectory(mockSpillDirectory, false);

// Give the parent spill directory a fake config for testing purposes.
constexpr auto fakeDirectoryConfig = "mock_fs.directory_ttl_ms=259200000";
task->queryCtx()->testingOverrideConfigUnsafe(
{{core::QueryConfig::kSpillDirectoryCreateConfig,
fakeDirectoryConfig}});

// Check if the spill directory is created and validate directory config.
filesystems::DirectoryOptions expectedOptions;
expectedOptions.values.emplace(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
fakeDirectoryConfig);
// Check that the config passed into mkdir matches
EXPECT_CALL(
*mockFilesystem,
mkdir(
mockParentDirectory,
SpillParentDirectoryConfigMatcher(expectedOptions)))
.Times(1);

// Check that options without any values is called
// on the spill directory itself
filesystems::DirectoryOptions spillDirectoryOptions;
EXPECT_CALL(*mockFilesystem, mkdir(mockSpillDirectory, testing::_))
.Times(1);

// Called once when the task is destructed.
EXPECT_CALL(*mockFilesystem, rmdir(testing::_)).Times(1);
auto spillDirectory = task->getOrCreateSpillDirectory();
}

// Need to forcefully destruct this singleton so that the assertions
// on the mocks can be called.
auto fs = mockFilesystem.get();
fs->~MockFilesystem();
}

TEST_F(TaskTest, extractParentSpillDirectoryTest) {
// Register the mockFileSystem for testing purposes.
MockFilesystem::registerMockFileSystem();
auto mockFilesystem = std::dynamic_pointer_cast<MockFilesystem>(
filesystems::getFileSystem("mock://", nullptr));

// This is scoped so that the destruction of Task can be called
// when the scope ends.
{
auto plan = PlanBuilder()
.tableScan(ROW({"c0"}, {BIGINT()}))
.project({"c0 % 10"})
.partitionedOutput({}, 1, std::vector<std::string>{"p0"})
.planFragment();
auto task = Task::create(
"single.execution.task.0",
plan,
0,
core::QueryCtx::create(),
Task::ExecutionMode::kSerial);

// Give the parent spill directory a fake config for testing purposes.
constexpr auto fakeDirectoryConfig = "mock_fs.directory_ttl_ms=259200000";
task->queryCtx()->testingOverrideConfigUnsafe(
{{core::QueryConfig::kSpillDirectoryCreateConfig,
fakeDirectoryConfig}});

// Path does not end in '/'
auto mockSpillDirectory = "mock://a";
task->setSpillDirectory(mockSpillDirectory, false);
VELOX_ASSERT_THROW(
task->getOrCreateSpillDirectory(),
fmt::format(
"Invalid spill directory provided: {}", mockSpillDirectory));

// Path has no true parent directory and is an empty directory
mockSpillDirectory = "mock://a/";
task->setSpillDirectory(mockSpillDirectory, false);
VELOX_ASSERT_THROW(
task->getOrCreateSpillDirectory(),
fmt::format(
"Invalid spill directory provided: {}", mockSpillDirectory));
}

// Need to forcefully destruct this singleton so that the assertions
// on the mocks can be called.
auto fs = mockFilesystem.get();
fs->~MockFilesystem();
}

DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) {
auto probeVector = makeRowVector(
{"t_c0"}, {makeFlatVector<int32_t>(10, [](auto row) { return row; })});
Expand Down
92 changes: 92 additions & 0 deletions velox/exec/tests/utils/MockFilesystem.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <gmock/gmock.h>

#include "velox/common/file/FileSystems.h"

namespace facebook::velox::exec::test {

class MockFilesystem : public filesystems::FileSystem {
public:
explicit MockFilesystem() : filesystems::FileSystem(nullptr){};

~MockFilesystem() override = default;

MOCK_METHOD(std::string, name, (), (const, override));

MOCK_METHOD(std::string_view, extractPath, (std::string_view), (override));

MOCK_METHOD(
std::unique_ptr<ReadFile>,
openFileForRead,
(std::string_view, const filesystems::FileOptions&),
(override));

MOCK_METHOD(
std::unique_ptr<WriteFile>,
openFileForWrite,
(std::string_view, const filesystems::FileOptions&),
(override));

MOCK_METHOD(void, remove, (std::string_view), (override));

MOCK_METHOD(
void,
rename,
(std::string_view, std::string_view, bool),
(override));

MOCK_METHOD(bool, exists, (std::string_view), (override));

MOCK_METHOD(std::vector<std::string>, list, (std::string_view), (override));

MOCK_METHOD(
void,
mkdir,
(std::string_view, const filesystems::DirectoryOptions&),
(override));

MOCK_METHOD(void, rmdir, (std::string_view), (override));

static void registerMockFileSystem() {
static std::string_view kMockFileSystemPrefix = "mock://";
std::function<bool(std::string_view)> filenameMatcher =
[](std::string_view filename) {
return filename.find(kMockFileSystemPrefix) == 0;
};

std::function<std::shared_ptr<filesystems::FileSystem>(
std::shared_ptr<const config::ConfigBase>, std::string_view)>
fileSystemGenerator =
[](std::shared_ptr<const config::ConfigBase>,
std::string_view) -> std::shared_ptr<filesystems::FileSystem> {
static std::shared_ptr<filesystems::FileSystem> mockFilesystem;
static folly::once_flag fsInstantiationFlag;
folly::call_once(fsInstantiationFlag, [&]() {
mockFilesystem = std::make_shared<MockFilesystem>();
});
return mockFilesystem;
};

facebook::velox::filesystems::registerFileSystem(
std::move(filenameMatcher), fileSystemGenerator);
}
};

} // namespace facebook::velox::exec::test

0 comments on commit 386f2cb

Please sign in to comment.