Skip to content

Commit

Permalink
[GLUTEN-7964][VL] Support S3 Bucket Config (#8123)
Browse files Browse the repository at this point in the history
  • Loading branch information
majetideepak authored Jan 1, 2025
1 parent 7503ce8 commit 715fc49
Showing 1 changed file with 91 additions and 79 deletions.
170 changes: 91 additions & 79 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,6 @@ namespace {

const std::string kVeloxFileHandleCacheEnabled = "spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled";
const bool kVeloxFileHandleCacheEnabledDefault = false;

// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
// Retry mode for AWS s3
const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode";
const std::string kVeloxS3RetryModeDefault = "legacy";
// Connection timeout for AWS s3
const std::string kVeloxS3ConnectTimeout = "spark.gluten.velox.fs.s3a.connect.timeout";
// Using default fs.s3a.connection.timeout value in hadoop
const std::string kVeloxS3ConnectTimeoutDefault = "200s";
} // namespace

namespace gluten {
Expand All @@ -62,76 +51,99 @@ std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
std::unordered_map<std::string, std::string> hiveConfMap;

#ifdef ENABLE_S3
std::string awsAccessKey = conf->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
std::string awsSecretKey = conf->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
std::string awsEndpoint = conf->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
bool sslEnabled = conf->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
bool pathStyleAccess = conf->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
bool useInstanceCredentials = conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
std::string iamRoleSessionName = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
std::string retryMaxAttempts = conf->get<std::string>("spark.hadoop.fs.s3a.retry.limit", "20");
std::string retryMode = conf->get<std::string>(kVeloxS3RetryMode, kVeloxS3RetryModeDefault);
std::string maxConnections = conf->get<std::string>("spark.hadoop.fs.s3a.connection.maximum", "15");
std::string connectTimeout = conf->get<std::string>(kVeloxS3ConnectTimeout, kVeloxS3ConnectTimeoutDefault);

std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);

const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
if (envAwsAccessKey != nullptr) {
awsAccessKey = std::string(envAwsAccessKey);
}
const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
if (envAwsSecretKey != nullptr) {
awsSecretKey = std::string(envAwsSecretKey);
}
const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}
const char* envRetryMaxAttempts = std::getenv("AWS_MAX_ATTEMPTS");
if (envRetryMaxAttempts != nullptr) {
retryMaxAttempts = std::string(envRetryMaxAttempts);
}
const char* envRetryMode = std::getenv("AWS_RETRY_MODE");
if (envRetryMode != nullptr) {
retryMode = std::string(envRetryMode);
}

if (useInstanceCredentials) {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kUseInstanceCredentials)] = "true";
} else if (!iamRole.empty()) {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kIamRole)] = iamRole;
if (!iamRoleSessionName.empty()) {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kIamRoleSessionName)] = iamRoleSessionName;
using namespace facebook::velox::filesystems;
std::string_view kSparkHadoopPrefix = "spark.hadoop.fs.s3a.";
std::string_view kSparkHadoopBucketPrefix = "spark.hadoop.fs.s3a.bucket.";

// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";

const std::unordered_map<S3Config::Keys, std::pair<std::string, std::optional<std::string>>> sparkSuffixes = {
{S3Config::Keys::kAccessKey, std::make_pair("access.key", std::nullopt)},
{S3Config::Keys::kSecretKey, std::make_pair("secret.key", std::nullopt)},
{S3Config::Keys::kEndpoint, std::make_pair("endpoint", std::nullopt)},
{S3Config::Keys::kSSLEnabled, std::make_pair("connection.ssl.enabled", "false")},
{S3Config::Keys::kPathStyleAccess, std::make_pair("path.style.access", "false")},
{S3Config::Keys::kMaxAttempts, std::make_pair("retry.limit", std::nullopt)},
{S3Config::Keys::kRetryMode, std::make_pair("retry.mode", "legacy")},
{S3Config::Keys::kMaxConnections, std::make_pair("connection.maximum", "15")},
{S3Config::Keys::kConnectTimeout, std::make_pair("connection.timeout", "200s")},
{S3Config::Keys::kUseInstanceCredentials, std::make_pair("instance.credentials", "false")},
{S3Config::Keys::kIamRole, std::make_pair("iam.role", std::nullopt)},
{S3Config::Keys::kIamRoleSessionName, std::make_pair("iam.role.session.name", "gluten-session")},
};

// get Velox S3 config key from Spark Suffix.
auto getVeloxKey = [&](std::string_view suffix) {
for (const auto& [key, value] : sparkSuffixes) {
if (value.first == suffix) {
return std::optional<S3Config::Keys>(key);
}
}
return std::optional<S3Config::Keys>(std::nullopt);
};

auto sparkBaseConfigValue = [&](S3Config::Keys key) {
std::stringstream ss;
auto keyValue = sparkSuffixes.find(key)->second;
ss << kSparkHadoopPrefix << keyValue.first;
auto sparkKey = ss.str();
if (conf->valueExists(sparkKey)) {
return static_cast<std::optional<std::string>>(conf->get<std::string>(sparkKey));
}
// Return default value.
return keyValue.second;
};

auto setConfigIfPresent = [&](S3Config::Keys key) {
auto sparkConfig = sparkBaseConfigValue(key);
if (sparkConfig.has_value()) {
hiveConfMap[S3Config::baseConfigKey(key)] = sparkConfig.value();
}
};

auto setFromEnvOrConfigIfPresent = [&](std::string_view envName, S3Config::Keys key) {
const char* envValue = std::getenv(envName.data());
if (envValue != nullptr) {
hiveConfMap[S3Config::baseConfigKey(key)] = std::string(envValue);
} else {
setConfigIfPresent(key);
}
};

setFromEnvOrConfigIfPresent("AWS_ENDPOINT", S3Config::Keys::kEndpoint);
setFromEnvOrConfigIfPresent("AWS_MAX_ATTEMPTS", S3Config::Keys::kMaxAttempts);
setFromEnvOrConfigIfPresent("AWS_RETRY_MODE", S3Config::Keys::kRetryMode);
setFromEnvOrConfigIfPresent("AWS_ACCESS_KEY_ID", S3Config::Keys::kAccessKey);
setFromEnvOrConfigIfPresent("AWS_SECRET_ACCESS_KEY", S3Config::Keys::kSecretKey);
setConfigIfPresent(S3Config::Keys::kUseInstanceCredentials);
setConfigIfPresent(S3Config::Keys::kIamRole);
setConfigIfPresent(S3Config::Keys::kIamRoleSessionName);
setConfigIfPresent(S3Config::Keys::kSSLEnabled);
setConfigIfPresent(S3Config::Keys::kPathStyleAccess);
setConfigIfPresent(S3Config::Keys::kMaxConnections);
setConfigIfPresent(S3Config::Keys::kConnectTimeout);

hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] =
conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);
;

// Convert all Spark bucket configs to Velox bucket configs.
for (const auto& [key, value] : conf->rawConfigs()) {
if (key.find(kSparkHadoopBucketPrefix) == 0) {
std::string_view skey = key;
auto remaining = skey.substr(kSparkHadoopBucketPrefix.size());
int dot = remaining.find(".");
auto bucketName = remaining.substr(0, dot);
auto suffix = remaining.substr(dot + 1);
auto veloxKey = getVeloxKey(suffix);

if (veloxKey.has_value()) {
hiveConfMap[S3Config::bucketConfigKey(veloxKey.value(), bucketName)] = value;
}
}
} else {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kAccessKey)] = awsAccessKey;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kSecretKey)] = awsSecretKey;
}
// Only need to set s3 endpoint when not use instance credentials.
if (!useInstanceCredentials) {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kEndpoint)] = awsEndpoint;
}
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kSSLEnabled)] = sslEnabled ? "true" : "false";
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kPathStyleAccess)] = pathStyleAccess ? "true" : "false";
hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] = awsSdkLogLevel;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kMaxAttempts)] = retryMaxAttempts;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kRetryMode)] = retryMode;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kMaxConnections)] = maxConnections;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kConnectTimeout)] = connectTimeout;
#endif

#ifdef ENABLE_GCS
Expand Down

0 comments on commit 715fc49

Please sign in to comment.