Skip to content

Commit

Permalink
Refactor HiveConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Nov 28, 2023
1 parent 4460971 commit 8f9fd41
Show file tree
Hide file tree
Showing 15 changed files with 798 additions and 312 deletions.
368 changes: 248 additions & 120 deletions velox/connectors/hive/HiveConfig.cpp

Large diffs are not rendered by default.

313 changes: 242 additions & 71 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
#pragma once

#include <folly/Conv.h>
#include <functional>
#include <glog/logging.h>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>

Expand All @@ -24,25 +27,109 @@ class Config;

namespace facebook::velox::connector::hive {

/// Selects the behavior on insert into existing partitions.
///
/// NOTE(review): the per-enumerator meaning below is inferred from the names
/// only; the code that interprets them is not in this header - confirm.
enum class InsertExistingPartitionsBehavior {
  /// Presumably: inserting into an existing partition is rejected.
  kError = 0,
  /// Presumably: an existing partition is replaced by the insert.
  kOverwrite = 1,
};

/// Returns a string for the given InsertExistingPartitionsBehavior value.
///
/// NOTE(review): defined out of line; presumably the result is the textual
/// name of the enumerator - confirm against HiveConfig.cpp.
///
/// @param behavior The enumerator to convert.
/// @return The string form of `behavior`.
std::string insertExistingPartitionsBehaviorString(
InsertExistingPartitionsBehavior behavior);

/// Hive connector configs.
class HiveConfig {
public:
enum class InsertExistingPartitionsBehavior {
kError,
kOverwrite,
HiveConfig(
const std::unordered_map<std::string, std::string>& connectorConf) {
for (auto config = connectorConf.begin(); config != connectorConf.end();
config++) {
if (configSetters.count(config->first) != 0) {
configSetters.at(config->first)(this, config->second);
} else {
LOG(ERROR) << "Invalid hive config:" << config->first << std::endl;
;
}
}
};

static std::string insertExistingPartitionsBehaviorString(
InsertExistingPartitionsBehavior behavior);
/// Read accessors for the connector settings.
///
/// NOTE(review): all of these are defined out of line and their bodies are not
/// visible in this header. Presumably each one returns the like-named private
/// member; the overloads taking `const Config* session` presumably let the
/// matching k*Session property override that value - confirm against
/// HiveConfig.cpp.

/// Behavior on insert into existing partitions.
InsertExistingPartitionsBehavior insertExistingPartitionsBehavior() const;

/// @param session Session-level Config (see kMaxPartitionsPerWritersSession).
uint32_t maxPartitionsPerWriters(const Config* session) const;

bool immutablePartitions() const;

// S3 related settings.
bool s3UseVirtualAddressing() const;

std::string s3GetLogLevel() const;

bool s3UseSSL() const;

bool s3UseInstanceCredentials() const;

std::string s3Endpoint() const;

/// Optional: the backing member has no default value.
std::optional<std::string> s3AccessKey() const;

/// Optional: the backing member has no default value.
std::optional<std::string> s3SecretKey() const;

/// Optional: the backing member has no default value.
std::optional<std::string> s3IAMRole() const;

std::string s3IAMRoleSessionName() const;

// GCS related settings.
std::string gcsEndpoint() const;

std::string gcsScheme() const;

std::string gcsCredentials() const;

/// @param session Session-level Config (see kOrcUseColumnNamesSession).
bool isOrcUseColumnNames(const Config* session) const;

bool isFileColumnNamesReadAsLowerCase() const;

int64_t maxCoalescedBytes() const;

int32_t maxCoalescedDistanceBytes() const;

int32_t numCacheFileHandles() const;

bool isFileHandleCacheEnabled() const;

/// @param session Session-level Config (see kSortWriterMaxOutputRowsSession).
uint32_t sortWriterMaxOutputRows(const Config* session) const;

/// @param session Session-level Config (see kSortWriterMaxOutputBytesSession).
uint64_t sortWriterMaxOutputBytes(const Config* session) const;

/// @param session Session-level Config (see kOrcWriterMaxStripeSizeSession).
uint64_t getOrcWriterMaxStripeSize(const Config* session) const;

/// @param session Session-level Config (see
/// kOrcWriterMaxDictionaryMemorySession).
uint64_t getOrcWriterMaxDictionaryMemory(const Config* session) const;

// Session property names.
// NOTE(review): presumably these are the keys looked up in the `session`
// Config passed to the accessors that take one - confirm in HiveConfig.cpp.

/// Session key paired with maxPartitionsPerWriters().
static constexpr const char* kMaxPartitionsPerWritersSession =
"max_partitions_per_writers";

// TODO: remove hive_orc_use_column_names since it doesn't exist in presto,
// right now this is only used for testing.
static constexpr const char* kOrcUseColumnNamesSession =
"hive_orc_use_column_names";

/// Session key paired with getOrcWriterMaxStripeSize().
static constexpr const char* kOrcWriterMaxStripeSizeSession =
"orc_optimized_writer_max_stripe_size";

/// Session key paired with getOrcWriterMaxDictionaryMemory().
static constexpr const char* kOrcWriterMaxDictionaryMemorySession =
"orc_optimized_writer_max_dictionary_memory";

/// Session keys paired with sortWriterMaxOutputRows() and
/// sortWriterMaxOutputBytes().
static constexpr const char* kSortWriterMaxOutputRowsSession =
"sort_writer_max_output_rows";
static constexpr const char* kSortWriterMaxOutputBytesSession =
"sort_writer_max_output_bytes";

// Configs
/// Behavior on insert into existing partitions.
static constexpr const char* kInsertExistingPartitionsBehavior =
"insert_existing_partitions_behavior";
"insert-existing-partitions-behavior";

/// Maximum number of (bucketed) partitions per single table writer
/// instance.
static constexpr const char* kMaxPartitionsPerWriters =
"max_partitions_per_writers";
"max-partitions-per-writers";

/// Whether new data can be inserted into an unpartitioned table.
/// Velox currently does not support appending data to existing partitions.
Expand Down Expand Up @@ -105,84 +192,168 @@ class HiveConfig {
"max-coalesced-distance-bytes";

/// Maximum number of entries in the file handle cache.
static constexpr const char* kNumCacheFileHandles = "num_cached_file_handles";
static constexpr const char* kNumCacheFileHandles = "num-cached-file-handles";

/// Enable file handle cache.
static constexpr const char* kEnableFileHandleCache =
"file_handle_cache_enabled";
"file-handle-cache-enabled";

// TODO: Refactor and merge config and session property.
static constexpr const char* kOrcWriterMaxStripeSize =
"orc_optimized_writer_max_stripe_size";
static constexpr const char* kOrcWriterMaxStripeSizeConfig =
"hive.orc.writer.stripe-max-size";

static constexpr const char* kOrcWriterMaxDictionaryMemory =
"orc_optimized_writer_max_dictionary_memory";
static constexpr const char* kOrcWriterMaxDictionaryMemoryConfig =
"hive.orc.writer.dictionary-max-memory";

static constexpr const char* kSortWriterMaxOutputRows =
"sort_writer_max_output_rows";
"sort-writer-max-output-rows";
static constexpr const char* kSortWriterMaxOutputBytes =
"sort_writer_max_output_bytes";

static InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* config);

static uint32_t maxPartitionsPerWriters(const Config* config);

static bool immutablePartitions(const Config* config);

static bool s3UseVirtualAddressing(const Config* config);

static std::string s3GetLogLevel(const Config* config);

static bool s3UseSSL(const Config* config);

static bool s3UseInstanceCredentials(const Config* config);

static std::string s3Endpoint(const Config* config);

static std::optional<std::string> s3AccessKey(const Config* config);
"sort-writer-max-output-bytes";

static std::optional<std::string> s3SecretKey(const Config* config);
private:
/// Setter registered in `configSetters` under
/// kInsertExistingPartitionsBehavior; the constructor calls it with the raw
/// string value of that connector property.
///
/// NOTE(review): defined out of line; presumably it converts the string to an
/// InsertExistingPartitionsBehavior and stores it in
/// `hiveConfig->insertExistingPartitionsBehavior_` - confirm.
///
/// @param hiveConfig Instance to update.
/// @param insertExistingPartitionsBehavior Raw property value.
static void setInsertExistingPartitionsBehavior(
HiveConfig* hiveConfig,
std::string insertExistingPartitionsBehavior);

static std::optional<std::string> s3IAMRole(const Config* config);

static std::string s3IAMRoleSessionName(const Config* config);

static std::string gcsEndpoint(const Config* config);

static std::string gcsScheme(const Config* config);

static std::string gcsCredentials(const Config* config);

static bool isOrcUseColumnNames(const Config* config);

static bool isFileColumnNamesReadAsLowerCase(const Config* config);

static int64_t maxCoalescedBytes(const Config* config);

static int32_t maxCoalescedDistanceBytes(const Config* config);

static int32_t numCacheFileHandles(const Config* config);

static bool isFileHandleCacheEnabled(const Config* config);

static uint64_t fileWriterFlushThresholdBytes(const Config* config);

static uint32_t sortWriterMaxOutputRows(const Config* config);

static uint64_t sortWriterMaxOutputBytes(const Config* config);

static uint64_t getOrcWriterMaxStripeSize(
const Config* connectorQueryCtxConfig,
const Config* connectorPropertiesConfig);

static uint64_t getOrcWriterMaxDictionaryMemory(
const Config* connectorQueryCtxConfig,
const Config* connectorPropertiesConfig);
// Per-property setters. Each one is registered in `configSetters` under the
// matching k* property name and is invoked by the constructor with the target
// instance and the raw string value from the connector properties.
//
// NOTE(review): the definitions are out of line and not visible here.
// Presumably each setter parses its string argument and stores the result in
// the like-named private member of `*hiveConfig`; how malformed values are
// handled cannot be determined from this header - confirm in HiveConfig.cpp.
static void setMaxPartitionsPerWriters(
HiveConfig* hiveConfig,
std::string maxPartitionsPerWriters);

static void setImmutablePartitions(
HiveConfig* hiveConfig,
std::string immutablePartitions);

// S3 setters.
static void setS3PathStyleAccess(
HiveConfig* hiveConfig,
std::string s3PathStyleAccess);

static void setS3LogLevel(HiveConfig* hiveConfig, std::string s3LogLevel);

static void setS3SSLEnabled(HiveConfig* hiveConfig, std::string s3SSLEnabled);

static void setS3UseInstanceCredentials(
HiveConfig* hiveConfig,
std::string s3UseInstanceCredentials);

static void setS3Endpoint(HiveConfig* hiveConfig, std::string s3Endpoint);

static void setS3AwsAccessKey(
HiveConfig* hiveConfig,
std::string s3AwsAccessKey);

static void setS3AwsSecretKey(
HiveConfig* hiveConfig,
std::string s3AwsSecretKey);

static void setS3IamRole(HiveConfig* hiveConfig, std::string s3IamRole);

static void setS3IamRoleSessionName(
HiveConfig* hiveConfig,
std::string s3IamRoleSessionName);

// GCS setters.
static void setGCSEndpoint(HiveConfig* hiveConfig, std::string GCSEndpoint);

static void setGCSScheme(HiveConfig* hiveConfig, std::string GCSScheme);

static void setGCSCredentials(
HiveConfig* hiveConfig,
std::string GCSCredentials);

// File reading and file handle cache setters.
static void setOrcUseColumnNames(
HiveConfig* hiveConfig,
std::string orcUseColumnNames);

static void setFileColumnNamesReadAsLowerCase(
HiveConfig* hiveConfig,
std::string fileColumnNamesReadAsLowerCase);

static void setMaxCoalescedBytes(
HiveConfig* hiveConfig,
std::string maxCoalescedBytes);

static void setMaxCoalescedDistanceBytes(
HiveConfig* hiveConfig,
std::string maxCoalescedDistanceBytes);

static void setNumCacheFileHandles(
HiveConfig* hiveConfig,
std::string numCacheFileHandles);

static void setEnableFileHandleCache(
HiveConfig* hiveConfig,
std::string enableFileHandleCache);

// Writer setters.
static void setSortWriterMaxOutputRows(
HiveConfig* hiveConfig,
std::string sortWriterMaxOutputRows);

static void setSortWriterMaxOutputBytes(
HiveConfig* hiveConfig,
std::string sortWriterMaxOutputBytes);

static void setOrcWriterMaxStripeSize(
HiveConfig* hiveConfig,
std::string orcWriterMaxStripeSize);

static void setOrcWriterMaxDictionaryMemory(
HiveConfig* hiveConfig,
std::string orcWriterMaxDictionaryMemory);

// Backing storage for the settings. The initializers below are the values a
// member holds when nothing assigns to it.
// NOTE(review): presumably the private set* functions are what assign to
// these members - confirm in HiveConfig.cpp.

// Partition / insert behavior.
InsertExistingPartitionsBehavior insertExistingPartitionsBehavior_{
    InsertExistingPartitionsBehavior::kError};
uint32_t maxPartitionsPerWriters_{100};
bool immutablePartitions_{false};

// S3.
bool s3PathStyleAccess_{false};
std::string s3LogLevel_{"FATAL"};
bool s3SSLEnabled_{true};
bool s3UseInstanceCredentials_{false};
std::string s3Endpoint_;
std::optional<std::string> s3AwsAccessKey_;
std::optional<std::string> s3AwsSecretKey_;
std::optional<std::string> s3IamRole_;
std::string s3IamRoleSessionName_{"velox-session"};

// GCS.
std::string GCSEndpoint_;
std::string GCSScheme_{"https"};
std::string GCSCredentials_;

// File reading.
bool orcUseColumnNames_{false};
bool fileColumnNamesReadAsLowerCase_{false};
int64_t maxCoalescedBytes_{128 << 20}; // 128 * 2^20
int32_t maxCoalescedDistanceBytes_{512 << 10}; // 512 * 2^10

// File handle cache.
int32_t numCacheFileHandles_{20000};
bool enableFileHandleCache_{true};

// Writers.
// NOTE(review): stored as int32_t while sortWriterMaxOutputRows() returns
// uint32_t - confirm the signedness mismatch is intended.
int32_t sortWriterMaxOutputRows_{1024};
uint64_t sortWriterMaxOutputBytes_{10UL << 20}; // 10 * 2^20
uint64_t orcWriterMaxStripeSize_{64L * 1024L * 1024L}; // 64 * 2^20
uint64_t orcWriterMaxDictionaryMemory_{16L * 1024L * 1024L}; // 16 * 2^20

/// Dispatch table from connector property name to the setter that applies the
/// property's raw string value to a HiveConfig instance. The constructor looks
/// up every incoming property name here.
///
/// Declared `static inline const` so a single read-only table is shared by all
/// instances. As a non-static member the 25 std::function entries were rebuilt
/// for every HiveConfig that was constructed or copied.
static inline const std::
    unordered_map<std::string, std::function<void(HiveConfig*, std::string)>>
        configSetters = {
            {kInsertExistingPartitionsBehavior,
             setInsertExistingPartitionsBehavior},
            {kMaxPartitionsPerWriters, setMaxPartitionsPerWriters},
            {kImmutablePartitions, setImmutablePartitions},
            {kS3PathStyleAccess, setS3PathStyleAccess},
            {kS3LogLevel, setS3LogLevel},
            {kS3SSLEnabled, setS3SSLEnabled},
            {kS3UseInstanceCredentials, setS3UseInstanceCredentials},
            {kS3Endpoint, setS3Endpoint},
            {kS3AwsAccessKey, setS3AwsAccessKey},
            {kS3AwsSecretKey, setS3AwsSecretKey},
            {kS3IamRole, setS3IamRole},
            {kS3IamRoleSessionName, setS3IamRoleSessionName},
            {kGCSEndpoint, setGCSEndpoint},
            {kGCSScheme, setGCSScheme},
            {kGCSCredentials, setGCSCredentials},
            {kOrcUseColumnNames, setOrcUseColumnNames},
            {kFileColumnNamesReadAsLowerCase,
             setFileColumnNamesReadAsLowerCase},
            {kMaxCoalescedBytes, setMaxCoalescedBytes},
            {kMaxCoalescedDistanceBytes, setMaxCoalescedDistanceBytes},
            {kNumCacheFileHandles, setNumCacheFileHandles},
            {kEnableFileHandleCache, setEnableFileHandleCache},
            {kOrcWriterMaxStripeSize, setOrcWriterMaxStripeSize},
            {kOrcWriterMaxDictionaryMemory, setOrcWriterMaxDictionaryMemory},
            {kSortWriterMaxOutputRows, setSortWriterMaxOutputRows},
            {kSortWriterMaxOutputBytes, setSortWriterMaxOutputBytes},
};
};

} // namespace facebook::velox::connector::hive
Loading

0 comments on commit 8f9fd41

Please sign in to comment.