diff --git a/velox/common/CMakeLists.txt b/velox/common/CMakeLists.txt index e345d7197620..e96b44b9d2c6 100644 --- a/velox/common/CMakeLists.txt +++ b/velox/common/CMakeLists.txt @@ -14,6 +14,7 @@ add_subdirectory(base) add_subdirectory(caching) add_subdirectory(compression) +add_subdirectory(config) add_subdirectory(encode) add_subdirectory(file) add_subdirectory(hyperloglog) diff --git a/velox/common/config/CMakeLists.txt b/velox/common/config/CMakeLists.txt new file mode 100644 index 000000000000..7780665a2925 --- /dev/null +++ b/velox/common/config/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if (${VELOX_BUILD_TESTING}) + add_subdirectory(tests) +endif () + +velox_add_library(velox_common_config Config.cpp) +velox_link_libraries( + velox_common_config + PUBLIC velox_common_base + velox_exception + PRIVATE re2::re2) diff --git a/velox/common/config/Config.cpp b/velox/common/config/Config.cpp new file mode 100644 index 000000000000..0582b1e755e4 --- /dev/null +++ b/velox/common/config/Config.cpp @@ -0,0 +1,150 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/common/config/Config.h" + +namespace facebook::velox::config { + +double toBytesPerCapacityUnit(CapacityUnit unit) { + switch (unit) { + case CapacityUnit::BYTE: + return 1; + case CapacityUnit::KILOBYTE: + return exp2(10); + case CapacityUnit::MEGABYTE: + return exp2(20); + case CapacityUnit::GIGABYTE: + return exp2(30); + case CapacityUnit::TERABYTE: + return exp2(40); + case CapacityUnit::PETABYTE: + return exp2(50); + default: + VELOX_USER_FAIL("Invalid capacity unit '{}'", (int)unit); + } +} + +CapacityUnit valueOfCapacityUnit(const std::string& unitStr) { + std::stringstream ss; + for (const char c : unitStr) { + ss << static_cast(std::tolower(c)); + } + const auto lowerUnitStr = ss.str(); + if (lowerUnitStr == "b") { + return CapacityUnit::BYTE; + } + if (lowerUnitStr == "kb") { + return CapacityUnit::KILOBYTE; + } + if (lowerUnitStr == "mb") { + return CapacityUnit::MEGABYTE; + } + if (lowerUnitStr == "gb") { + return CapacityUnit::GIGABYTE; + } + if (lowerUnitStr == "tb") { + return CapacityUnit::TERABYTE; + } + if (lowerUnitStr == "pb") { + return CapacityUnit::PETABYTE; + } + VELOX_USER_FAIL("Invalid capacity unit '{}'", unitStr); +} + +uint64_t toCapacity(const std::string& from, CapacityUnit to) { + static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$)"); + double value; + std::string unit; + if (!RE2::FullMatch(from, kPattern, &value, &unit)) { + VELOX_USER_FAIL("Invalid capacity string '{}'", from); + } + + return value * + (toBytesPerCapacityUnit(valueOfCapacityUnit(unit)) / + toBytesPerCapacityUnit(to)); +} + +std::chrono::duration toDuration(const std::string& str) { + static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*)"); + + double value; + std::string unit; + if (!RE2::FullMatch(str, kPattern, &value, &unit)) { + VELOX_USER_FAIL("Invalid duration '{}'", str); + } + if (unit == "ns") { + return std::chrono::duration(value); + } else if (unit == "us") { + return std::chrono::duration(value); + } else if (unit == "ms") { + return std::chrono::duration(value); + } else if (unit == "s") { + return std::chrono::duration(value); + } else if (unit == "m") { + return std::chrono::duration>(value); + } else if (unit == "h") { + return std::chrono::duration>(value); + } else if (unit == "d") { + return std::chrono::duration>(value); + } + VELOX_USER_FAIL("Invalid duration '{}'", str); +} + +ConfigBase& ConfigBase::set(const std::string& key, const std::string& val) { + VELOX_CHECK(mutable_, "Cannot set in immutable config"); + std::unique_lock l(mutex_); + configs_[key] = val; + return *this; +} + +ConfigBase& ConfigBase::reset() { + VELOX_CHECK(mutable_, "Cannot reset in immutable config"); + std::unique_lock l(mutex_); + configs_.clear(); + return *this; +} + +bool ConfigBase::valueExists(const std::string& key) const { + std::shared_lock l(mutex_); + return configs_.find(key) != configs_.end(); +}; + +const std::unordered_map& ConfigBase::rawConfigs() + const { + VELOX_CHECK( + !mutable_, + "Mutable config cannot return unprotected reference to raw configs."); + return configs_; +} + +std::unordered_map ConfigBase::rawConfigsCopy() + const { + std::shared_lock l(mutex_); + return configs_; +} + +folly::Optional ConfigBase::get(const std::string& key) const { + folly::Optional val; + std::shared_lock l(mutex_); + auto it = configs_.find(key); + if (it != configs_.end()) { + val = it->second; + } + return val; +} +} // namespace facebook::velox::config diff --git a/velox/common/config/Config.h b/velox/common/config/Config.h index dd5f37c55b73..793c808a6e0b 100644 --- a/velox/common/config/Config.h +++ b/velox/common/config/Config.h @@ -23,79 +23,136 @@ #include "folly/Conv.h" #include "velox/common/base/Exceptions.h" -namespace facebook::velox::common { -// The concrete config class would inherit the config base -// and then just define all the entries. -template +namespace facebook::velox::config { + +enum class CapacityUnit { + BYTE, + KILOBYTE, + MEGABYTE, + GIGABYTE, + TERABYTE, + PETABYTE +}; + +double toBytesPerCapacityUnit(CapacityUnit unit); + +CapacityUnit valueOfCapacityUnit(const std::string& unitStr); + +/// Convert capacity string with unit to the capacity number in the specified +/// units +uint64_t toCapacity(const std::string& from, CapacityUnit to); + +std::chrono::duration toDuration(const std::string& str); + +/// The concrete config class should inherit the config base and define all the +/// entries. class ConfigBase { public: template - class Entry { - private: + struct Entry { Entry( - const std::string& key, - const T& val, - std::function toStr = + const std::string& _key, + const T& _val, + std::function _toStr = [](const T& val) { return folly::to(val); }, - std::function toT = - [](const std::string& key, const std::string& val) { - auto converted = folly::tryTo(val); + std::function _toT = + [](const std::string& k, const std::string& v) { + auto converted = folly::tryTo(v); VELOX_CHECK( converted.hasValue(), fmt::format( "Invalid configuration for key '{}'. Value '{}' cannot be converted to type {}.", - key, - val, + k, + v, folly::demangle(typeid(T)))); return converted.value(); }) - : key_{key}, default_{val}, toStr_{toStr}, toT_{toT} {} + : key{_key}, defaultVal{_val}, toStr{_toStr}, toT{_toT} {} - public: - const std::string& configKey() const { - return key_; - } - - private: - const std::string key_; - const T default_; - const std::function toStr_; - const std::function toT_; - - friend ConfigBase; - friend ConcreteConfig; + const std::string key; + const T defaultVal; + const std::function toStr; + const std::function toT; }; + ConfigBase( + std::unordered_map&& configs, + bool _mutable = false) + : configs_(std::move(configs)), mutable_(_mutable) {} + template ConfigBase& set(const Entry& entry, const T& val) { - configs_[entry.key_] = entry.toStr_(val); + VELOX_CHECK(mutable_, "Cannot set in immutable config"); + std::unique_lock l(mutex_); + configs_[entry.key] = entry.toStr(val); return *this; } + ConfigBase& set(const std::string& key, const std::string& val); + template ConfigBase& unset(const Entry& entry) { - configs_.erase(entry.key_); + VELOX_CHECK(mutable_, "Cannot unset in immutable config"); + std::unique_lock l(mutex_); + configs_.erase(entry.key); return *this; } - ConfigBase& reset() { - configs_.clear(); - return *this; - } + ConfigBase& reset(); template T get(const Entry& entry) const { - auto iter = configs_.find(entry.key_); - return iter != configs_.end() ? entry.toT_(entry.key_, iter->second) - : entry.default_; + std::shared_lock l(mutex_); + auto iter = configs_.find(entry.key); + return iter != configs_.end() ? entry.toT(entry.key, iter->second) + : entry.defaultVal; + } + + template + folly::Optional get( + const std::string& key, + std::function toT = [](auto /* unused */, + auto value) { + return folly::to(value); + }) const { + auto val = get(key); + if (val.hasValue()) { + return toT(key, val.value()); + } else { + return folly::none; + } } - std::map toSerdeParams() { - return std::map{configs_.cbegin(), configs_.cend()}; + template + T get( + const std::string& key, + const T& defaultValue, + std::function toT = [](auto /* unused */, + auto value) { + return folly::to(value); + }) const { + auto val = get(key); + if (val.hasValue()) { + return toT(key, val.value()); + } else { + return defaultValue; + } } + bool valueExists(const std::string& key) const; + + const std::unordered_map& rawConfigs() const; + + std::unordered_map rawConfigsCopy() const; + protected: + mutable std::shared_mutex mutex_; std::unordered_map configs_; + + private: + folly::Optional get(const std::string& key) const; + + const bool mutable_; }; -} // namespace facebook::velox::common +} // namespace facebook::velox::config diff --git a/velox/common/config/tests/CMakeLists.txt b/velox/common/config/tests/CMakeLists.txt new file mode 100644 index 000000000000..c1a719a0f504 --- /dev/null +++ b/velox/common/config/tests/CMakeLists.txt @@ -0,0 +1,20 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_config_test ConfigTest.cpp) +add_test(velox_config_test velox_config_test) +target_link_libraries( + velox_config_test + PUBLIC Folly::folly + PRIVATE velox_common_config gtest gtest_main) diff --git a/velox/common/config/tests/ConfigTest.cpp b/velox/common/config/tests/ConfigTest.cpp new file mode 100644 index 000000000000..b97e2df3528b --- /dev/null +++ b/velox/common/config/tests/ConfigTest.cpp @@ -0,0 +1,268 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/config/Config.h" + +namespace facebook::velox::config { + +class ConfigTest : public ::testing::Test {}; + +enum TestEnum { ENUM_0 = 0, ENUM_1 = 1, ENUM_2 = 2, UNKNOWN = 3 }; + +class TestConfig : public ConfigBase { + public: + template + using Entry = ConfigBase::Entry; + + static Entry kInt32Entry; + static Entry kUint64Entry; + static Entry kBoolEntry; + static Entry kStringEntry; + static Entry kEnumEntry; + + TestConfig( + std::unordered_map&& configs, + bool _mutable) + : ConfigBase(std::move(configs), _mutable) {} +}; + +// Definition needs to be outside of class +TestConfig::Entry TestConfig::kInt32Entry("int32_entry", -32); +TestConfig::Entry TestConfig::kUint64Entry("uint64_entry", 64); +TestConfig::Entry TestConfig::kBoolEntry("bool_entry", true); +TestConfig::Entry TestConfig::kStringEntry( + "string_entry", + "default.string.value"); +TestConfig::Entry TestConfig::kEnumEntry( + "enum_entry", + TestEnum::ENUM_0, + [](const TestEnum& value) { + if (value == TestEnum::ENUM_0) { + return "ENUM_0"; + } + if (value == TestEnum::ENUM_1) { + return "ENUM_1"; + } + if (value == TestEnum::ENUM_2) { + return "ENUM_2"; + } + return "UNKNOWN"; + }, + [](const std::string& /* unused */, const std::string& v) { + if (v == "ENUM_0") { + return TestEnum::ENUM_0; + } + if (v == "ENUM_1") { + return TestEnum::ENUM_1; + } + if (v == "ENUM_2") { + return TestEnum::ENUM_2; + } + return TestEnum::UNKNOWN; + }); + +TEST_F(ConfigTest, creation) { + { + std::unordered_map rawConfigs{}; + auto config = std::make_shared(std::move(rawConfigs), false); + ASSERT_EQ(config->rawConfigs().size(), 0); + ASSERT_EQ(config->rawConfigsCopy().size(), 0); + } + + { + std::unordered_map rawConfigs{}; + rawConfigs.emplace("int32_entry", "-3200"); + auto config = std::make_shared(std::move(rawConfigs), true); + ASSERT_EQ(config->rawConfigsCopy().size(), 1); + VELOX_ASSERT_THROW( + config->rawConfigs(), + "Mutable config cannot return unprotected reference to raw configs."); + } +} + +TEST_F(ConfigTest, immutableConfig) { + // Testing default values + auto config = std::make_shared( + std::unordered_map(), false); + ASSERT_EQ(config->get(TestConfig::kInt32Entry), -32); + ASSERT_EQ(config->get(TestConfig::kUint64Entry), 64); + ASSERT_EQ(config->get(TestConfig::kBoolEntry), true); + ASSERT_EQ(config->get(TestConfig::kStringEntry), "default.string.value"); + ASSERT_EQ(config->get(TestConfig::kEnumEntry), TestEnum::ENUM_0); + + std::unordered_map rawConfigs{ + {TestConfig::kInt32Entry.key, "-3200"}, + {TestConfig::kUint64Entry.key, "6400"}, + {TestConfig::kStringEntry.key, "not.default.string.value"}, + {TestConfig::kBoolEntry.key, "false"}, + {TestConfig::kEnumEntry.key, "ENUM_2"}, + }; + + auto expectedRawConfigs = rawConfigs; + + config = std::make_shared(std::move(rawConfigs), false); + + // Testing behavior when trying to modify the immutable config + VELOX_ASSERT_THROW(config->set(TestConfig::kInt32Entry, 100), "Cannot set"); + VELOX_ASSERT_THROW( + config->set(TestConfig::kInt32Entry.key, "100"), "Cannot set"); + VELOX_ASSERT_THROW(config->unset(TestConfig::kInt32Entry), "Cannot unset"); + VELOX_ASSERT_THROW(config->reset(), "Cannot reset"); + + // Ensure values are unchanged after attempted modifications + ASSERT_EQ(config->get(TestConfig::kInt32Entry), -3200); + ASSERT_EQ(config->get(TestConfig::kUint64Entry), 6400); + ASSERT_EQ(config->get(TestConfig::kBoolEntry), false); + ASSERT_EQ(config->get(TestConfig::kStringEntry), "not.default.string.value"); + ASSERT_EQ(config->get(TestConfig::kEnumEntry), TestEnum::ENUM_2); + ASSERT_EQ( + config->get( + TestConfig::kInt32Entry.key, TestConfig::kInt32Entry.defaultVal), + -3200); + ASSERT_EQ( + config->get( + TestConfig::kUint64Entry.key, TestConfig::kUint64Entry.defaultVal), + 6400); + ASSERT_EQ( + config->get( + TestConfig::kBoolEntry.key, TestConfig::kBoolEntry.defaultVal), + false); + ASSERT_EQ( + config->get( + TestConfig::kStringEntry.key, TestConfig::kStringEntry.defaultVal), + "not.default.string.value"); + ASSERT_EQ( + config->get( + TestConfig::kEnumEntry.key, + TestConfig::kEnumEntry.defaultVal, + TestConfig::kEnumEntry.toT), + TestEnum::ENUM_2); + ASSERT_TRUE(config->get(TestConfig::kInt32Entry.key).has_value()); + ASSERT_EQ(config->get(TestConfig::kInt32Entry.key).value(), -3200); + ASSERT_FALSE(config->get("wrong_int32_key").has_value()); + + // Testing value existence + ASSERT_TRUE(config->valueExists(TestConfig::kInt32Entry.key)); + ASSERT_FALSE(config->valueExists("non_existent_entry")); + + // Testing retrieval of raw configurations + ASSERT_EQ(expectedRawConfigs, config->rawConfigs()); + ASSERT_EQ(expectedRawConfigs, config->rawConfigsCopy()); +} + +TEST_F(ConfigTest, mutableConfig) { + // Create a mutable configuration with some initial values + std::unordered_map initialConfigs{ + {TestConfig::kInt32Entry.key, "-3200"}, + {TestConfig::kUint64Entry.key, "6400"}, + {TestConfig::kStringEntry.key, "initial.string.value"}, + {TestConfig::kBoolEntry.key, "false"}, + {TestConfig::kEnumEntry.key, "ENUM_2"}, + }; + + auto config = std::make_shared(std::move(initialConfigs), true); + + // Test setting new values + (*config) + .set(TestConfig::kInt32Entry, 123) + .set(TestConfig::kStringEntry, std::string("modified.string.value")) + .set(TestConfig::kBoolEntry.key, "true") + .set(TestConfig::kEnumEntry.key, "ENUM_1"); + + ASSERT_EQ(config->get(TestConfig::kInt32Entry), 123); + ASSERT_EQ(config->get(TestConfig::kStringEntry), "modified.string.value"); + ASSERT_EQ(config->get(TestConfig::kBoolEntry), true); + ASSERT_EQ(config->get(TestConfig::kEnumEntry), TestEnum::ENUM_1); + + // Test unsetting values + ASSERT_EQ(config->get(TestConfig::kUint64Entry), 6400); + config->unset(TestConfig::kUint64Entry); + ASSERT_EQ( + config->get(TestConfig::kUint64Entry), + TestConfig::kUint64Entry.defaultVal); + + // Test resetting the configuration + config->reset(); + auto rawConfigsCopy = config->rawConfigsCopy(); + ASSERT_TRUE(rawConfigsCopy.empty()); + ASSERT_FALSE(config->valueExists(TestConfig::kUint64Entry.key)); +} + +TEST_F(ConfigTest, capacityConversion) { + folly::Random::DefaultGenerator rng; + rng.seed(1); + + std::unordered_map> unitStrLookup{ + {CapacityUnit::BYTE, {"b", "B"}}, + {CapacityUnit::KILOBYTE, {"kb", "kB", "Kb", "KB"}}, + {CapacityUnit::MEGABYTE, {"mb", "mB", "Mb", "MB"}}, + {CapacityUnit::GIGABYTE, {"gb", "gB", "Gb", "GB"}}, + {CapacityUnit::TERABYTE, {"tb", "tB", "Tb", "TB"}}, + {CapacityUnit::PETABYTE, {"pb", "pB", "Pb", "PB"}}}; + + std::vector> units{ + {CapacityUnit::BYTE, 1}, + {CapacityUnit::KILOBYTE, 1024}, + {CapacityUnit::MEGABYTE, 1024 * 1024}, + {CapacityUnit::GIGABYTE, 1024 * 1024 * 1024}, + {CapacityUnit::TERABYTE, 1024ll * 1024 * 1024 * 1024}, + {CapacityUnit::PETABYTE, 1024ll * 1024 * 1024 * 1024 * 1024}}; + for (int32_t i = 0; i < units.size(); i++) { + for (int32_t j = 0; j < units.size(); j++) { + // We use this diffRatio to prevent float conversion overflow when + // converting from one unit to another. + uint64_t diffRatio = i < j ? units[j].second / units[i].second + : units[i].second / units[j].second; + uint64_t randNumber = folly::Random::rand64(rng); + uint64_t testNumber = i > j ? randNumber / diffRatio : randNumber; + const auto& unitStrs = unitStrLookup[units[i].first]; + for (int32_t k = 0; k < unitStrs.size(); k++) { + ASSERT_EQ( + toCapacity( + std::string(std::to_string(testNumber) + unitStrs[k]), + units[j].first), + (uint64_t)(testNumber * (units[i].second / units[j].second))); + } + } + } +} + +TEST_F(ConfigTest, durationConversion) { + folly::Random::DefaultGenerator rng; + rng.seed(1); + + std::vector> units{ + {"ns", 1}, + {"us", 1000}, + {"ms", 1000 * 1000}, + {"s", 1000ll * 1000 * 1000}, + {"m", 1000ll * 1000 * 1000 * 60}, + {"h", 1000ll * 1000 * 1000 * 60 * 60}, + {"d", 1000ll * 1000 * 1000 * 60 * 60 * 24}}; + for (uint32_t i = 0; i < units.size(); i++) { + auto testNumber = folly::Random::rand32(rng) % 10000; + auto duration = + toDuration(std::string(std::to_string(testNumber) + units[i].first)); + ASSERT_EQ( + testNumber * units[i].second, + std::chrono::duration_cast(duration).count()); + } +} +} // namespace facebook::velox::config diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index 18eb9377bbff..1dc655e0fee6 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -195,19 +195,19 @@ bool HiveConfig::isFileHandleCacheEnabled() const { } uint64_t HiveConfig::orcWriterMaxStripeSize(const Config* session) const { - return toCapacity( + return config::toCapacity( session->get( kOrcWriterMaxStripeSizeSession, config_->get(kOrcWriterMaxStripeSize, "64MB")), - core::CapacityUnit::BYTE); + config::CapacityUnit::BYTE); } uint64_t HiveConfig::orcWriterMaxDictionaryMemory(const Config* session) const { - return toCapacity( + return config::toCapacity( session->get( kOrcWriterMaxDictionaryMemorySession, config_->get(kOrcWriterMaxDictionaryMemory, "16MB")), - core::CapacityUnit::BYTE); + config::CapacityUnit::BYTE); } bool HiveConfig::isOrcWriterIntegerDictionaryEncodingEnabled( @@ -267,11 +267,11 @@ uint32_t HiveConfig::sortWriterMaxOutputRows(const Config* session) const { } uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* session) const { - return toCapacity( + return config::toCapacity( session->get( kSortWriterMaxOutputBytesSession, config_->get(kSortWriterMaxOutputBytes, "10MB")), - core::CapacityUnit::BYTE); + config::CapacityUnit::BYTE); } uint64_t HiveConfig::footerEstimatedSize() const { diff --git a/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp b/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp index 786c204344ea..0c08c0272f9f 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp @@ -285,7 +285,7 @@ class GCSFileSystem::Impl { auto max_retry_time = hiveConfig_->gcsMaxRetryTime(); if (max_retry_time) { auto retry_time = std::chrono::duration_cast( - facebook::velox::core::toDuration(max_retry_time.value())); + facebook::velox::config::toDuration(max_retry_time.value())); options.set( gcs::LimitedTimeRetryPolicy(retry_time).clone()); } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index c8c50480dcc8..168f2474842b 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -546,7 +546,7 @@ class S3FileSystem::Impl { if (hiveConfig_->s3ConnectTimeout().has_value()) { clientConfig.connectTimeoutMs = std::chrono::duration_cast( - facebook::velox::core::toDuration( + facebook::velox::config::toDuration( hiveConfig_->s3ConnectTimeout().value())) .count(); } @@ -554,7 +554,7 @@ class S3FileSystem::Impl { if (hiveConfig_->s3SocketTimeout().has_value()) { clientConfig.requestTimeoutMs = std::chrono::duration_cast( - facebook::velox::core::toDuration( + facebook::velox::config::toDuration( hiveConfig_->s3SocketTimeout().value())) .count(); } diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index f784b8a69d0c..31f575fc3d99 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -15,6 +15,7 @@ */ #pragma once +#include "velox/common/config/Config.h" #include "velox/core/Config.h" namespace facebook::velox::core { @@ -365,8 +366,9 @@ class QueryConfig { static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows"; uint64_t queryMaxMemoryPerNode() const { - return toCapacity( - get(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE); + return config::toCapacity( + get(kQueryMaxMemoryPerNode, "0B"), + config::CapacityUnit::BYTE); } uint64_t maxPartialAggregationMemoryUsage() const { diff --git a/velox/dwio/dwrf/common/CMakeLists.txt b/velox/dwio/dwrf/common/CMakeLists.txt index a59b0b7d3ece..427e2a294386 100644 --- a/velox/dwio/dwrf/common/CMakeLists.txt +++ b/velox/dwio/dwrf/common/CMakeLists.txt @@ -36,6 +36,7 @@ velox_link_libraries( velox_dwio_dwrf_common velox_common_base velox_common_compression + velox_common_config velox_dwio_common velox_dwio_common_compression velox_dwio_dwrf_proto diff --git a/velox/dwio/dwrf/common/Config.h b/velox/dwio/dwrf/common/Config.h index 01ac08c79f13..7b001700bb69 100644 --- a/velox/dwio/dwrf/common/Config.h +++ b/velox/dwio/dwrf/common/Config.h @@ -24,10 +24,10 @@ namespace facebook::velox::dwrf { -class Config : public common::ConfigBase { +class Config : public config::ConfigBase { public: template - using Entry = common::ConfigBase::Entry; + using Entry = config::ConfigBase::Entry; static Entry WRITER_VERSION; static Entry COMPRESSION; @@ -80,9 +80,17 @@ class Config : public common::ConfigBase { static std::shared_ptr fromMap( const std::map& map) { - auto ret = std::make_shared(); - ret->configs_.insert(map.cbegin(), map.cend()); - return ret; + auto config = std::make_shared(); + for (const auto& pair : map) { + config->set(pair.first, pair.second); + } + return config; + } + + Config() : ConfigBase({}, true) {} + + std::map toSerdeParams() { + return std::map{configs_.cbegin(), configs_.cend()}; } }; diff --git a/velox/dwio/dwrf/test/WriterTest.cpp b/velox/dwio/dwrf/test/WriterTest.cpp index 16a8d832a2d1..aef39623c221 100644 --- a/velox/dwio/dwrf/test/WriterTest.cpp +++ b/velox/dwio/dwrf/test/WriterTest.cpp @@ -134,7 +134,7 @@ class AllWriterCompressionTest TEST_P(AllWriterCompressionTest, compression) { std::map overrideConfigs; overrideConfigs.emplace( - Config::COMPRESSION.configKey(), std::to_string(compressionKind_)); + Config::COMPRESSION.key, std::to_string(compressionKind_)); auto config = Config::fromMap(overrideConfigs); auto& writer = createWriter(config); auto& context = getContext(); diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index a4a490e23984..1312ee3b9c40 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -798,49 +798,48 @@ dwrf::WriterOptions getDwrfOptions(const dwio::common::WriterOptions& options) { std::map configs; if (options.compressionKind.has_value()) { configs.emplace( - Config::COMPRESSION.configKey(), + Config::COMPRESSION.key, std::to_string(options.compressionKind.value())); } if (options.orcMinCompressionSize.has_value()) { configs.emplace( - Config::COMPRESSION_BLOCK_SIZE_MIN.configKey(), + Config::COMPRESSION_BLOCK_SIZE_MIN.key, std::to_string(options.orcMinCompressionSize.value())); } if (options.maxStripeSize.has_value()) { configs.emplace( - Config::STRIPE_SIZE.configKey(), - std::to_string(options.maxStripeSize.value())); + Config::STRIPE_SIZE.key, std::to_string(options.maxStripeSize.value())); } if (options.orcLinearStripeSizeHeuristics.has_value()) { configs.emplace( - Config::LINEAR_STRIPE_SIZE_HEURISTICS.configKey(), + Config::LINEAR_STRIPE_SIZE_HEURISTICS.key, std::to_string(options.orcLinearStripeSizeHeuristics.value())); } if (options.maxDictionaryMemory.has_value()) { configs.emplace( - Config::MAX_DICTIONARY_SIZE.configKey(), + Config::MAX_DICTIONARY_SIZE.key, std::to_string(options.maxDictionaryMemory.value())); } if (options.orcWriterIntegerDictionaryEncodingEnabled.has_value()) { configs.emplace( - Config::INTEGER_DICTIONARY_ENCODING_ENABLED.configKey(), + Config::INTEGER_DICTIONARY_ENCODING_ENABLED.key, std::to_string( options.orcWriterIntegerDictionaryEncodingEnabled.value())); } if (options.orcWriterStringDictionaryEncodingEnabled.has_value()) { configs.emplace( - Config::STRING_DICTIONARY_ENCODING_ENABLED.configKey(), + Config::STRING_DICTIONARY_ENCODING_ENABLED.key, std::to_string( options.orcWriterStringDictionaryEncodingEnabled.value())); } if (options.zlibCompressionLevel.has_value()) { configs.emplace( - Config::ZLIB_COMPRESSION_LEVEL.configKey(), + Config::ZLIB_COMPRESSION_LEVEL.key, std::to_string(options.zlibCompressionLevel.value())); } if (options.zstdCompressionLevel.has_value()) { configs.emplace( - Config::ZSTD_COMPRESSION_LEVEL.configKey(), + Config::ZSTD_COMPRESSION_LEVEL.key, std::to_string(options.zstdCompressionLevel.value())); }