[VL] Plumb ABFS config (#8403)
majetideepak authored Jan 4, 2025
1 parent c14d1b1 commit d63bb7a
Showing 3 changed files with 19 additions and 20 deletions.
cpp/velox/compute/VeloxBackend.cc (0 additions, 12 deletions)

```diff
@@ -256,18 +256,6 @@ void VeloxBackend::initConnector() {
     connectorConfMap[k] = v;
   }
 
-#ifdef ENABLE_ABFS
-  const auto& confValue = backendConf_->rawConfigs();
-  for (auto& [k, v] : confValue) {
-    if (k.find("fs.azure.account.key") == 0) {
-      connectorConfMap[k] = v;
-    } else if (k.find("spark.hadoop.fs.azure.account.key") == 0) {
-      constexpr int32_t accountKeyPrefixLength = 13;
-      connectorConfMap[k.substr(accountKeyPrefixLength)] = v;
-    }
-  }
-#endif
-
   connectorConfMap[velox::connector::hive::HiveConfig::kEnableFileHandleCache] =
       backendConf_->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false";
 
```
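The deleted block hard-coded the prefix length as 13. A quick illustrative check (not code from the commit): 13 is exactly the length of `spark.hadoop.`, the prefix the replacement logic in ConfigExtractor.cc now names explicitly:

```cpp
// Illustrative only: the deleted accountKeyPrefixLength = 13 equals the
// length of "spark.hadoop.", which the new code spells out as
// kSparkHadoopPrefix instead of a magic number.
#include <string_view>

static_assert(std::string_view("spark.hadoop.").size() == 13,
              "matches the deleted magic constant");

int main() {}
```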
cpp/velox/utils/ConfigExtractor.cc (16 additions, 5 deletions)

```diff
@@ -52,8 +52,8 @@ std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
 
 #ifdef ENABLE_S3
   using namespace facebook::velox::filesystems;
-  std::string_view kSparkHadoopPrefix = "spark.hadoop.fs.s3a.";
-  std::string_view kSparkHadoopBucketPrefix = "spark.hadoop.fs.s3a.bucket.";
+  std::string_view kSparkHadoopS3Prefix = "spark.hadoop.fs.s3a.";
+  std::string_view kSparkHadoopS3BucketPrefix = "spark.hadoop.fs.s3a.bucket.";
 
   // Log granularity of AWS C++ SDK
   const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
```
```diff
@@ -87,7 +87,7 @@ std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
   auto sparkBaseConfigValue = [&](S3Config::Keys key) {
     std::stringstream ss;
     auto keyValue = sparkSuffixes.find(key)->second;
-    ss << kSparkHadoopPrefix << keyValue.first;
+    ss << kSparkHadoopS3Prefix << keyValue.first;
     auto sparkKey = ss.str();
     if (conf->valueExists(sparkKey)) {
       return static_cast<std::optional<std::string>>(conf->get<std::string>(sparkKey));
```

```diff
@@ -131,9 +131,9 @@ std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
 
   // Convert all Spark bucket configs to Velox bucket configs.
   for (const auto& [key, value] : conf->rawConfigs()) {
-    if (key.find(kSparkHadoopBucketPrefix) == 0) {
+    if (key.find(kSparkHadoopS3BucketPrefix) == 0) {
       std::string_view skey = key;
-      auto remaining = skey.substr(kSparkHadoopBucketPrefix.size());
+      auto remaining = skey.substr(kSparkHadoopS3BucketPrefix.size());
       int dot = remaining.find(".");
       auto bucketName = remaining.substr(0, dot);
       auto suffix = remaining.substr(dot + 1);
```
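As a worked example of the (unchanged) bucket parsing above, with an invented bucket name: the key `spark.hadoop.fs.s3a.bucket.mybucket.endpoint` splits at the first dot after the prefix into `bucketName` = `mybucket` and `suffix` = `endpoint`. This hunk only renames the prefix constants; the parsing logic itself is untouched.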
```diff
@@ -189,6 +189,17 @@ std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
   }
 #endif
 
+#ifdef ENABLE_ABFS
+  std::string_view kSparkHadoopPrefix = "spark.hadoop.";
+  std::string_view kSparkHadoopAbfsPrefix = "spark.hadoop.fs.azure.";
+  for (const auto& [key, value] : conf->rawConfigs()) {
+    if (key.find(kSparkHadoopAbfsPrefix) == 0) {
+      // Remove the SparkHadoopPrefix
+      hiveConfMap[key.substr(kSparkHadoopPrefix.size())] = value;
+    }
+  }
+#endif
+
   hiveConfMap[facebook::velox::connector::hive::HiveConfig::kEnableFileHandleCache] =
       conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false";
```
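For context, the following minimal, self-contained sketch mirrors what the new `ENABLE_ABFS` block does (the account name `myaccount` and the value `secret` are invented for illustration; this is not code from the commit):

```cpp
// Standalone illustration of the new ABFS plumbing: keys under
// "spark.hadoop.fs.azure." are copied into the Hive conf map with
// the "spark.hadoop." prefix stripped.
#include <iostream>
#include <map>
#include <string>
#include <string_view>

int main() {
  constexpr std::string_view kSparkHadoopPrefix = "spark.hadoop.";
  constexpr std::string_view kSparkHadoopAbfsPrefix = "spark.hadoop.fs.azure.";

  // Invented raw Spark configs standing in for conf->rawConfigs().
  const std::map<std::string, std::string> rawConfigs = {
      {"spark.hadoop.fs.azure.account.key.myaccount.dfs.core.windows.net",
       "secret"},
      {"spark.sql.shuffle.partitions", "200"}, // not an ABFS key; skipped
  };

  std::map<std::string, std::string> hiveConfMap;
  for (const auto& [key, value] : rawConfigs) {
    if (key.find(kSparkHadoopAbfsPrefix) == 0) {
      // Remove the "spark.hadoop." prefix, as the commit does.
      hiveConfMap[key.substr(kSparkHadoopPrefix.size())] = value;
    }
  }

  for (const auto& [k, v] : hiveConfMap) {
    std::cout << k << " = " << v << "\n";
  }
  // Prints: fs.azure.account.key.myaccount.dfs.core.windows.net = secret
}
```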
GlutenConfig.scala (3 additions, 3 deletions)

```diff
@@ -544,9 +544,9 @@ object GlutenConfig {
 
   // Hardware acceleraters backend
   val GLUTEN_SHUFFLE_CODEC_BACKEND = "spark.gluten.sql.columnar.shuffle.codecBackend"
 
   // ABFS config
-  val ABFS_ACCOUNT_KEY = "hadoop.fs.azure.account.key"
-  val SPARK_ABFS_ACCOUNT_KEY: String = "spark." + ABFS_ACCOUNT_KEY
+  val ABFS_PREFIX = "fs.azure."
 
   // GCS config
   val GCS_PREFIX = "fs.gs."
```
```diff
@@ -854,7 +854,7 @@ object GlutenConfig {
 
     // handle ABFS config
     conf
-      .filter(_._1.startsWith(SPARK_ABFS_ACCOUNT_KEY))
+      .filter(_._1.startsWith(HADOOP_PREFIX + ABFS_PREFIX))
       .foreach(entry => nativeConfMap.put(entry._1, entry._2))
 
     // put in all GCS configs
```
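Net effect of the two sides together: the Scala filter now forwards every `spark.hadoop.fs.azure.*` entry to the native conf map (`HADOOP_PREFIX` is `spark.hadoop.`), rather than only account-key settings, and on the native side `getHiveConfig()` strips `spark.hadoop.` before handing the bare `fs.azure.*` key to the Velox Hive connector.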
