From ff5a1630ab13ac32035effb78111e925ec9479aa Mon Sep 17 00:00:00 2001 From: Ke Date: Mon, 27 Nov 2023 10:49:19 -0800 Subject: [PATCH] Refactor HiveConfig --- velox/connectors/Connector.h | 16 +- velox/connectors/hive/HiveConfig.cpp | 361 ++++++++++++------ velox/connectors/hive/HiveConfig.h | 312 +++++++++++---- velox/connectors/hive/HiveConnector.cpp | 32 +- velox/connectors/hive/HiveDataSink.cpp | 34 +- velox/connectors/hive/HiveDataSink.h | 5 +- .../s3fs/RegisterS3FileSystem.cpp | 39 +- .../storage_adapters/s3fs/S3FileSystem.cpp | 28 +- .../hive/storage_adapters/s3fs/S3FileSystem.h | 6 +- velox/connectors/hive/tests/CMakeLists.txt | 3 +- .../connectors/hive/tests/HiveConfigTest.cpp | 113 ++++++ .../hive/tests/HiveConnectorTest.cpp | 12 +- .../hive/tests/HiveDataSinkTest.cpp | 21 +- velox/core/QueryCtx.cpp | 12 +- velox/core/QueryCtx.h | 12 +- velox/docs/configs.rst | 14 +- velox/dwio/common/FileSink.h | 8 +- velox/exec/Operator.cpp | 2 +- velox/exec/tests/TableWriteTest.cpp | 12 +- velox/exec/tests/utils/AggregationFuzzer.cpp | 2 +- velox/exec/tests/utils/AssertQueryBuilder.cpp | 2 +- 21 files changed, 722 insertions(+), 324 deletions(-) create mode 100644 velox/connectors/hive/tests/HiveConfigTest.cpp diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 671e6c26aeaca..8230dbafd9c51 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -237,7 +237,7 @@ class ConnectorQueryCtx { ConnectorQueryCtx( memory::MemoryPool* operatorPool, memory::MemoryPool* connectorPool, - const Config* connectorConfig, + const Config* connectorSession, const common::SpillConfig* spillConfig, std::unique_ptr expressionEvaluator, cache::AsyncDataCache* cache, @@ -247,7 +247,7 @@ class ConnectorQueryCtx { int driverId) : operatorPool_(operatorPool), connectorPool_(connectorPool), - config_(connectorConfig), + connectorSession_(connectorSession), spillConfig_(spillConfig), expressionEvaluator_(std::move(expressionEvaluator)), cache_(cache), @@ -256,7 +256,7 @@ class ConnectorQueryCtx { taskId_(taskId), driverId_(driverId), planNodeId_(planNodeId) { - VELOX_CHECK_NOT_NULL(connectorConfig); + VELOX_CHECK_NOT_NULL(connectorSession); } /// Returns the associated operator's memory pool which is a leaf kind of @@ -272,8 +272,8 @@ class ConnectorQueryCtx { return connectorPool_; } - const Config* config() const { - return config_; + const Config* connectorSession() const { + return connectorSession_; } const common::SpillConfig* spillConfig() const { @@ -315,7 +315,7 @@ class ConnectorQueryCtx { private: memory::MemoryPool* const operatorPool_; memory::MemoryPool* const connectorPool_; - const Config* config_; + const Config* connectorSession_; const common::SpillConfig* const spillConfig_; std::unique_ptr expressionEvaluator_; cache::AsyncDataCache* cache_; @@ -328,9 +328,7 @@ class ConnectorQueryCtx { class Connector { public: - explicit Connector( - const std::string& id) - : id_(id) {} + explicit Connector(const std::string& id) : id_(id) {} virtual ~Connector() = default; diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index d99d1510ee1ac..970acadbe021b 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -15,6 +15,7 @@ */ #include "velox/connectors/hive/HiveConfig.h" +#include #include "velox/core/Config.h" #include "velox/core/QueryConfig.h" @@ -22,35 +23,20 @@ namespace facebook::velox::connector::hive { -namespace { - -HiveConfig::InsertExistingPartitionsBehavior 
-stringToInsertExistingPartitionsBehavior(const std::string& strValue) { +InsertExistingPartitionsBehavior stringToInsertExistingPartitionsBehavior( + const std::string& strValue) { auto upperValue = boost::algorithm::to_upper_copy(strValue); if (upperValue == "ERROR") { - return HiveConfig::InsertExistingPartitionsBehavior::kError; + return InsertExistingPartitionsBehavior::kError; } if (upperValue == "OVERWRITE") { - return HiveConfig::InsertExistingPartitionsBehavior::kOverwrite; + return InsertExistingPartitionsBehavior::kOverwrite; } VELOX_UNSUPPORTED( "Unsupported insert existing partitions behavior: {}.", strValue); } -} // namespace - -// static -HiveConfig::InsertExistingPartitionsBehavior -HiveConfig::insertExistingPartitionsBehavior(const Config* config) { - const auto behavior = - config->get(kInsertExistingPartitionsBehavior); - return behavior.has_value() - ? stringToInsertExistingPartitionsBehavior(behavior.value()) - : InsertExistingPartitionsBehavior::kError; -} - -// static -std::string HiveConfig::insertExistingPartitionsBehaviorString( +std::string insertExistingPartitionsBehaviorString( InsertExistingPartitionsBehavior behavior) { switch (behavior) { case InsertExistingPartitionsBehavior::kError: @@ -62,166 +48,301 @@ std::string HiveConfig::insertExistingPartitionsBehaviorString( } } +InsertExistingPartitionsBehavior HiveConfig::insertExistingPartitionsBehavior() + const { + return insertExistingPartitionsBehavior_; +} + +uint32_t HiveConfig::maxPartitionsPerWriters(const Config* session) const { + if (session->isValueExists(kMaxPartitionsPerWritersSession)) { + return session->get(kMaxPartitionsPerWritersSession).value(); + } + return maxPartitionsPerWriters_; +} + +bool HiveConfig::immutablePartitions() const { + return immutablePartitions_; +} + +bool HiveConfig::s3UseVirtualAddressing() const { + return s3PathStyleAccess_; +} + +std::string HiveConfig::s3GetLogLevel() const { + return s3LogLevel_; +} + +bool HiveConfig::s3UseSSL() const { + return s3SSLEnabled_; +} + +bool HiveConfig::s3UseInstanceCredentials() const { + return s3UseInstanceCredentials_; +} + +std::string HiveConfig::s3Endpoint() const { + return s3Endpoint_; +} + +std::optional HiveConfig::s3AccessKey() const { + return s3AwsAccessKey_; +} + +std::optional HiveConfig::s3SecretKey() const { + return s3AwsSecretKey_; +} + +std::optional HiveConfig::s3IAMRole() const { + return s3IamRole_; +} + +std::string HiveConfig::s3IAMRoleSessionName() const { + return s3IamRoleSessionName_; +} + +std::string HiveConfig::gcsEndpoint() const { + return GCSEndpoint_; +} + +std::string HiveConfig::gcsScheme() const { + return GCSScheme_; +} + +std::string HiveConfig::gcsCredentials() const { + return GCSCredentials_; +} + +bool HiveConfig::isOrcUseColumnNames() const { + return orcUseColumnNames_; +} + +bool HiveConfig::isFileColumnNamesReadAsLowerCase() const { + return fileColumnNamesReadAsLowerCase_; +} + +int64_t HiveConfig::maxCoalescedBytes() const { + return maxCoalescedBytes_; +} + +int32_t HiveConfig::maxCoalescedDistanceBytes() const { + return maxCoalescedDistanceBytes_; +} + +int32_t HiveConfig::numCacheFileHandles() const { + return numCacheFileHandles_; +} + +bool HiveConfig::isFileHandleCacheEnabled() const { + return enableFileHandleCache_; +} + +uint32_t HiveConfig::sortWriterMaxOutputRows(const Config* session) const { + if (session->isValueExists(kSortWriterMaxOutputRowsSession)) { + return session->get(kSortWriterMaxOutputRowsSession).value(); + } + return sortWriterMaxOutputRows_; +} + 
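The session-aware getters above all apply the same precedence rule: if the per-query session Config contains the corresponding *_Session key, that value is used; otherwise the value parsed from the connector config map at construction time is returned. A minimal sketch of that precedence, not part of the patch; it assumes core::MemConfig as the session Config, and the function name and numeric values are made up for illustration:

#include <cstdint>
#include <string>
#include <unordered_map>

#include "velox/connectors/hive/HiveConfig.h"
#include "velox/core/Config.h"

using facebook::velox::connector::hive::HiveConfig;
using facebook::velox::core::MemConfig;

bool sessionOverrideWins() {
  // Connector-level value, parsed once by the HiveConfig constructor.
  HiveConfig hiveConfig({{HiveConfig::kMaxPartitionsPerWriters, "150"}});
  // Session value, keyed by the *_Session constant, takes precedence.
  MemConfig session({{HiveConfig::kMaxPartitionsPerWritersSession, "200"}});
  MemConfig emptySession(std::unordered_map<std::string, std::string>{});
  const uint32_t withOverride = hiveConfig.maxPartitionsPerWriters(&session);
  const uint32_t withoutOverride =
      hiveConfig.maxPartitionsPerWriters(&emptySession);
  return withOverride == 200 && withoutOverride == 150;
}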
+uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* session) const { + if (session->isValueExists(kSortWriterMaxOutputBytesSession)) { + return session->get(kSortWriterMaxOutputBytesSession).value(); + } + return sortWriterMaxOutputBytes_; +} + +uint64_t HiveConfig::getOrcWriterMaxStripeSize(const Config* session) const { + if (session->isValueExists(kOrcWriterMaxStripeSize)) { + return session->get(kOrcWriterMaxStripeSize).value(); + } + return orcWriterMaxStripeSize_; +} + +uint64_t HiveConfig::getOrcWriterMaxDictionaryMemory( + const Config* session) const { + if (session->isValueExists(kOrcWriterMaxDictionaryMemorySession)) { + return session->get(kOrcWriterMaxDictionaryMemorySession).value(); + } + return orcWriterMaxDictionaryMemory_; +} + // static -uint32_t HiveConfig::maxPartitionsPerWriters(const Config* config) { - return config->get(kMaxPartitionsPerWriters, 100); +void HiveConfig::setInsertExistingPartitionsBehavior( + HiveConfig* hiveConfig, + std::string insertExistingPartitionsBehavior) { + hiveConfig->insertExistingPartitionsBehavior_ = + stringToInsertExistingPartitionsBehavior( + insertExistingPartitionsBehavior); } // static -bool HiveConfig::immutablePartitions(const Config* config) { - return config->get(kImmutablePartitions, false); +void HiveConfig::setMaxPartitionsPerWriters( + HiveConfig* hiveConfig, + std::string maxPartitionsPerWriters) { + hiveConfig->maxPartitionsPerWriters_ = + folly::to(maxPartitionsPerWriters); } // static -bool HiveConfig::s3UseVirtualAddressing(const Config* config) { - return !config->get(kS3PathStyleAccess, false); +void HiveConfig::setImmutablePartitions( + HiveConfig* hiveConfig, + std::string immutablePartitions) { + hiveConfig->immutablePartitions_ = folly::to(immutablePartitions); } // static -std::string HiveConfig::s3GetLogLevel(const Config* config) { - return config->get(kS3LogLevel, std::string("FATAL")); +void HiveConfig::setS3PathStyleAccess( + HiveConfig* hiveConfig, + std::string s3PathStyleAccess) { + hiveConfig->s3PathStyleAccess_ = folly::to(s3PathStyleAccess); } // static -bool HiveConfig::s3UseSSL(const Config* config) { - return config->get(kS3SSLEnabled, true); +void HiveConfig::setS3LogLevel(HiveConfig* hiveConfig, std::string s3LogLevel) { + hiveConfig->s3LogLevel_ = folly::to(s3LogLevel); } // static -bool HiveConfig::s3UseInstanceCredentials(const Config* config) { - return config->get(kS3UseInstanceCredentials, false); +void HiveConfig::setS3SSLEnabled( + HiveConfig* hiveConfig, + std::string s3SSLEnabled) { + hiveConfig->s3SSLEnabled_ = folly::to(s3SSLEnabled); } // static -std::string HiveConfig::s3Endpoint(const Config* config) { - return config->get(kS3Endpoint, std::string("")); +void HiveConfig::setS3UseInstanceCredentials( + HiveConfig* hiveConfig, + std::string s3UseInstanceCredentials) { + hiveConfig->s3UseInstanceCredentials_ = + folly::to(s3UseInstanceCredentials); } // static -std::optional HiveConfig::s3AccessKey(const Config* config) { - if (config->isValueExists(kS3AwsAccessKey)) { - return config->get(kS3AwsAccessKey).value(); - } - return {}; +void HiveConfig::setS3Endpoint(HiveConfig* hiveConfig, std::string s3Endpoint) { + hiveConfig->s3Endpoint_ = folly::to(s3Endpoint); } // static -std::optional HiveConfig::s3SecretKey(const Config* config) { - if (config->isValueExists(kS3AwsSecretKey)) { - return config->get(kS3AwsSecretKey).value(); - } - return {}; +void HiveConfig::setS3AwsAccessKey( + HiveConfig* hiveConfig, + std::string s3AwsAccessKey) { + hiveConfig->s3AwsAccessKey_ = + 
std::optional(folly::to(s3AwsAccessKey)); } // static -std::optional HiveConfig::s3IAMRole(const Config* config) { - if (config->isValueExists(kS3IamRole)) { - return config->get(kS3IamRole).value(); - } - return {}; +void HiveConfig::setS3AwsSecretKey( + HiveConfig* hiveConfig, + std::string s3AwsSecretKey) { + hiveConfig->s3AwsSecretKey_ = + std::optional(folly::to(s3AwsSecretKey)); +} + +// static +void HiveConfig::setS3IamRole(HiveConfig* hiveConfig, std::string s3IamRole) { + hiveConfig->s3IamRole_ = std::optional(folly::to(s3IamRole)); } // static -std::string HiveConfig::s3IAMRoleSessionName(const Config* config) { - return config->get(kS3IamRoleSessionName, std::string("velox-session")); +void HiveConfig::setS3IamRoleSessionName( + HiveConfig* hiveConfig, + std::string s3IamRoleSessionName) { + hiveConfig->s3IamRoleSessionName_ = + folly::to(s3IamRoleSessionName); } // static -std::string HiveConfig::gcsEndpoint(const Config* config) { - return config->get(kGCSEndpoint, std::string("")); +void HiveConfig::setGCSEndpoint( + HiveConfig* hiveConfig, + std::string GCSEndpoint) { + hiveConfig->GCSEndpoint_ = folly::to(GCSEndpoint); } // static -std::string HiveConfig::gcsScheme(const Config* config) { - return config->get(kGCSScheme, std::string("https")); +void HiveConfig::setGCSScheme(HiveConfig* hiveConfig, std::string GCSScheme) { + hiveConfig->GCSScheme_ = folly::to(GCSScheme); } // static -std::string HiveConfig::gcsCredentials(const Config* config) { - return config->get(kGCSCredentials, std::string("")); +void HiveConfig::setGCSCredentials( + HiveConfig* hiveConfig, + std::string GCSCredentials) { + hiveConfig->GCSCredentials_ = folly::to(GCSCredentials); } -// static. -bool HiveConfig::isOrcUseColumnNames(const Config* config) { - return config->get(kOrcUseColumnNames, false); +// static +void HiveConfig::setOrcUseColumnNames( + HiveConfig* hiveConfig, + std::string orcUseColumnNames) { + hiveConfig->orcUseColumnNames_ = folly::to(orcUseColumnNames); } -// static. -bool HiveConfig::isFileColumnNamesReadAsLowerCase(const Config* config) { - return config->get(kFileColumnNamesReadAsLowerCase, false); +// static +void HiveConfig::setFileColumnNamesReadAsLowerCase( + HiveConfig* hiveConfig, + std::string fileColumnNamesReadAsLowerCase) { + hiveConfig->fileColumnNamesReadAsLowerCase_ = + folly::to(fileColumnNamesReadAsLowerCase); } -// static. -int64_t HiveConfig::maxCoalescedBytes(const Config* config) { - return config->get(kMaxCoalescedBytes, 128 << 20); +// static +void HiveConfig::setMaxCoalescedBytes( + HiveConfig* hiveConfig, + std::string maxCoalescedBytes) { + hiveConfig->maxCoalescedBytes_ = folly::to(maxCoalescedBytes); } -// static. -int32_t HiveConfig::maxCoalescedDistanceBytes(const Config* config) { - return config->get(kMaxCoalescedDistanceBytes, 512 << 10); +// static +void HiveConfig::setMaxCoalescedDistanceBytes( + HiveConfig* hiveConfig, + std::string maxCoalescedDistanceBytes) { + hiveConfig->maxCoalescedDistanceBytes_ = + folly::to(maxCoalescedDistanceBytes); } -// static. -int32_t HiveConfig::numCacheFileHandles(const Config* config) { - return config->get(kNumCacheFileHandles, 20'000); +// static +void HiveConfig::setNumCacheFileHandles( + HiveConfig* hiveConfig, + std::string numCacheFileHandles) { + hiveConfig->numCacheFileHandles_ = folly::to(numCacheFileHandles); } -// static. 
-bool HiveConfig::isFileHandleCacheEnabled(const Config* config) { - return config->get(kEnableFileHandleCache, true); +// static +void HiveConfig::setEnableFileHandleCache( + HiveConfig* hiveConfig, + std::string enableFileHandleCache) { + hiveConfig->enableFileHandleCache_ = folly::to(enableFileHandleCache); } -// static. -uint32_t HiveConfig::sortWriterMaxOutputRows(const Config* config) { - return config->get(kSortWriterMaxOutputRows, 1024); +// static +void HiveConfig::setSortWriterMaxOutputRows( + HiveConfig* hiveConfig, + std::string sortWriterMaxOutputRows) { + hiveConfig->sortWriterMaxOutputRows_ = + folly::to(sortWriterMaxOutputRows); } -// static. -uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* config) { - return config->get(kSortWriterMaxOutputBytes, 10UL << 20); +// static +void HiveConfig::setSortWriterMaxOutputBytes( + HiveConfig* hiveConfig, + std::string sortWriterMaxOutputBytes) { + hiveConfig->sortWriterMaxOutputBytes_ = + folly::to(sortWriterMaxOutputBytes); } -uint64_t HiveConfig::getOrcWriterMaxStripeSize( - const Config* connectorQueryCtxConfig, - const Config* connectorPropertiesConfig) { - if (connectorQueryCtxConfig != nullptr && - connectorQueryCtxConfig->isValueExists(kOrcWriterMaxStripeSize)) { - return toCapacity( - connectorQueryCtxConfig->get(kOrcWriterMaxStripeSize) - .value(), - core::CapacityUnit::BYTE); - } - if (connectorPropertiesConfig != nullptr && - connectorPropertiesConfig->isValueExists(kOrcWriterMaxStripeSizeConfig)) { - return toCapacity( - connectorPropertiesConfig - ->get(kOrcWriterMaxStripeSizeConfig) - .value(), - core::CapacityUnit::BYTE); - } - return 64L * 1024L * 1024L; +// static +void HiveConfig::setOrcWriterMaxStripeSize( + HiveConfig* hiveConfig, + std::string orcWriterMaxStripeSize) { + hiveConfig->orcWriterMaxStripeSize_ = + folly::to(orcWriterMaxStripeSize); } -uint64_t HiveConfig::getOrcWriterMaxDictionaryMemory( - const Config* connectorQueryCtxConfig, - const Config* connectorPropertiesConfig) { - if (connectorQueryCtxConfig != nullptr && - connectorQueryCtxConfig->isValueExists(kOrcWriterMaxDictionaryMemory)) { - return toCapacity( - connectorQueryCtxConfig->get(kOrcWriterMaxDictionaryMemory) - .value(), - core::CapacityUnit::BYTE); - } - if (connectorPropertiesConfig != nullptr && - connectorPropertiesConfig->isValueExists( - kOrcWriterMaxDictionaryMemoryConfig)) { - return toCapacity( - connectorPropertiesConfig - ->get(kOrcWriterMaxDictionaryMemoryConfig) - .value(), - core::CapacityUnit::BYTE); - } - return 16L * 1024L * 1024L; +// static +void HiveConfig::setOrcWriterMaxDictionaryMemory( + HiveConfig* hiveConfig, + std::string orcWriterMaxDictionaryMemory) { + hiveConfig->orcWriterMaxDictionaryMemory_ = + folly::to(orcWriterMaxDictionaryMemory); } } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 7e06c03b1de56..860aa3846598f 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -15,6 +15,9 @@ */ #pragma once +#include +#include +#include #include #include @@ -24,25 +27,106 @@ class Config; namespace facebook::velox::connector::hive { +enum class InsertExistingPartitionsBehavior { + kError, + kOverwrite, +}; + +std::string insertExistingPartitionsBehaviorString( + InsertExistingPartitionsBehavior behavior); + /// Hive connector configs. 
class HiveConfig {
 public:
-  enum class InsertExistingPartitionsBehavior {
-    kError,
-    kOverwrite,
+  HiveConfig(
+      const std::unordered_map<std::string, std::string>& connectorConf) {
+    for (const auto& [key, value] : connectorConf) {
+      if (configSetters.count(key) != 0) {
+        configSetters.at(key)(this, value);
+      } else {
+        LOG(ERROR) << "Invalid hive config: " << key;
+      }
+    }
   };
 
-  static std::string insertExistingPartitionsBehaviorString(
-      InsertExistingPartitionsBehavior behavior);
+  InsertExistingPartitionsBehavior insertExistingPartitionsBehavior() const;
+
+  uint32_t maxPartitionsPerWriters(const Config* session) const;
+
+  bool immutablePartitions() const;
+
+  bool s3UseVirtualAddressing() const;
+
+  std::string s3GetLogLevel() const;
+
+  bool s3UseSSL() const;
+
+  bool s3UseInstanceCredentials() const;
+
+  std::string s3Endpoint() const;
+
+  std::optional<std::string> s3AccessKey() const;
+
+  std::optional<std::string> s3SecretKey() const;
+
+  std::optional<std::string> s3IAMRole() const;
+
+  std::string s3IAMRoleSessionName() const;
+
+  std::string gcsEndpoint() const;
+
+  std::string gcsScheme() const;
+
+  std::string gcsCredentials() const;
+
+  bool isOrcUseColumnNames() const;
+
+  bool isFileColumnNamesReadAsLowerCase() const;
+
+  int64_t maxCoalescedBytes() const;
+
+  int32_t maxCoalescedDistanceBytes() const;
+
+  int32_t numCacheFileHandles() const;
+
+  bool isFileHandleCacheEnabled() const;
+
+  uint64_t fileWriterFlushThresholdBytes() const;
+
+  uint32_t sortWriterMaxOutputRows(const Config* session) const;
+
+  uint64_t sortWriterMaxOutputBytes(const Config* session) const;
+
+  uint64_t getOrcWriterMaxStripeSize(const Config* session) const;
+
+  uint64_t getOrcWriterMaxDictionaryMemory(const Config* session) const;
+
+  /// Session properties.
+  static constexpr const char* kMaxPartitionsPerWritersSession =
+      "max_partitions_per_writers";
+
+  static constexpr const char* kOrcWriterMaxStripeSizeSession =
+      "orc_optimized_writer_max_stripe_size";
+
+  static constexpr const char* kOrcWriterMaxDictionaryMemorySession =
+      "orc_optimized_writer_max_dictionary_memory";
+  static constexpr const char* kSortWriterMaxOutputRowsSession =
+      "sort_writer_max_output_rows";
+  static constexpr const char* kSortWriterMaxOutputBytesSession =
+      "sort_writer_max_output_bytes";
+
+  /// Connector configs.
   /// Behavior on insert into existing partitions.
   static constexpr const char* kInsertExistingPartitionsBehavior =
-      "insert_existing_partitions_behavior";
+      "insert-existing-partitions-behavior";
 
   /// Maximum number of (bucketed) partitions per a single table writer
   /// instance.
   static constexpr const char* kMaxPartitionsPerWriters =
-      "max_partitions_per_writers";
+      "max-partitions-per-writers";
 
   /// Whether new data can be inserted into an unpartition table.
   /// Velox currently does not support appending data to existing partitions.
@@ -105,84 +189,170 @@ class HiveConfig {
       "max-coalesced-distance-bytes";
 
   /// Maximum number of entries in the file handle cache.
-  static constexpr const char* kNumCacheFileHandles = "num_cached_file_handles";
+  static constexpr const char* kNumCacheFileHandles = "num-cached-file-handles";
 
   /// Enable file handle cache.
   static constexpr const char* kEnableFileHandleCache =
-      "file_handle_cache_enabled";
+      "file-handle-cache-enabled";
 
-  // TODO: Refactor and merge config and session property.
static constexpr const char* kOrcWriterMaxStripeSize = - "orc_optimized_writer_max_stripe_size"; - static constexpr const char* kOrcWriterMaxStripeSizeConfig = "hive.orc.writer.stripe-max-size"; static constexpr const char* kOrcWriterMaxDictionaryMemory = - "orc_optimized_writer_max_dictionary_memory"; - static constexpr const char* kOrcWriterMaxDictionaryMemoryConfig = "hive.orc.writer.dictionary-max-memory"; static constexpr const char* kSortWriterMaxOutputRows = - "sort_writer_max_output_rows"; + "sort-writer-max-output-rows"; static constexpr const char* kSortWriterMaxOutputBytes = - "sort_writer_max_output_bytes"; - - static InsertExistingPartitionsBehavior insertExistingPartitionsBehavior( - const Config* config); - - static uint32_t maxPartitionsPerWriters(const Config* config); - - static bool immutablePartitions(const Config* config); - - static bool s3UseVirtualAddressing(const Config* config); - - static std::string s3GetLogLevel(const Config* config); - - static bool s3UseSSL(const Config* config); - - static bool s3UseInstanceCredentials(const Config* config); - - static std::string s3Endpoint(const Config* config); - - static std::optional s3AccessKey(const Config* config); + "sort-writer-max-output-bytes"; - static std::optional s3SecretKey(const Config* config); + private: + static void setInsertExistingPartitionsBehavior( + HiveConfig* hiveConfig, + std::string insertExistingPartitionsBehavior); - static std::optional s3IAMRole(const Config* config); - - static std::string s3IAMRoleSessionName(const Config* config); - - static std::string gcsEndpoint(const Config* config); - - static std::string gcsScheme(const Config* config); - - static std::string gcsCredentials(const Config* config); - - static bool isOrcUseColumnNames(const Config* config); - - static bool isFileColumnNamesReadAsLowerCase(const Config* config); - - static int64_t maxCoalescedBytes(const Config* config); - - static int32_t maxCoalescedDistanceBytes(const Config* config); - - static int32_t numCacheFileHandles(const Config* config); - - static bool isFileHandleCacheEnabled(const Config* config); - - static uint64_t fileWriterFlushThresholdBytes(const Config* config); - - static uint32_t sortWriterMaxOutputRows(const Config* config); - - static uint64_t sortWriterMaxOutputBytes(const Config* config); - - static uint64_t getOrcWriterMaxStripeSize( - const Config* connectorQueryCtxConfig, - const Config* connectorPropertiesConfig); - - static uint64_t getOrcWriterMaxDictionaryMemory( - const Config* connectorQueryCtxConfig, - const Config* connectorPropertiesConfig); + static void setMaxPartitionsPerWriters( + HiveConfig* hiveConfig, + std::string maxPartitionsPerWriters); + + static void setImmutablePartitions( + HiveConfig* hiveConfig, + std::string immutablePartitions); + + static void setS3PathStyleAccess( + HiveConfig* hiveConfig, + std::string s3PathStyleAccess); + + static void setS3LogLevel(HiveConfig* hiveConfig, std::string s3LogLevel); + + static void setS3SSLEnabled(HiveConfig* hiveConfig, std::string s3SSLEnabled); + + static void setS3UseInstanceCredentials( + HiveConfig* hiveConfig, + std::string s3UseInstanceCredentials); + + static void setS3Endpoint(HiveConfig* hiveConfig, std::string s3Endpoint); + + static void setS3AwsAccessKey( + HiveConfig* hiveConfig, + std::string s3AwsAccessKey); + + static void setS3AwsSecretKey( + HiveConfig* hiveConfig, + std::string s3AwsSecretKey); + + static void setS3IamRole(HiveConfig* hiveConfig, std::string s3IamRole); + + static void 
setS3IamRoleSessionName( + HiveConfig* hiveConfig, + std::string s3IamRoleSessionName); + + static void setGCSEndpoint(HiveConfig* hiveConfig, std::string GCSEndpoint); + + static void setGCSScheme(HiveConfig* hiveConfig, std::string GCSScheme); + + static void setGCSCredentials( + HiveConfig* hiveConfig, + std::string GCSCredentials); + + static void setOrcUseColumnNames( + HiveConfig* hiveConfig, + std::string orcUseColumnNames); + + static void setFileColumnNamesReadAsLowerCase( + HiveConfig* hiveConfig, + std::string fileColumnNamesReadAsLowerCase); + + static void setMaxCoalescedBytes( + HiveConfig* hiveConfig, + std::string maxCoalescedBytes); + + static void setMaxCoalescedDistanceBytes( + HiveConfig* hiveConfig, + std::string maxCoalescedDistanceBytes); + + static void setNumCacheFileHandles( + HiveConfig* hiveConfig, + std::string numCacheFileHandles); + + // static + static void setEnableFileHandleCache( + HiveConfig* hiveConfig, + std::string enableFileHandleCache); + + static void setSortWriterMaxOutputRows( + HiveConfig* hiveConfig, + std::string sortWriterMaxOutputRows); + + static void setSortWriterMaxOutputBytes( + HiveConfig* hiveConfig, + std::string sortWriterMaxOutputBytes); + + // static + static void setOrcWriterMaxStripeSize( + HiveConfig* hiveConfig, + std::string orcWriterMaxStripeSize); + + static void setOrcWriterMaxDictionaryMemory( + HiveConfig* hiveConfig, + std::string orcWriterMaxDictionaryMemory); + + InsertExistingPartitionsBehavior insertExistingPartitionsBehavior_ = + InsertExistingPartitionsBehavior::kError; + uint32_t maxPartitionsPerWriters_ = 100; + bool immutablePartitions_ = false; + bool s3PathStyleAccess_ = false; + std::string s3LogLevel_ = "FATAL"; + bool s3SSLEnabled_ = true; + bool s3UseInstanceCredentials_ = false; + std::string s3Endpoint_ = ""; + std::optional s3AwsAccessKey_{}; + std::optional s3AwsSecretKey_{}; + std::optional s3IamRole_{}; + std::string s3IamRoleSessionName_ = "velox-session"; + std::string GCSEndpoint_ = ""; + std::string GCSScheme_ = "https"; + std::string GCSCredentials_ = ""; + bool orcUseColumnNames_ = false; + bool fileColumnNamesReadAsLowerCase_ = false; + int64_t maxCoalescedBytes_ = 128 << 20; + int32_t maxCoalescedDistanceBytes_ = 512 << 10; + int32_t numCacheFileHandles_ = 20'000; + bool enableFileHandleCache_ = true; + int32_t sortWriterMaxOutputRows_ = 1024; + uint64_t sortWriterMaxOutputBytes_ = 10UL << 20; + uint64_t orcWriterMaxStripeSize_ = 64L * 1024L * 1024L; + uint64_t orcWriterMaxDictionaryMemory_ = 16L * 1024L * 1024L; + + std::unordered_map> + configSetters = { + {kInsertExistingPartitionsBehavior, + setInsertExistingPartitionsBehavior}, + {kMaxPartitionsPerWriters, setMaxPartitionsPerWriters}, + {kImmutablePartitions, setImmutablePartitions}, + {kS3PathStyleAccess, setS3PathStyleAccess}, + {kS3LogLevel, setS3LogLevel}, + {kS3SSLEnabled, setS3SSLEnabled}, + {kS3UseInstanceCredentials, setS3UseInstanceCredentials}, + {kS3Endpoint, setS3Endpoint}, + {kS3AwsAccessKey, setS3AwsAccessKey}, + {kS3AwsSecretKey, setS3AwsSecretKey}, + {kS3IamRole, setS3IamRole}, + {kS3IamRoleSessionName, setS3IamRoleSessionName}, + {kGCSEndpoint, setGCSEndpoint}, + {kGCSScheme, setGCSScheme}, + {kGCSCredentials, setGCSCredentials}, + {kOrcUseColumnNames, setOrcUseColumnNames}, + {kFileColumnNamesReadAsLowerCase, setFileColumnNamesReadAsLowerCase}, + {kMaxCoalescedBytes, setMaxCoalescedBytes}, + {kMaxCoalescedDistanceBytes, setMaxCoalescedDistanceBytes}, + {kNumCacheFileHandles, setNumCacheFileHandles}, + 
{kEnableFileHandleCache, setEnableFileHandleCache}, + {kOrcWriterMaxStripeSize, setOrcWriterMaxStripeSize}, + {kOrcWriterMaxDictionaryMemory, setOrcWriterMaxDictionaryMemory}, + {kSortWriterMaxOutputRows, setSortWriterMaxOutputRows}, + {kSortWriterMaxOutputBytes, setSortWriterMaxOutputBytes}, + }; }; } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 9f38f0b01228d..a5cb408cafa6c 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -48,32 +48,24 @@ using namespace facebook::velox::dwrf; namespace facebook::velox::connector::hive { -bool isFileHandleCacheEnabled(const Config* properties) { - return properties ? HiveConfig::isFileHandleCacheEnabled(properties) : true; -} - -int32_t numCachedFileHandles(const Config* properties) { - return properties ? HiveConfig::numCacheFileHandles(properties) : 20'000; -} - HiveConnector::HiveConnector( const std::string& id, const std::unordered_map& connectorConf, folly::Executor* FOLLY_NULLABLE executor) : Connector(id), + hiveConfig_(std::make_shared(connectorConf)), fileHandleFactory_( - isFileHandleCacheEnabled(properties.get()) + hiveConfig_->isFileHandleCacheEnabled() ? std::make_unique< SimpleLRUCache>>( - numCachedFileHandles(properties.get())) + hiveConfig_->numCacheFileHandles()) : nullptr, - std::make_unique(properties)), + std::make_unique(nullptr)), executor_(executor) { - if (isFileHandleCacheEnabled(properties.get())) { + if (hiveConfig_->isFileHandleCacheEnabled()) { LOG(INFO) << "Hive connector " << connectorId() << " created with maximum of " - << numCachedFileHandles(properties.get()) - << " cached file handles."; + << hiveConfig_->numCacheFileHandles() << " cached file handles."; } else { LOG(INFO) << "Hive connector " << connectorId() << " created with file handle cache disabled"; @@ -88,15 +80,11 @@ std::unique_ptr HiveConnector::createDataSource( std::shared_ptr>& columnHandles, ConnectorQueryCtx* connectorQueryCtx) { dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool()); - options.setMaxCoalesceBytes( - HiveConfig::maxCoalescedBytes(connectorQueryCtx->config())); - options.setMaxCoalesceDistance( - HiveConfig::maxCoalescedDistanceBytes(connectorQueryCtx->config())); + options.setMaxCoalesceBytes(hiveConfig_->maxCoalescedBytes()); + options.setMaxCoalesceDistance(hiveConfig_->maxCoalescedDistanceBytes()); options.setFileColumnNamesReadAsLowerCase( - HiveConfig::isFileColumnNamesReadAsLowerCase( - connectorQueryCtx->config())); - options.setUseColumnNamesForColumnMapping( - HiveConfig::isOrcUseColumnNames(connectorQueryCtx->config())); + hiveConfig_->isFileColumnNamesReadAsLowerCase()); + options.setUseColumnNamesForColumnMapping(hiveConfig_->isOrcUseColumnNames()); return std::make_unique( outputType, diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 69f9ea79485b2..c478b12acea4f 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -290,14 +290,14 @@ HiveDataSink::HiveDataSink( std::shared_ptr insertTableHandle, const ConnectorQueryCtx* connectorQueryCtx, CommitStrategy commitStrategy, - const std::shared_ptr& connectorProperties) + const std::shared_ptr& hiveConfig) : inputType_(std::move(inputType)), insertTableHandle_(std::move(insertTableHandle)), connectorQueryCtx_(connectorQueryCtx), commitStrategy_(commitStrategy), - connectorProperties_(connectorProperties), - 
maxOpenWriters_( - HiveConfig::maxPartitionsPerWriters(connectorQueryCtx_->config())), + hiveConfig_(hiveConfig), + maxOpenWriters_(hiveConfig_->maxPartitionsPerWriters( + connectorQueryCtx->connectorSession())), partitionChannels_(getPartitionChannels(insertTableHandle_)), partitionIdGenerator_( !partitionChannels_.empty() ? std::make_unique( @@ -602,11 +602,10 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { } options.nonReclaimableSection = writerInfo_.back()->nonReclaimableSectionHolder.get(); - options.maxStripeSize = std::optional(HiveConfig::getOrcWriterMaxStripeSize( - connectorQueryCtx_->config(), connectorProperties_.get())); + options.maxStripeSize = + std::optional(hiveConfig_->getOrcWriterMaxStripeSize()); options.maxDictionaryMemory = - std::optional(HiveConfig::getOrcWriterMaxDictionaryMemory( - connectorQueryCtx_->config(), connectorProperties_.get())); + std::optional(hiveConfig_->getOrcWriterMaxDictionaryMemory()); ioStats_.emplace_back(std::make_shared()); // Prevents the memory allocation during the writer creation. @@ -615,7 +614,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { dwio::common::FileSink::create( writePath, {.bufferWrite = false, - .connectorProperties = connectorProperties_, + .hiveConfig = hiveConfig_, .pool = writerInfo_.back()->sinkPool.get(), .metricLogger = dwio::common::MetricsLog::voidLog(), .stats = ioStats_.back().get()}), @@ -649,8 +648,8 @@ HiveDataSink::maybeCreateBucketSortWriter( return std::make_unique( std::move(writer), std::move(sortBuffer), - HiveConfig::sortWriterMaxOutputRows(connectorQueryCtx_->config()), - HiveConfig::sortWriterMaxOutputBytes(connectorQueryCtx_->config()), + hiveConfig_->sortWriterMaxOutputRows(), + hiveConfig_->sortWriterMaxOutputBytes(), writerInfo_.back()->spillStats.get()); } @@ -735,24 +734,23 @@ std::pair HiveDataSink::getWriterFileNames( HiveWriterParameters::UpdateMode HiveDataSink::getUpdateMode() const { if (insertTableHandle_->isInsertTable()) { if (insertTableHandle_->isPartitioned()) { - const auto insertBehavior = HiveConfig::insertExistingPartitionsBehavior( - connectorQueryCtx_->config()); + const auto insertBehavior = + hiveConfig_->insertExistingPartitionsBehavior(); switch (insertBehavior) { - case HiveConfig::InsertExistingPartitionsBehavior::kOverwrite: + case InsertExistingPartitionsBehavior::kOverwrite: return HiveWriterParameters::UpdateMode::kOverwrite; - case HiveConfig::InsertExistingPartitionsBehavior::kError: + case InsertExistingPartitionsBehavior::kError: return HiveWriterParameters::UpdateMode::kNew; default: VELOX_UNSUPPORTED( "Unsupported insert existing partitions behavior: {}", - HiveConfig::insertExistingPartitionsBehaviorString( - insertBehavior)); + insertExistingPartitionsBehaviorString(insertBehavior)); } } else { if (insertTableHandle_->isBucketed()) { VELOX_USER_FAIL("Cannot insert into bucketed unpartitioned Hive table"); } - if (HiveConfig::immutablePartitions(connectorProperties_.get())) { + if (hiveConfig_->immutablePartitions()) { VELOX_USER_FAIL("Unpartitioned Hive tables are immutable."); } return HiveWriterParameters::UpdateMode::kAppend; diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 0c5dea8266efc..19d48abb0a4fb 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -17,6 +17,7 @@ #include "velox/common/compression/Compression.h" #include "velox/connectors/Connector.h" +#include "velox/connectors/hive/HiveConfig.h" #include 
"velox/connectors/hive/PartitionIdGenerator.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" @@ -415,7 +416,7 @@ class HiveDataSink : public DataSink { std::shared_ptr insertTableHandle, const ConnectorQueryCtx* connectorQueryCtx, CommitStrategy commitStrategy, - const std::shared_ptr& connectorProperties); + const std::shared_ptr& hiveConfig); static uint32_t maxBucketCount() { static const uint32_t kMaxBucketCount = 100'000; @@ -542,7 +543,7 @@ class HiveDataSink : public DataSink { const std::shared_ptr insertTableHandle_; const ConnectorQueryCtx* const connectorQueryCtx_; const CommitStrategy commitStrategy_; - const std::shared_ptr connectorProperties_; + const std::shared_ptr hiveConfig_; const uint32_t maxOpenWriters_; const std::vector partitionChannels_; const std::unique_ptr partitionIdGenerator_; diff --git a/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp index 5e8a9fd650c2d..35cd0dd57c957 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp @@ -15,6 +15,7 @@ */ #ifdef VELOX_ENABLE_S3 +#include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" #include "velox/core/Config.h" @@ -26,6 +27,7 @@ namespace facebook::velox::filesystems { #ifdef VELOX_ENABLE_S3 +using namespace facebook::velox::connector::hive; folly::once_flag S3FSInstantiationFlag; // Only one instance of S3FileSystem is supported for now. @@ -35,22 +37,24 @@ static std::shared_ptr s3fs = nullptr; std::function(std::shared_ptr, std::string_view)> fileSystemGenerator() { - static auto filesystemGenerator = [](std::shared_ptr properties, - std::string_view filePath) { - folly::call_once(S3FSInstantiationFlag, [&properties]() { - std::shared_ptr fs; - if (properties != nullptr) { - initializeS3(properties.get()); - fs = std::make_shared(properties); - } else { - auto config = std::make_shared(); - initializeS3(config.get()); - fs = std::make_shared(config); - } - s3fs = fs; - }); - return s3fs; - }; + static auto filesystemGenerator = + [](std::shared_ptr hiveConfig, + std::string_view filePath) { + folly::call_once(S3FSInstantiationFlag, [&hiveConfig]() { + std::shared_ptr fs; + if (hiveConfig != nullptr) { + initializeS3(hiveConfig.get()); + fs = std::make_shared(hiveConfig); + } else { + auto hiveConfig = std::make_shared( + std::unordered_map()); + initializeS3(hiveConfig.get()); + fs = std::make_shared(hiveConfig); + } + s3fs = fs; + }); + return s3fs; + }; return filesystemGenerator; } @@ -63,8 +67,7 @@ s3WriteFileSinkGenerator() { const velox::dwio::common::FileSink::Options& options) -> std::unique_ptr { if (isS3File(fileURI)) { - auto fileSystem = - filesystems::getFileSystem(fileURI, options.connectorProperties); + auto fileSystem = filesystems::getFileSystem(fileURI, options.hiveConfig); return std::make_unique( fileSystem->openFileForWrite(fileURI, {{}, options.pool}), fileURI, diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index 13f170da92923..4dd43607f27ec 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -430,13 +430,13 @@ struct AwsInstance { } // Returns true iff the instance was 
newly initialized with config. - bool initialize(const Config* config) { + bool initialize(const HiveConfig* hiveConfig) { if (isFinalized_.load()) { VELOX_FAIL("Attempt to initialize S3 after it has been finalized."); } if (!isInitialized_.exchange(true)) { // Not already initialized. - doInitialize(config); + doInitialize(hiveConfig); return true; } return false; @@ -468,9 +468,9 @@ struct AwsInstance { } private: - void doInitialize(const Config* config) { + void doInitialize(const HiveConfig* hiveConfig) { awsOptions_.loggingOptions.logLevel = - inferS3LogLevel(HiveConfig::s3GetLogLevel(config)); + inferS3LogLevel(hiveConfig->s3GetLogLevel()); // In some situations, curl triggers a SIGPIPE signal causing the entire // process to be terminated without any notification. // This behavior is seen via Prestissimo on AmazonLinux2 on AWS EC2. @@ -493,8 +493,8 @@ AwsInstance* getAwsInstance() { return instance.get(); } -bool initializeS3(const Config* config) { - return getAwsInstance()->initialize(config); +bool initializeS3(const HiveConfig* hiveConfig) { + return getAwsInstance()->initialize(hiveConfig); } static std::atomic fileSystemCount = 0; @@ -506,12 +506,12 @@ void finalizeS3() { class S3FileSystem::Impl { public: - Impl(const Config* config) : config_(config) { + Impl(const HiveConfig* hiveConfig) : hiveConfig_(hiveConfig) { VELOX_CHECK(getAwsInstance()->isInitialized(), "S3 is not initialized"); Aws::Client::ClientConfiguration clientConfig; - clientConfig.endpointOverride = HiveConfig::s3Endpoint(config_); + clientConfig.endpointOverride = hiveConfig_->s3Endpoint(); - if (HiveConfig::s3UseSSL(config_)) { + if (hiveConfig_->s3UseSSL()) { clientConfig.scheme = Aws::Http::Scheme::HTTPS; } else { clientConfig.scheme = Aws::Http::Scheme::HTTP; @@ -523,7 +523,7 @@ class S3FileSystem::Impl { credentialsProvider, clientConfig, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - HiveConfig::s3UseVirtualAddressing(config_)); + hiveConfig_->s3UseVirtualAddressing()); ++fileSystemCount; } @@ -607,13 +607,13 @@ class S3FileSystem::Impl { } private: - const Config* config_; + const HiveConfig* hiveConfig_; std::shared_ptr client_; }; -S3FileSystem::S3FileSystem(std::shared_ptr config) - : FileSystem(config) { - impl_ = std::make_shared(config.get()); +S3FileSystem::S3FileSystem(std::shared_ptr hiveConfig) + : FileSystem(hiveConfig) { + impl_ = std::make_shared(hiveConfig.get()); } std::string S3FileSystem::getLogLevelName() const { diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index 4240451ea2caa..1aab0e2fef511 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -17,10 +17,12 @@ #pragma once #include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConfig.h" namespace facebook::velox::filesystems { +using namespace facebook::velox::connector::hive; -bool initializeS3(const Config* config); +bool initializeS3(const HiveConfig* hiveConfigconfig); void finalizeS3(); @@ -29,7 +31,7 @@ void finalizeS3(); /// type of file can be constructed based on a filename. 
class S3FileSystem : public FileSystem {
 public:
-  explicit S3FileSystem(std::shared_ptr<const Config> config);
+  explicit S3FileSystem(std::shared_ptr<HiveConfig> hiveConfig);
 
   std::string name() const override;
 
diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt
index f3b348ca1767b..c98db338002ed 100644
--- a/velox/connectors/hive/tests/CMakeLists.txt
+++ b/velox/connectors/hive/tests/CMakeLists.txt
@@ -20,7 +20,8 @@ add_executable(
   HiveConnectorTest.cpp
   HiveConnectorSerDeTest.cpp
   PartitionIdGeneratorTest.cpp
-  TableHandleTest.cpp)
+  TableHandleTest.cpp
+  HiveConfigTest.cpp)
 
 add_test(velox_hive_connector_test velox_hive_connector_test)
 
 target_link_libraries(
diff --git a/velox/connectors/hive/tests/HiveConfigTest.cpp b/velox/connectors/hive/tests/HiveConfigTest.cpp
new file mode 100644
index 0000000000000..896dd6a5622e2
--- /dev/null
+++ b/velox/connectors/hive/tests/HiveConfigTest.cpp
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/connectors/hive/FileHandle.h"
+
+#include "gtest/gtest.h"
+#include "velox/common/file/File.h"
+#include "velox/connectors/hive/HiveConfig.h"
+#include "velox/core/Config.h"
+
+using namespace facebook::velox::connector::hive;
+using facebook::velox::connector::hive::HiveConfig;
+using facebook::velox::core::MemConfig;
+
+TEST(HiveConfigTest, defaultConfig) {
+  const auto hiveConfig = std::make_unique<HiveConfig>(
+      std::unordered_map<std::string, std::string>{});
+  std::unordered_map<std::string, std::string> emptyValues;
+  const MemConfig emptySession(emptyValues);
+  ASSERT_EQ(
+      hiveConfig->insertExistingPartitionsBehavior(),
+      InsertExistingPartitionsBehavior::kError);
+  ASSERT_EQ(hiveConfig->maxPartitionsPerWriters(&emptySession), 100);
+  ASSERT_EQ(hiveConfig->immutablePartitions(), false);
+  ASSERT_EQ(hiveConfig->s3UseVirtualAddressing(), false);
+  ASSERT_EQ(hiveConfig->s3GetLogLevel(), "FATAL");
+  ASSERT_EQ(hiveConfig->s3UseSSL(), true);
+  ASSERT_EQ(hiveConfig->s3UseInstanceCredentials(), false);
+  ASSERT_EQ(hiveConfig->s3Endpoint(), "");
+  ASSERT_EQ(hiveConfig->s3AccessKey(), std::nullopt);
+  ASSERT_EQ(hiveConfig->s3SecretKey(), std::nullopt);
+  ASSERT_EQ(hiveConfig->s3IAMRole(), std::nullopt);
+  ASSERT_EQ(hiveConfig->s3IAMRoleSessionName(), "velox-session");
+  ASSERT_EQ(hiveConfig->gcsEndpoint(), "");
+  ASSERT_EQ(hiveConfig->gcsScheme(), "https");
+  ASSERT_EQ(hiveConfig->gcsCredentials(), "");
+  ASSERT_EQ(hiveConfig->isOrcUseColumnNames(), false);
+  ASSERT_EQ(hiveConfig->isFileColumnNamesReadAsLowerCase(), false);
+
+  ASSERT_EQ(hiveConfig->maxCoalescedBytes(), 128 << 20);
+  ASSERT_EQ(hiveConfig->maxCoalescedDistanceBytes(), 512 << 10);
+  ASSERT_EQ(hiveConfig->numCacheFileHandles(), 20'000);
+  ASSERT_EQ(hiveConfig->isFileHandleCacheEnabled(), true);
+  ASSERT_EQ(hiveConfig->sortWriterMaxOutputRows(&emptySession), 1024);
+  ASSERT_EQ(hiveConfig->sortWriterMaxOutputBytes(&emptySession), 10UL << 20);
+  ASSERT_EQ(
+      hiveConfig->getOrcWriterMaxStripeSize(&emptySession),
+      64L * 1024L * 1024L);
+  ASSERT_EQ(
+      hiveConfig->getOrcWriterMaxDictionaryMemory(&emptySession),
+      16L * 1024L * 1024L);
+}
+
+TEST(HiveConfigTest, overrideConfig) {
+  std::unordered_map<std::string, std::string> configFromFile = {
+      {HiveConfig::kInsertExistingPartitionsBehavior, "OVERWRITE"},
+      {HiveConfig::kMaxPartitionsPerWriters, "120"},
+      {HiveConfig::kImmutablePartitions, "true"},
+      {HiveConfig::kS3PathStyleAccess, "true"},
+      {HiveConfig::kS3LogLevel, "Warning"},
+      {HiveConfig::kS3SSLEnabled, "false"},
+      {HiveConfig::kS3UseInstanceCredentials, "true"},
+      {HiveConfig::kS3Endpoint, "hey"},
+      {HiveConfig::kS3AwsAccessKey, "hello"},
+      {HiveConfig::kS3AwsSecretKey, "hello"},
+      {HiveConfig::kS3IamRole, "hello"},
+      {HiveConfig::kS3IamRoleSessionName, "velox"},
+      {HiveConfig::kGCSEndpoint, "hey"},
+      {HiveConfig::kGCSScheme, "http"},
+      {HiveConfig::kGCSCredentials, "hey"},
+      {HiveConfig::kOrcUseColumnNames, "true"},
+      {HiveConfig::kFileColumnNamesReadAsLowerCase, "true"},
+      {HiveConfig::kMaxCoalescedBytes, "100"},
+      {HiveConfig::kMaxCoalescedDistanceBytes, "100"},
+      {HiveConfig::kNumCacheFileHandles, "100"},
+      {HiveConfig::kEnableFileHandleCache, "false"},
+      {HiveConfig::kOrcWriterMaxStripeSize, "100"},
+      {HiveConfig::kOrcWriterMaxDictionaryMemory, "100"},
+      {HiveConfig::kSortWriterMaxOutputRows, "100"},
+      {HiveConfig::kSortWriterMaxOutputBytes, "100"}};
+  const auto hiveConfig = std::make_unique<HiveConfig>(configFromFile);
+  std::unordered_map<std::string, std::string> emptyValues;
+  const MemConfig emptySession(emptyValues);
+  ASSERT_EQ(
+      hiveConfig->insertExistingPartitionsBehavior(),
+      InsertExistingPartitionsBehavior::kOverwrite);
+  ASSERT_EQ(hiveConfig->maxPartitionsPerWriters(&emptySession), 120);
+  ASSERT_EQ(hiveConfig->immutablePartitions(), true);
+  ASSERT_EQ(hiveConfig->s3UseVirtualAddressing(), true);
+  ASSERT_EQ(hiveConfig->s3GetLogLevel(), "Warning");
+  ASSERT_EQ(hiveConfig->s3UseSSL(), false);
+  ASSERT_EQ(hiveConfig->s3UseInstanceCredentials(), true);
+  ASSERT_EQ(hiveConfig->s3Endpoint(), "hey");
+  ASSERT_EQ(hiveConfig->s3AccessKey(), std::optional<std::string>("hello"));
+  ASSERT_EQ(hiveConfig->s3SecretKey(), std::optional<std::string>("hello"));
+  ASSERT_EQ(hiveConfig->s3IAMRole(), std::optional<std::string>("hello"));
+  ASSERT_EQ(hiveConfig->s3IAMRoleSessionName(), "velox");
+  ASSERT_EQ(hiveConfig->gcsEndpoint(), "hey");
+  ASSERT_EQ(hiveConfig->gcsScheme(), "http");
+  ASSERT_EQ(hiveConfig->gcsCredentials(), "hey");
+  ASSERT_EQ(hiveConfig->isOrcUseColumnNames(), true);
+  ASSERT_EQ(hiveConfig->isFileColumnNamesReadAsLowerCase(), true);
+  ASSERT_EQ(hiveConfig->maxCoalescedBytes(), 100);
+  ASSERT_EQ(hiveConfig->maxCoalescedDistanceBytes(), 100);
+  ASSERT_EQ(hiveConfig->numCacheFileHandles(), 100);
+  ASSERT_EQ(hiveConfig->isFileHandleCacheEnabled(), false);
+  ASSERT_EQ(hiveConfig->sortWriterMaxOutputRows(&emptySession), 100);
+  ASSERT_EQ(hiveConfig->sortWriterMaxOutputBytes(&emptySession), 100);
+  ASSERT_EQ(hiveConfig->getOrcWriterMaxStripeSize(&emptySession), 100);
+  ASSERT_EQ(hiveConfig->getOrcWriterMaxDictionaryMemory(&emptySession), 100);
+}
diff --git a/velox/connectors/hive/tests/HiveConnectorTest.cpp b/velox/connectors/hive/tests/HiveConnectorTest.cpp
index 7d97e4469f877..8246eb96a8238 100644
--- a/velox/connectors/hive/tests/HiveConnectorTest.cpp
+++ b/velox/connectors/hive/tests/HiveConnectorTest.cpp
@@ -65,16 +65,16 @@ groupSubfields(const std::vector& subfields) {
 
 TEST_F(HiveConnectorTest, hiveConfig) {
   ASSERT_EQ(
-      HiveConfig::insertExistingPartitionsBehaviorString(
-          HiveConfig::InsertExistingPartitionsBehavior::kError),
+      insertExistingPartitionsBehaviorString(
+          InsertExistingPartitionsBehavior::kError),
       "ERROR");
   ASSERT_EQ(
-      HiveConfig::insertExistingPartitionsBehaviorString(
-          HiveConfig::InsertExistingPartitionsBehavior::kOverwrite),
+      insertExistingPartitionsBehaviorString(
+          InsertExistingPartitionsBehavior::kOverwrite),
       "OVERWRITE");
   ASSERT_EQ(
-      HiveConfig::insertExistingPartitionsBehaviorString(
-          static_cast<HiveConfig::InsertExistingPartitionsBehavior>(100)),
+
insertExistingPartitionsBehaviorString( + static_cast(100)), "UNKNOWN BEHAVIOR 100"); } diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index 0461a79e5a039..39195c1604baf 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -93,7 +93,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { connectorQueryCtx_ = std::make_unique( opPool_.get(), connectorPool_.get(), - connectorConfig_.get(), + connectorSession_.get(), nullptr, nullptr, nullptr, @@ -141,7 +141,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { bucketProperty), connectorQueryCtx_.get(), CommitStrategy::kNoCommit, - connectorConfig_); + connectorSession_); } std::vector listFiles(const std::string& dirPath) { @@ -163,9 +163,9 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { fmt::format("SELECT * FROM tmp")); } - void setConnectorConfig( + void setConnectorSession( std::unordered_map connectorConfig) { - connectorConfig_ = + connectorSession_ = std::make_shared(std::move(connectorConfig)); } @@ -182,7 +182,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { std::shared_ptr connectorPool_; RowTypePtr rowType_; std::unique_ptr connectorQueryCtx_; - std::shared_ptr connectorConfig_{std::make_unique()}; + std::shared_ptr connectorSession_{ + std::make_unique()}; std::unique_ptr spillExecutor_; }; @@ -633,7 +634,7 @@ TEST_F(HiveDataSinkTest, memoryReclaim) { auto connectorQueryCtx = std::make_unique( opPool_.get(), connectorPool_.get(), - connectorConfig_.get(), + connectorSession_.get(), spillConfig.get(), nullptr, nullptr, @@ -646,7 +647,7 @@ TEST_F(HiveDataSinkTest, memoryReclaim) { auto connectorQueryCtx = std::make_unique( opPool_.get(), connectorPool_.get(), - connectorConfig_.get(), + connectorSession_.get(), nullptr, nullptr, nullptr, @@ -755,7 +756,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { connectorConfig.emplace( "orc_optimized_writer_max_dictionary_memory", "1GB"); - setConnectorConfig(connectorConfig); + setConnectorSession(connectorConfig); const auto outputDirectory = TempDirectoryPath::create(); std::shared_ptr bucketProperty; @@ -779,7 +780,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { auto connectorQueryCtx = std::make_unique( opPool_.get(), connectorPool_.get(), - connectorConfig_.get(), + connectorSession_.get(), spillConfig.get(), nullptr, nullptr, @@ -792,7 +793,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { auto connectorQueryCtx = std::make_unique( opPool_.get(), connectorPool_.get(), - connectorConfig_.get(), + connectorSession_.get(), nullptr, nullptr, nullptr, diff --git a/velox/core/QueryCtx.cpp b/velox/core/QueryCtx.cpp index c0472ef09dc46..6c255eaec990d 100644 --- a/velox/core/QueryCtx.cpp +++ b/velox/core/QueryCtx.cpp @@ -20,7 +20,7 @@ namespace facebook::velox::core { QueryCtx::QueryCtx( folly::Executor* executor, QueryConfig&& queryConfig, - std::unordered_map> connectorConfigs, + std::unordered_map> connectorSessions, cache::AsyncDataCache* cache, std::shared_ptr pool, folly::Executor* spillExecutor, @@ -29,7 +29,7 @@ QueryCtx::QueryCtx( executor_(executor), spillExecutor_(spillExecutor), cache_(cache), - connectorConfigs_(connectorConfigs), + connectorSessions_(connectorSessions), pool_(std::move(pool)), queryConfig_{std::move(queryConfig)} { initPool(queryId); @@ -38,7 +38,7 @@ QueryCtx::QueryCtx( QueryCtx::QueryCtx( folly::Executor* executor, 
QueryConfig&& queryConfig, - std::unordered_map> connectorConfigs, + std::unordered_map> connectorSessions, cache::AsyncDataCache* cache, std::shared_ptr pool, std::shared_ptr spillExecutor, @@ -47,7 +47,7 @@ QueryCtx::QueryCtx( executor_(executor), spillExecutor_(spillExecutor.get()), cache_(cache), - connectorConfigs_(connectorConfigs), + connectorSessions_(connectorSessions), pool_(std::move(pool)), queryConfig_{std::move(queryConfig)} { initPool(queryId); @@ -56,13 +56,13 @@ QueryCtx::QueryCtx( QueryCtx::QueryCtx( folly::Executor::KeepAlive<> executorKeepalive, std::unordered_map queryConfigValues, - std::unordered_map> connectorConfigs, + std::unordered_map> connectorSessions, cache::AsyncDataCache* cache, std::shared_ptr pool, const std::string& queryId) : queryId_(queryId), cache_(cache), - connectorConfigs_(connectorConfigs), + connectorSessions_(connectorSessions), pool_(std::move(pool)), executorKeepalive_(std::move(executorKeepalive)), queryConfig_{std::move(queryConfigValues)} { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index a789d498d1d24..ce188349e6b13 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -89,9 +89,9 @@ class QueryCtx { return queryConfig_; } - Config* getConnectorConfig(const std::string& connectorId) const { - auto it = connectorConfigs_.find(connectorId); - if (it == connectorConfigs_.end()) { + Config* getConnectorSession(const std::string& connectorId) const { + auto it = connectorSessions_.find(connectorId); + if (it == connectorSessions_.end()) { return getEmptyConfig(); } return it->second.get(); @@ -106,10 +106,10 @@ class QueryCtx { // Overrides the previous connector-specific configuration. Note that this // function is NOT thread-safe and should probably only be used in tests. - void setConnectorConfigOverridesUnsafe( + void setConnectorSessionOverridesUnsafe( const std::string& connectorId, std::unordered_map&& configOverrides) { - connectorConfigs_[connectorId] = + connectorSessions_[connectorId] = std::make_shared(std::move(configOverrides)); } @@ -144,7 +144,7 @@ class QueryCtx { folly::Executor* const spillExecutor_{nullptr}; cache::AsyncDataCache* const cache_; - std::unordered_map> connectorConfigs_; + std::unordered_map> connectorSessions_; std::shared_ptr pool_; folly::Executor::KeepAlive<> executorKeepalive_; QueryConfig queryConfig_; diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 0fa557df5f8d9..a1eec8415d03f 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -360,11 +360,11 @@ Hive Connector - Type - Default Value - Description - * - max_partitions_per_writers + * - hive.max-partitions-per-writers - integer - 100 - Maximum number of (bucketed) partitions per a single table writer instance. - * - insert_existing_partitions_behavior + * - insert-existing-partitions-behavior - string - ERROR - **Allowed values:** ``OVERWRITE``, ``ERROR``. The behavior on insert existing partitions. This property only derives @@ -376,7 +376,7 @@ Hive Connector - false - True if appending data to an existing unpartitioned table is allowed. Currently this configuration does not support appending to existing partitions. - * - file_column_names_read_as_lower_case + * - file-column-names-read-as-lower-case - bool - false - True if reading the source file column names as lower case, and planner should guarantee @@ -389,21 +389,21 @@ Hive Connector - integer - 128MB - Maximum distance in bytes between chunks to be fetched that may be coalesced into a single request. 
- * - num_cached_file_handles + * - num-cached-file-handles - integer - 20000 - Maximum number of entries in the file handle cache. The value must be non-negative. Zero value indicates infinite cache capacity. - * - file_handle_cache_enabled + * - file-handle-cache-enabled - bool - true - Enables caching of file handles if true. Disables caching if false. File handle cache should be disabled if files are not immutable, i.e. file content may change while file path stays the same. - * - sort_writer_max_output_rows + * - sort-writer-max-output-rows - integer - 1024 - Maximum number of rows for sort writer in one batch of output. This is to limit the memory usage of sort writer. - * - sort_writer_max_output_bytes + * - sort-writer-max-output-bytes - integer - 10MB - Maximum bytes for sort writer in one batch of output. This is to limit the memory usage of sort writer. diff --git a/velox/dwio/common/FileSink.h b/velox/dwio/common/FileSink.h index 58c88ee3bffc4..176a492e03016 100644 --- a/velox/dwio/common/FileSink.h +++ b/velox/dwio/common/FileSink.h @@ -20,6 +20,7 @@ #include "velox/common/file/File.h" #include "velox/common/io/IoStatistics.h" +#include "velox/connectors/hive/HiveConfig.h" #include "velox/dwio/common/Closeable.h" #include "velox/dwio/common/DataBuffer.h" #include "velox/dwio/common/MetricsLog.h" @@ -30,6 +31,7 @@ class Config; namespace facebook::velox::dwio::common { using namespace facebook::velox::io; +using namespace facebook::velox::connector::hive; /// An abstract interface for providing a file write sink to different storage /// system backends. @@ -40,7 +42,7 @@ class FileSink : public Closeable { bool bufferWrite{true}; /// Connector properties are required to create a FileSink on FileSystems /// such as S3. - const std::shared_ptr& connectorProperties{nullptr}; + const std::shared_ptr& hiveConfig{nullptr}; memory::MemoryPool* pool{nullptr}; MetricsLogPtr metricLogger{MetricsLog::voidLog()}; IoStatistics* stats{nullptr}; @@ -48,7 +50,7 @@ class FileSink : public Closeable { FileSink(std::string name, const Options& options) : name_{std::move(name)}, - connectorProperties_{options.connectorProperties}, + hiveConfig_{options.hiveConfig}, pool_(options.pool), metricLogger_{options.metricLogger}, stats_{options.stats}, @@ -107,7 +109,7 @@ class FileSink : public Closeable { const std::function&)>& callback); const std::string name_; - const std::shared_ptr connectorProperties_; + const std::shared_ptr hiveConfig_; memory::MemoryPool* const pool_; const MetricsLogPtr metricLogger_; IoStatistics* const stats_; diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 482c84d5b6e0a..e2d676e0290ea 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -54,7 +54,7 @@ OperatorCtx::createConnectorQueryCtx( return std::make_shared( pool_, connectorPool, - driverCtx_->task->queryCtx()->getConnectorConfig(connectorId), + driverCtx_->task->queryCtx()->getConnectorSession(connectorId), spillConfig, std::make_unique( execCtx()->queryCtx(), execCtx()->pool()), diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 8848330dbecc3..7a2b32a910fc0 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -2030,7 +2030,7 @@ TEST_P(PartitionedTableWriterTest, maxPartitions) { AssertQueryBuilder(plan) .connectorConfig( kHiveConnectorId, - HiveConfig::kMaxPartitionsPerWriters, + HiveConfig::kMaxPartitionsPerWritersSession, folly::to(maxPartitions)) .copyResults(pool()), fmt::format( @@ -2040,7 
+2040,7 @@ TEST_P(PartitionedTableWriterTest, maxPartitions) { AssertQueryBuilder(plan) .connectorConfig( kHiveConnectorId, - HiveConfig::kMaxPartitionsPerWriters, + HiveConfig::kMaxPartitionsPerWritersSession, folly::to(maxPartitions)) .copyResults(pool()), "Exceeded open writer limit"); @@ -2189,7 +2189,7 @@ TEST_P(UnpartitionedTableWriterTest, runtimeStatsCheck) { .config(QueryConfig::kTaskWriterCount, std::to_string(1)) .connectorConfig( kHiveConnectorId, - HiveConfig::kOrcWriterMaxStripeSize, + HiveConfig::kOrcWriterMaxStripeSizeSession, testData.maxStripeSize) .assertResults("SELECT count(*) FROM tmp"); auto stats = task->taskStats().pipelineStats.front().operatorStats; @@ -2304,7 +2304,7 @@ TEST_P(BucketedTableOnlyWriteTest, bucketCountLimit) { AssertQueryBuilder(plan) .connectorConfig( kHiveConnectorId, - HiveConfig::kMaxPartitionsPerWriters, + HiveConfig::kMaxPartitionsPerWritersSession, // Make sure we have a sufficient large writer limit. folly::to(testData.bucketCount * 2)) .copyResults(pool()), @@ -3171,11 +3171,11 @@ DEBUG_ONLY_TEST_P(BucketSortOnlyTableWriterTest, outputBatchRows) { .config(QueryConfig::kTaskWriterCount, std::to_string(1)) .connectorConfig( kHiveConnectorId, - HiveConfig::kSortWriterMaxOutputRows, + HiveConfig::kSortWriterMaxOutputRowsSession, folly::to(testData.maxOutputRows)) .connectorConfig( kHiveConnectorId, - HiveConfig::kSortWriterMaxOutputBytes, + HiveConfig::kSortWriterMaxOutputBytesSession, folly::to(testData.maxOutputBytes)) .assertResults("SELECT count(*) FROM tmp"); auto stats = task->taskStats().pipelineStats.front().operatorStats; diff --git a/velox/exec/tests/utils/AggregationFuzzer.cpp b/velox/exec/tests/utils/AggregationFuzzer.cpp index d615c4ecd503a..56c4347a5f2b5 100644 --- a/velox/exec/tests/utils/AggregationFuzzer.cpp +++ b/velox/exec/tests/utils/AggregationFuzzer.cpp @@ -451,7 +451,7 @@ AggregationFuzzer::AggregationFuzzer( auto hiveConnector = connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) - ->newConnector(kHiveConnectorId, nullptr); + ->newConnector(kHiveConnectorId, {}); connector::registerConnector(hiveConnector); seed(initialSeed); diff --git a/velox/exec/tests/utils/AssertQueryBuilder.cpp b/velox/exec/tests/utils/AssertQueryBuilder.cpp index e07059eab9ccd..d9b9b35dd6128 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.cpp +++ b/velox/exec/tests/utils/AssertQueryBuilder.cpp @@ -240,7 +240,7 @@ AssertQueryBuilder::readCursor() { } if (!connectorConfigs_.empty()) { for (auto& [connectorId, configs] : connectorConfigs_) { - params_.queryCtx->setConnectorConfigOverridesUnsafe( + params_.queryCtx->setConnectorSessionOverridesUnsafe( connectorId, std::move(configs)); } }
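A short end-to-end sketch of the refactored surface, not part of the patch: it shows a HiveConfig being built from a connector property map (the constructor dispatches each key through configSetters) and queried through the new instance getters. The property values and the main wrapper are illustrative assumptions; the class, key names, and functions are the ones introduced above.

#include <iostream>
#include <string>
#include <unordered_map>

#include "velox/connectors/hive/HiveConfig.h"

using namespace facebook::velox::connector::hive;

int main() {
  // Connector-level properties, e.g. loaded from a connector properties file.
  // Unknown keys are only logged; known keys are parsed by their setters.
  std::unordered_map<std::string, std::string> connectorConf = {
      {HiveConfig::kInsertExistingPartitionsBehavior, "overwrite"},
      {HiveConfig::kMaxCoalescedBytes, "67108864"},
      {HiveConfig::kEnableFileHandleCache, "false"}};
  HiveConfig hiveConfig(connectorConf);

  // Behavior strings are matched case-insensitively; unsupported values throw.
  const bool overwrite = hiveConfig.insertExistingPartitionsBehavior() ==
      InsertExistingPartitionsBehavior::kOverwrite;

  // Prints: OVERWRITE maxCoalescedBytes=67108864 fileHandleCache=0
  std::cout << insertExistingPartitionsBehaviorString(
                   hiveConfig.insertExistingPartitionsBehavior())
            << " maxCoalescedBytes=" << hiveConfig.maxCoalescedBytes()
            << " fileHandleCache=" << hiveConfig.isFileHandleCacheEnabled()
            << std::endl;
  return overwrite ? 0 : 1;
}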