Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor HiveConfig #7725

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 10 additions & 15 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class ConnectorQueryCtx {
ConnectorQueryCtx(
memory::MemoryPool* operatorPool,
memory::MemoryPool* connectorPool,
const Config* connectorConfig,
const Config* sessionProperties,
const common::SpillConfig* spillConfig,
std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator,
cache::AsyncDataCache* cache,
Expand All @@ -247,7 +247,7 @@ class ConnectorQueryCtx {
int driverId)
: operatorPool_(operatorPool),
connectorPool_(connectorPool),
config_(connectorConfig),
sessionProperties_(sessionProperties),
spillConfig_(spillConfig),
expressionEvaluator_(std::move(expressionEvaluator)),
cache_(cache),
Expand All @@ -256,7 +256,7 @@ class ConnectorQueryCtx {
taskId_(taskId),
driverId_(driverId),
planNodeId_(planNodeId) {
VELOX_CHECK_NOT_NULL(connectorConfig);
VELOX_CHECK_NOT_NULL(sessionProperties);
}

/// Returns the associated operator's memory pool which is a leaf kind of
Expand All @@ -272,8 +272,8 @@ class ConnectorQueryCtx {
return connectorPool_;
}

const Config* config() const {
return config_;
const Config* sessionProperties() const {
return sessionProperties_;
}

const common::SpillConfig* spillConfig() const {
Expand Down Expand Up @@ -315,7 +315,7 @@ class ConnectorQueryCtx {
private:
memory::MemoryPool* const operatorPool_;
memory::MemoryPool* const connectorPool_;
const Config* config_;
const Config* const sessionProperties_;
const common::SpillConfig* const spillConfig_;
std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator_;
cache::AsyncDataCache* cache_;
Expand All @@ -328,19 +328,16 @@ class ConnectorQueryCtx {

class Connector {
public:
explicit Connector(
const std::string& id,
std::shared_ptr<const Config> properties)
: id_(id), properties_(std::move(properties)) {}
explicit Connector(const std::string& id) : id_(id) {}

virtual ~Connector() = default;

const std::string& connectorId() const {
return id_;
}

const std::shared_ptr<const Config>& connectorProperties() const {
return properties_;
virtual const std::shared_ptr<const Config>& connectorConfig() const {
VELOX_NYI("connectorConfig is not supported yet");
}

// Returns true if this connector would accept a filter dynamically generated
Expand Down Expand Up @@ -391,8 +388,6 @@ class Connector {
static folly::Synchronized<
std::unordered_map<std::string_view, std::weak_ptr<cache::ScanTracker>>>
trackers_;

const std::shared_ptr<const Config> properties_;
};

class ConnectorFactory {
Expand All @@ -410,7 +405,7 @@ class ConnectorFactory {

virtual std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const Config> properties,
std::shared_ptr<const Config> config,
folly::Executor* FOLLY_NULLABLE executor = nullptr) = 0;

private:
Expand Down
8 changes: 4 additions & 4 deletions velox/connectors/fuzzer/FuzzerConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ class FuzzerConnector final : public Connector {
public:
FuzzerConnector(
const std::string& id,
std::shared_ptr<const Config> properties,
std::shared_ptr<const Config> config,
folly::Executor* FOLLY_NULLABLE /*executor*/)
: Connector(id, properties) {}
: Connector(id) {}

std::unique_ptr<DataSource> createDataSource(
const std::shared_ptr<const RowType>& outputType,
Expand Down Expand Up @@ -139,9 +139,9 @@ class FuzzerConnectorFactory : public ConnectorFactory {

std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const Config> properties,
std::shared_ptr<const Config> config,
folly::Executor* FOLLY_NULLABLE executor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, properties, executor);
return std::make_shared<FuzzerConnector>(id, config, executor);
}
};

Expand Down
199 changes: 92 additions & 107 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ stringToInsertExistingPartitionsBehavior(const std::string& strValue) {

} // namespace

// static
HiveConfig::InsertExistingPartitionsBehavior
HiveConfig::insertExistingPartitionsBehavior(const Config* config) {
const auto behavior =
config->get<std::string>(kInsertExistingPartitionsBehavior);
return behavior.has_value()
? stringToInsertExistingPartitionsBehavior(behavior.value())
: InsertExistingPartitionsBehavior::kError;
}

// static
std::string HiveConfig::insertExistingPartitionsBehaviorString(
InsertExistingPartitionsBehavior behavior) {
Expand All @@ -62,166 +52,161 @@ std::string HiveConfig::insertExistingPartitionsBehaviorString(
}
}

// static
uint32_t HiveConfig::maxPartitionsPerWriters(const Config* config) {
return config->get<uint32_t>(kMaxPartitionsPerWriters, 100);
HiveConfig::InsertExistingPartitionsBehavior
HiveConfig::insertExistingPartitionsBehavior() const {
const auto behavior =
config_->get<std::string>(kInsertExistingPartitionsBehavior);
return behavior.has_value()
? stringToInsertExistingPartitionsBehavior(behavior.value())
: InsertExistingPartitionsBehavior::kError;
}

// static
bool HiveConfig::immutablePartitions(const Config* config) {
return config->get<bool>(kImmutablePartitions, false);
uint32_t HiveConfig::maxPartitionsPerWriters(const Config* session) const {
if (session->isValueExists(kMaxPartitionsPerWritersSession)) {
return session->get<uint32_t>(kMaxPartitionsPerWritersSession).value();
}
return config_->get<uint32_t>(kMaxPartitionsPerWriters, 100);
}

// static
bool HiveConfig::s3UseVirtualAddressing(const Config* config) {
return !config->get(kS3PathStyleAccess, false);
bool HiveConfig::immutablePartitions() const {
return config_->get<bool>(kImmutablePartitions, false);
}

// static
std::string HiveConfig::s3GetLogLevel(const Config* config) {
return config->get(kS3LogLevel, std::string("FATAL"));
bool HiveConfig::s3UseVirtualAddressing() const {
return !config_->get(kS3PathStyleAccess, false);
}

// static
bool HiveConfig::s3UseSSL(const Config* config) {
return config->get(kS3SSLEnabled, true);
std::string HiveConfig::s3GetLogLevel() const {
return config_->get(kS3LogLevel, std::string("FATAL"));
}

// static
bool HiveConfig::s3UseInstanceCredentials(const Config* config) {
return config->get(kS3UseInstanceCredentials, false);
bool HiveConfig::s3UseSSL() const {
return config_->get(kS3SSLEnabled, true);
}

// static
std::string HiveConfig::s3Endpoint(const Config* config) {
return config->get(kS3Endpoint, std::string(""));
bool HiveConfig::s3UseInstanceCredentials() const {
return config_->get(kS3UseInstanceCredentials, false);
}

// static
std::optional<std::string> HiveConfig::s3AccessKey(const Config* config) {
if (config->isValueExists(kS3AwsAccessKey)) {
return config->get(kS3AwsAccessKey).value();
}
return {};
std::string HiveConfig::s3Endpoint() const {
return config_->get(kS3Endpoint, std::string(""));
}

// static
std::optional<std::string> HiveConfig::s3SecretKey(const Config* config) {
if (config->isValueExists(kS3AwsSecretKey)) {
return config->get(kS3AwsSecretKey).value();
std::optional<std::string> HiveConfig::s3AccessKey() const {
if (config_->isValueExists(kS3AwsAccessKey)) {
return config_->get(kS3AwsAccessKey).value();
}
return {};
}

// static
std::optional<std::string> HiveConfig::s3IAMRole(const Config* config) {
if (config->isValueExists(kS3IamRole)) {
return config->get(kS3IamRole).value();
std::optional<std::string> HiveConfig::s3SecretKey() const {
if (config_->isValueExists(kS3AwsSecretKey)) {
return config_->get(kS3AwsSecretKey).value();
}
return {};
}

// static
std::string HiveConfig::s3IAMRoleSessionName(const Config* config) {
return config->get(kS3IamRoleSessionName, std::string("velox-session"));
}

// static
std::string HiveConfig::gcsEndpoint(const Config* config) {
return config->get<std::string>(kGCSEndpoint, std::string(""));
std::optional<std::string> HiveConfig::s3IAMRole() const {
if (config_->isValueExists(kS3IamRole)) {
return config_->get(kS3IamRole).value();
}
return {};
}

// static
std::string HiveConfig::gcsScheme(const Config* config) {
return config->get<std::string>(kGCSScheme, std::string("https"));
std::string HiveConfig::s3IAMRoleSessionName() const {
return config_->get(kS3IamRoleSessionName, std::string("velox-session"));
}

// static
std::string HiveConfig::gcsCredentials(const Config* config) {
return config->get<std::string>(kGCSCredentials, std::string(""));
std::string HiveConfig::gcsEndpoint() const {
return config_->get<std::string>(kGCSEndpoint, std::string(""));
}

// static.
bool HiveConfig::isOrcUseColumnNames(const Config* config) {
return config->get<bool>(kOrcUseColumnNames, false);
std::string HiveConfig::gcsScheme() const {
return config_->get<std::string>(kGCSScheme, std::string("https"));
}

// static.
bool HiveConfig::isFileColumnNamesReadAsLowerCase(const Config* config) {
return config->get<bool>(kFileColumnNamesReadAsLowerCase, false);
std::string HiveConfig::gcsCredentials() const {
return config_->get<std::string>(kGCSCredentials, std::string(""));
}

// static.
int64_t HiveConfig::maxCoalescedBytes(const Config* config) {
return config->get<int64_t>(kMaxCoalescedBytes, 128 << 20);
bool HiveConfig::isOrcUseColumnNames(const Config* session) const {
if (session->isValueExists(kOrcUseColumnNamesSession)) {
return session->get<bool>(kOrcUseColumnNamesSession).value();
}
return config_->get<bool>(kOrcUseColumnNames, false);
}

// static.
int32_t HiveConfig::maxCoalescedDistanceBytes(const Config* config) {
return config->get<int32_t>(kMaxCoalescedDistanceBytes, 512 << 10);
bool HiveConfig::isFileColumnNamesReadAsLowerCase(const Config* session) const {
if (session->isValueExists(kFileColumnNamesReadAsLowerCaseSession)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kewang1024 @majetideepak @xiaoxmeng @mbasmanova

Apart from the issue mentioned by @zhztheplayer here. There is one another issue here. We have currently set kFileColumnNamesReadAsLowerCase to true in Gluten, but it didn't take effect. We must set kFileColumnNamesReadAsLowerCaseSession to true in order for it to work. Based on the code analysis, it seems that if the kFileColumnNamesReadAsLowerCaseSession parameter is not set, it will read the kFileColumnNamesReadAsLowerCase configuration. However, it appears that it is not taking effect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JkSelf Would you create GitHub issue to explain the problem you are facing? We can discuss it there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kFileColumnNamesReadAsLowerCase should be set when a HiveConnector is created, kFileColumnNamesReadAsLowerCaseSession should be set per query basis in the session property.

can you show me where your code is not working?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kewang1024 The related code is here. We set kFileColumnNamesReadAsLowerCase config to true without this PR. And after this PR, we need to set kFileColumnNamesReadAsLowerCaseSession to true and then the unit test can pass in Gluten. It seems the kFileColumnNamesReadAsLowerCase is not longer taking effect in our case.

return session->get<bool>(kFileColumnNamesReadAsLowerCaseSession).value();
}
return config_->get<bool>(kFileColumnNamesReadAsLowerCase, false);
}

// static.
int32_t HiveConfig::numCacheFileHandles(const Config* config) {
return config->get<int32_t>(kNumCacheFileHandles, 20'000);
int64_t HiveConfig::maxCoalescedBytes() const {
return config_->get<int64_t>(kMaxCoalescedBytes, 128 << 20);
}

// static.
bool HiveConfig::isFileHandleCacheEnabled(const Config* config) {
return config->get<bool>(kEnableFileHandleCache, true);
int32_t HiveConfig::maxCoalescedDistanceBytes() const {
return config_->get<int32_t>(kMaxCoalescedDistanceBytes, 512 << 10);
}

// static.
uint32_t HiveConfig::sortWriterMaxOutputRows(const Config* config) {
return config->get<int32_t>(kSortWriterMaxOutputRows, 1024);
int32_t HiveConfig::numCacheFileHandles() const {
return config_->get<int32_t>(kNumCacheFileHandles, 20'000);
}

// static.
uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* config) {
return config->get<uint64_t>(kSortWriterMaxOutputBytes, 10UL << 20);
bool HiveConfig::isFileHandleCacheEnabled() const {
return config_->get<bool>(kEnableFileHandleCache, true);
}

uint64_t HiveConfig::getOrcWriterMaxStripeSize(
const Config* connectorQueryCtxConfig,
const Config* connectorPropertiesConfig) {
if (connectorQueryCtxConfig != nullptr &&
connectorQueryCtxConfig->isValueExists(kOrcWriterMaxStripeSize)) {
uint64_t HiveConfig::getOrcWriterMaxStripeSize(const Config* session) const {
if (session->isValueExists(kOrcWriterMaxStripeSizeSession)) {
return toCapacity(
connectorQueryCtxConfig->get<std::string>(kOrcWriterMaxStripeSize)
.value(),
session->get<std::string>(kOrcWriterMaxStripeSizeSession).value(),
core::CapacityUnit::BYTE);
}
if (connectorPropertiesConfig != nullptr &&
connectorPropertiesConfig->isValueExists(kOrcWriterMaxStripeSizeConfig)) {
if (config_->isValueExists(kOrcWriterMaxStripeSize)) {
return toCapacity(
connectorPropertiesConfig
->get<std::string>(kOrcWriterMaxStripeSizeConfig)
.value(),
config_->get<std::string>(kOrcWriterMaxStripeSize).value(),
core::CapacityUnit::BYTE);
}
return 64L * 1024L * 1024L;
}

uint64_t HiveConfig::getOrcWriterMaxDictionaryMemory(
const Config* connectorQueryCtxConfig,
const Config* connectorPropertiesConfig) {
if (connectorQueryCtxConfig != nullptr &&
connectorQueryCtxConfig->isValueExists(kOrcWriterMaxDictionaryMemory)) {
const Config* session) const {
if (session->isValueExists(kOrcWriterMaxDictionaryMemorySession)) {
return toCapacity(
connectorQueryCtxConfig->get<std::string>(kOrcWriterMaxDictionaryMemory)
.value(),
session->get<std::string>(kOrcWriterMaxDictionaryMemorySession).value(),
core::CapacityUnit::BYTE);
}
if (connectorPropertiesConfig != nullptr &&
connectorPropertiesConfig->isValueExists(
kOrcWriterMaxDictionaryMemoryConfig)) {
if (config_->isValueExists(kOrcWriterMaxDictionaryMemory)) {
return toCapacity(
connectorPropertiesConfig
->get<std::string>(kOrcWriterMaxDictionaryMemoryConfig)
.value(),
config_->get<std::string>(kOrcWriterMaxDictionaryMemory).value(),
core::CapacityUnit::BYTE);
}
return 16L * 1024L * 1024L;
}

uint32_t HiveConfig::sortWriterMaxOutputRows(const Config* session) const {
if (session->isValueExists(kSortWriterMaxOutputRowsSession)) {
return session->get<uint32_t>(kSortWriterMaxOutputRowsSession).value();
}
return config_->get<int32_t>(kSortWriterMaxOutputRows, 1024);
}

uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* session) const {
if (session->isValueExists(kSortWriterMaxOutputBytesSession)) {
return toCapacity(
session->get<std::string>(kSortWriterMaxOutputBytesSession).value(),
core::CapacityUnit::BYTE);
}
if (config_->isValueExists(kSortWriterMaxOutputBytes)) {
return toCapacity(
config_->get<std::string>(kSortWriterMaxOutputBytes).value(),
core::CapacityUnit::BYTE);
}
return 10UL << 20;
}

} // namespace facebook::velox::connector::hive
Loading