Skip to content

Commit

Permalink
refactor: ABFS implementation (#11419)
Browse files Browse the repository at this point in the history
Summary:
Combine AbfsAccount and AbfsConfig in a separate file.
Clean up API naming and clarify semantics.
Add a new constructor for AbfsWriteFile to specify a client. This is used for testing.

Pull Request resolved: #11419

Reviewed By: Yuhta

Differential Revision: D66015694

Pulled By: kevinwilfong

fbshipit-source-id: 7224aaa1e3cda99c1596546e8050676c635396a5
  • Loading branch information
majetideepak authored and facebook-github-bot committed Nov 19, 2024
1 parent ce67924 commit 7d0b84e
Show file tree
Hide file tree
Showing 23 changed files with 500 additions and 625 deletions.
2 changes: 1 addition & 1 deletion velox/benchmarks/filesystem/ReadBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void ReadBenchmark::initialize() {
filesystems::registerS3FileSystem();
filesystems::registerGcsFileSystem();
filesystems::registerHdfsFileSystem();
filesystems::abfs::registerAbfsFileSystem();
filesystems::registerAbfsFileSystem();
std::shared_ptr<config::ConfigBase> config;
if (!FLAGS_config.empty()) {
config = readConfig(FLAGS_config);
Expand Down
68 changes: 68 additions & 0 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h"

#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"

namespace facebook::velox::filesystems {

// Parses an ABFS URI and builds the Azure connection string for it.
//
// The path is split as
//   <scheme><fileSystem>@<accountNameWithSuffix>/<filePath>
// where <scheme> is kAbfssScheme (https) or kAbfsScheme (http). The account
// key is read from the config entry
// "fs.azure.account.key.<accountNameWithSuffix>"; if kAzureBlobEndpoint is
// set in the config, its value is appended as the BlobEndpoint.
//
// @param path Full ABFS URI including the scheme.
// @param config Configuration holding the account key and, optionally, the
// blob endpoint override.
// Fails with VELOX_FAIL if the path does not start with a known scheme or
// cannot be split into container, account host and file path, and with
// VELOX_USER_FAIL if the account key is missing from the config.
AbfsConfig::AbfsConfig(
    std::string_view path,
    const config::ConfigBase& config) {
  // Number of characters skipped after the account name to reach the
  // endpoint suffix: the separating '.' plus a four character service label.
  // NOTE(review): presumably this is ".dfs." as in
  // <account>.dfs.core.windows.net; only the length is enforced here so that
  // well-formed paths keep parsing exactly as before.
  constexpr std::string_view::size_type kServiceLabelLength = 5;

  std::string_view file;
  bool isHttps = true;
  // rfind(prefix, 0) only inspects position 0, so this is a pure prefix test.
  if (path.rfind(kAbfssScheme, 0) == 0) {
    file = path.substr(kAbfssScheme.size());
  } else if (path.rfind(kAbfsScheme, 0) == 0) {
    file = path.substr(kAbfsScheme.size());
    isHttps = false;
  } else {
    VELOX_FAIL("Invalid ABFS Path {}", path);
  }

  const auto firstAt = file.find('@');
  const auto firstSep = file.find('/');
  // The container separator must exist and must come before the first '/'.
  // A later '@' belongs to the file path, not to the authority part. When
  // there is no '/', firstSep is npos and any found '@' passes this check.
  if (firstAt == std::string_view::npos || firstAt > firstSep) {
    VELOX_FAIL("Invalid ABFS Path {}", path);
  }

  const bool hasFilePath = firstSep != std::string_view::npos;
  const auto hostEnd = hasFilePath ? firstSep : file.size();

  fileSystem_ = file.substr(0, firstAt);
  // Without a '/', the file path is empty rather than the whole remainder.
  filePath_ = hasFilePath ? file.substr(firstSep + 1) : std::string_view{};

  const auto accountNameWithSuffix =
      file.substr(firstAt + 1, hostEnd - firstAt - 1);
  const auto firstDot = accountNameWithSuffix.find('.');
  // Reject hosts that are too short to slice, instead of letting substr
  // throw std::out_of_range or return a mis-sliced suffix.
  if (firstDot == std::string_view::npos ||
      firstDot + kServiceLabelLength > accountNameWithSuffix.size()) {
    VELOX_FAIL("Invalid ABFS Path {}", path);
  }
  const auto accountName = accountNameWithSuffix.substr(0, firstDot);
  const auto endpointSuffix =
      accountNameWithSuffix.substr(firstDot + kServiceLabelLength);

  const auto credKey =
      fmt::format("fs.azure.account.key.{}", accountNameWithSuffix);
  if (!config.valueExists(credKey)) {
    VELOX_USER_FAIL("Config {} not found", credKey);
  }

  std::stringstream ss;
  ss << "DefaultEndpointsProtocol=" << (isHttps ? "https" : "http");
  ss << ";AccountName=" << accountName;
  ss << ";AccountKey=" << config.get<std::string>(credKey).value();
  ss << ";EndpointSuffix=" << endpointSuffix;

  if (config.valueExists(kAzureBlobEndpoint)) {
    ss << ";BlobEndpoint="
       << config.get<std::string>(kAzureBlobEndpoint).value();
  }
  ss << ";";
  connectionString_ = ss.str();
}

} // namespace facebook::velox::filesystems
59 changes: 59 additions & 0 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <folly/hash/Hash.h>
#include <string>

namespace facebook::velox::config {
class ConfigBase;
}

namespace facebook::velox::filesystems {

// Config key whose value, when present, is used as the blob endpoint.
// This is used to specify the Azurite endpoint in testing.
// Declared `inline const` so that every translation unit including this
// header shares one immutable definition instead of owning a separate
// writable copy.
inline const std::string kAzureBlobEndpoint{"fs.azure.blob-endpoint"};

/// Holds the pieces parsed out of an ABFS path together with the connection
/// string assembled from that path and the supplied configuration.
class AbfsConfig {
 public:
  /// Parses 'path' and looks up the values it needs in 'config'.
  explicit AbfsConfig(std::string_view path, const config::ConfigBase& config);

  /// Returns the container name parsed from the path.
  std::string fileSystem() const {
    return fileSystem_;
  }

  /// Returns the path of the file inside the container.
  std::string filePath() const {
    return filePath_;
  }

  /// Returns the connection string built by the constructor.
  std::string connectionString() const {
    return connectionString_;
  }

  /// Returns the hash of the connection string rendered as decimal text.
  std::string identity() const {
    return std::to_string(folly::Hash{}(connectionString_));
  }

 private:
  // Some Azure APIs use the term FileSystem for the container name.
  std::string fileSystem_;
  std::string filePath_;
  std::string connectionString_;
};

} // namespace facebook::velox::filesystems
80 changes: 18 additions & 62 deletions velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,25 @@
#include <folly/executors/IOThreadPoolExecutor.h>
#include <glog/logging.h>

#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"

namespace facebook::velox::filesystems::abfs {
namespace facebook::velox::filesystems {
using namespace Azure::Storage::Blobs;

class AbfsConfig {
public:
AbfsConfig(const config::ConfigBase* config) : config_(config) {}

std::string connectionString(const std::string& path) const {
auto abfsAccount = AbfsAccount(path);
auto key = abfsAccount.credKey();
VELOX_USER_CHECK(
config_->valueExists(key), "Failed to find storage credentials");

return abfsAccount.connectionString(config_->get<std::string>(key).value());
}

private:
const config::ConfigBase* config_;
};

class AbfsReadFile::Impl {
constexpr static uint64_t kNaturalReadSize = 4 << 20; // 4M
constexpr static uint64_t kReadConcurrency = 8;

public:
explicit Impl(const std::string& path, const std::string& connectStr) {
auto abfsAccount = AbfsAccount(path);
fileName_ = abfsAccount.filePath();
explicit Impl(std::string_view path, const config::ConfigBase& config) {
auto account = AbfsConfig(path, config);
filePath_ = account.filePath();
fileClient_ =
std::make_unique<BlobClient>(BlobClient::CreateFromConnectionString(
connectStr, abfsAccount.fileSystem(), fileName_));
account.connectionString(), account.fileSystem(), filePath_));
}

void initialize(const FileOptions& options) {
Expand All @@ -75,7 +57,7 @@ class AbfsReadFile::Impl {
auto properties = fileClient_->GetProperties();
length_ = properties.Value.BlobSize;
} catch (Azure::Storage::StorageException& e) {
throwStorageExceptionWithOperationDetails("GetProperties", fileName_, e);
throwStorageExceptionWithOperationDetails("GetProperties", filePath_, e);
}

VELOX_CHECK_GE(length_, 0);
Expand Down Expand Up @@ -143,7 +125,7 @@ class AbfsReadFile::Impl {
}

std::string getName() const {
return fileName_;
return filePath_;
}

uint64_t getNaturalReadSize() const {
Expand All @@ -165,16 +147,15 @@ class AbfsReadFile::Impl {
reinterpret_cast<uint8_t*>(position), length);
}

std::string fileName_;
std::string filePath_;
std::unique_ptr<BlobClient> fileClient_;

int64_t length_ = -1;
};

AbfsReadFile::AbfsReadFile(
const std::string& path,
const std::string& connectStr) {
impl_ = std::make_shared<Impl>(path, connectStr);
std::string_view path,
const config::ConfigBase& config) {
impl_ = std::make_shared<Impl>(path, config);
}

void AbfsReadFile::initialize(const FileOptions& options) {
Expand Down Expand Up @@ -222,30 +203,9 @@ uint64_t AbfsReadFile::getNaturalReadSize() const {
return impl_->getNaturalReadSize();
}

class AbfsFileSystem::Impl {
public:
explicit Impl(const config::ConfigBase* config) : abfsConfig_(config) {
LOG(INFO) << "Init Azure Blob file system";
}

~Impl() {
LOG(INFO) << "Dispose Azure Blob file system";
}

const std::string connectionString(const std::string& path) const {
// Extract account name
return abfsConfig_.connectionString(path);
}

private:
const AbfsConfig abfsConfig_;
std::shared_ptr<folly::Executor> ioExecutor_;
};

AbfsFileSystem::AbfsFileSystem(
const std::shared_ptr<const config::ConfigBase>& config)
AbfsFileSystem::AbfsFileSystem(std::shared_ptr<const config::ConfigBase> config)
: FileSystem(config) {
impl_ = std::make_shared<Impl>(config.get());
VELOX_CHECK_NOT_NULL(config.get());
}

std::string AbfsFileSystem::name() const {
Expand All @@ -255,18 +215,14 @@ std::string AbfsFileSystem::name() const {
std::unique_ptr<ReadFile> AbfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
auto abfsfile = std::make_unique<AbfsReadFile>(
std::string(path), impl_->connectionString(std::string(path)));
auto abfsfile = std::make_unique<AbfsReadFile>(path, *config_);
abfsfile->initialize(options);
return abfsfile;
}

std::unique_ptr<WriteFile> AbfsFileSystem::openFileForWrite(
std::string_view path,
const FileOptions& /*unused*/) {
auto abfsfile = std::make_unique<AbfsWriteFile>(
std::string(path), impl_->connectionString(std::string(path)));
abfsfile->initialize();
return abfsfile;
return std::make_unique<AbfsWriteFile>(path, *config_);
}
} // namespace facebook::velox::filesystems::abfs
} // namespace facebook::velox::filesystems
11 changes: 3 additions & 8 deletions velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include "velox/common/file/FileSystems.h"

namespace facebook::velox::filesystems::abfs {
namespace facebook::velox::filesystems {

/// Implementation of the ABS (Azure Blob Storage) filesystem and file
/// interface. We provide a registration method for reading and writing files so
Expand All @@ -34,8 +34,7 @@ namespace facebook::velox::filesystems::abfs {
/// https://learn.microsoft.com/en-us/azure/databricks/storage/azure-storage.
class AbfsFileSystem : public FileSystem {
public:
explicit AbfsFileSystem(
const std::shared_ptr<const config::ConfigBase>& config);
explicit AbfsFileSystem(std::shared_ptr<const config::ConfigBase> config);

std::string name() const override;

Expand Down Expand Up @@ -75,11 +74,7 @@ class AbfsFileSystem : public FileSystem {
void rmdir(std::string_view path) override {
VELOX_UNSUPPORTED("rmdir for abfs not implemented");
}

protected:
class Impl;
std::shared_ptr<Impl> impl_;
};

void registerAbfsFileSystem();
} // namespace facebook::velox::filesystems::abfs
} // namespace facebook::velox::filesystems
17 changes: 11 additions & 6 deletions velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@
* limitations under the License.
*/

#include <folly/executors/ThreadedExecutor.h>
#include <folly/futures/Future.h>
#pragma once

#include "velox/common/file/File.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"

namespace facebook::velox::filesystems::abfs {
namespace facebook::velox::config {
class ConfigBase;
}

namespace facebook::velox::filesystems {
class AbfsReadFile final : public ReadFile {
public:
explicit AbfsReadFile(const std::string& path, const std::string& connectStr);
explicit AbfsReadFile(
std::string_view path,
const config::ConfigBase& config);

void initialize(const FileOptions& options);

Expand Down Expand Up @@ -53,4 +58,4 @@ class AbfsReadFile final : public ReadFile {
class Impl;
std::shared_ptr<Impl> impl_;
};
} // namespace facebook::velox::filesystems::abfs
} // namespace facebook::velox::filesystems
Loading

0 comments on commit 7d0b84e

Please sign in to comment.