Skip to content

Commit

Permalink
abfs connector write support
Browse files Browse the repository at this point in the history
  • Loading branch information
Yangyang Gao committed Oct 24, 2023
1 parent a840fba commit 568289d
Show file tree
Hide file tree
Showing 9 changed files with 403 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ if(VELOX_ENABLE_ABFS)
if(AZURESDK_ROOT_DIR)
list(APPEND CMAKE_PREFIX_PATH ${AZURESDK_ROOT_DIR})
endif()
find_package(azure-storage-blobs-cpp CONFIG REQUIRED)
find_package(azure-storage-files-datalake-cpp CONFIG REQUIRED)
add_definitions(-DVELOX_ENABLE_ABFS)
endif()

Expand Down
10 changes: 9 additions & 1 deletion scripts/setup-adapters.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ function install_gcs-sdk-cpp {
}

function install_azure-storage-sdk-cpp {
github_checkout azure/azure-sdk-for-cpp azure-storage-blobs_12.8.0
github_checkout azure/azure-sdk-for-cpp azure-storage-files-datalake_12.8.0

cd sdk/core/azure-core
if ! grep -q "baseline" vcpkg.json; then
Expand All @@ -88,6 +88,14 @@ function install_azure-storage-sdk-cpp {
sed -i 's/"version-semver"/"builtin-baseline": "dafef74af53669ef1cc9015f55e0ce809ead62aa","version-semver"/' vcpkg.json
fi
cmake_install -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF

cd -
# install azure-storage-files-datalake
cd sdk/storage/azure-storage-files-datalake
if ! grep -q "baseline" vcpkg.json; then
sed -i 's/"version-semver"/"builtin-baseline": "dafef74af53669ef1cc9015f55e0ce809ead62aa","version-semver"/' vcpkg.json
fi
cmake_install -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF
}

function install_libhdfs3 {
Expand Down
161 changes: 161 additions & 0 deletions velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"
#include "velox/core/Config.h"

#include <azure/storage/blobs/blob_client.hpp>
#include <azure/storage/files/datalake.hpp>
#include <fmt/format.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <glog/logging.h>

namespace facebook::velox::filesystems::abfs {
using namespace Azure::Storage::Blobs;

class AbfsConfig {
public:
AbfsConfig(const Config* config) : config_(config) {}
Expand Down Expand Up @@ -213,6 +216,155 @@ uint64_t AbfsReadFile::getNaturalReadSize() const {
return impl_->getNaturalReadSize();
}

/// Production implementation of IBlobStorageFileClient. Every operation is
/// forwarded to an Azure DataLakeFileClient (DFS endpoint).
class BlobStorageFileClient final : public IBlobStorageFileClient {
 public:
  /// Keeps a private copy of 'client'. The constructor is explicit so that a
  /// DataLakeFileClient is never silently converted into this wrapper.
  explicit BlobStorageFileClient(const DataLakeFileClient& client)
      : client_(std::make_unique<DataLakeFileClient>(client)) {}

  /// Creates the file through the wrapped client.
  void Create() override {
    client_->Create();
  }

  /// Returns the properties reported by the wrapped client for this path.
  PathProperties GetProperties() override {
    return client_->GetProperties().Value;
  }

  /// Uploads 'size' bytes starting at 'buffer' at file position 'offset'.
  /// The buffer is wrapped in a MemoryBodyStream and is not copied here, so
  /// it must stay valid for the duration of the call.
  void Append(const uint8_t* buffer, size_t size, uint64_t offset) override {
    auto bodyStream = Azure::Core::IO::MemoryBodyStream(buffer, size);
    client_->Append(bodyStream, offset);
  }

  /// Forwards 'position' to the wrapped client's Flush.
  void Flush(uint64_t position) override {
    client_->Flush(position);
  }

  /// Nothing to release: the wrapped client holds no handle that needs an
  /// explicit close.
  void Close() override {
    // do nothing.
  }

 private:
  std::unique_ptr<DataLakeFileClient> client_;
};

/// Private implementation of AbfsWriteFile. Tracks the current write position
/// locally and forwards all storage operations to an IBlobStorageFileClient,
/// which is either created from the connection string in initialize() or
/// injected through setFileClient() (mainly for tests).
class AbfsWriteFile::Impl {
 public:
  /// @param path The abfs path of the file to write.
  /// @param connectStr The connection string used to build the storage client.
  /// No storage call is made here; see initialize().
  explicit Impl(const std::string& path, const std::string& connectStr)
      : path_(path), connectStr_(connectStr) {}

  /// Builds the storage client if none was injected, verifies that the target
  /// file does not exist yet and then creates it.
  void initialize() {
    if (!blobStorageFileClient_) {
      auto abfsAccount = AbfsAccount(path_);
      auto fileClient = DataLakeFileClient::CreateFromConnectionString(
          connectStr_, abfsAccount.fileSystem(), abfsAccount.filePath());
      blobStorageFileClient_ =
          std::make_shared<BlobStorageFileClient>(fileClient);
    }

    VELOX_CHECK(!exist(), "File already exists");
    blobStorageFileClient_->Create();
  }

  /// mainly for test purpose.
  void setFileClient(
      std::shared_ptr<IBlobStorageFileClient> blobStorageManager) {
    blobStorageFileClient_ = std::move(blobStorageManager);
  }

  /// Returns true if GetProperties succeeds, false if the service answers
  /// NotFound. Any other storage error is re-raised with operation details.
  bool exist() {
    try {
      blobStorageFileClient_->GetProperties();
      return true;
    } catch (Azure::Storage::StorageException& e) {
      // Caught by non-const reference because the error is handed on to
      // throwStorageExceptionWithOperationDetails.
      if (e.StatusCode != Azure::Core::Http::HttpStatusCode::NotFound) {
        throwStorageExceptionWithOperationDetails("GetProperties", path_, e);
      }
      // Every path through the handler now ends in a return.
      return false;
    }
  }

  /// Flushes pending data and closes the client. Calling it again after a
  /// successful close is a no-op.
  void close() {
    if (!closed_) {
      flush();
      blobStorageFileClient_->Close();
      closed_ = true;
    }
  }

  /// Passes the current write position to the client's Flush. Does nothing
  /// once the file has been closed.
  void flush() {
    if (!closed_) {
      blobStorageFileClient_->Flush(position_);
    }
  }

  /// Appends 'data' at the current write position. Empty input is ignored;
  /// appending to a closed file fails the check.
  void append(std::string_view data) {
    VELOX_CHECK(!closed_, "File is not open");
    if (data.size() == 0) {
      return;
    }
    append(data.data(), data.size());
  }

  /// Returns the FileSize reported by the client's GetProperties.
  uint64_t size() const {
    auto properties = blobStorageFileClient_->GetProperties();
    return properties.FileSize;
  }

  /// Uploads 'size' bytes from 'buffer' at the current write position.
  void append(const char* buffer, size_t size) {
    blobStorageFileClient_->Append(
        reinterpret_cast<const uint8_t*>(buffer), size, position_);
    // Advance only after the upload succeeded, so that a throwing Append
    // does not leave position_ pointing past data that was never written.
    position_ += size;
  }

 private:
  const std::string path_;
  const std::string connectStr_;
  std::shared_ptr<IBlobStorageFileClient> blobStorageFileClient_;

  // Offset at which the next Append is issued; also the position handed to
  // Flush.
  uint64_t position_ = 0;
  std::atomic<bool> closed_{false};
};

/// Sets up the implementation object for 'path' using 'connectStr'. The
/// remote file is not touched until initialize() is called.
AbfsWriteFile::AbfsWriteFile(
    const std::string& path,
    const std::string& connectStr)
    : impl_(std::make_shared<Impl>(path, connectStr)) {}

/// Forwards to Impl::initialize(), which checks that the file does not exist
/// yet and creates it.
void AbfsWriteFile::initialize() {
impl_->initialize();
}

/// Forwards to Impl::close(), which flushes and then closes the file.
void AbfsWriteFile::close() {
impl_->close();
}

/// Forwards to Impl::flush(); a no-op once the file has been closed.
void AbfsWriteFile::flush() {
impl_->flush();
}

/// Forwards to Impl::append(), which ignores empty input and fails if the
/// file has already been closed.
void AbfsWriteFile::append(std::string_view data) {
impl_->append(data);
}

/// Forwards to Impl::size(), which returns the FileSize from the properties
/// reported by the storage client.
uint64_t AbfsWriteFile::size() const {
return impl_->size();
}

/// Replaces the storage client used by the implementation (mainly for tests).
/// When a client is set before initialize(), initialize() uses it instead of
/// creating one from the connection string.
void AbfsWriteFile::setFileClient(
std::shared_ptr<IBlobStorageFileClient> fileClient) {
impl_->setFileClient(std::move(fileClient));
}

class AbfsFileSystem::Impl {
public:
explicit Impl(const Config* config) : abfsConfig_(config) {
Expand Down Expand Up @@ -250,4 +402,13 @@ std::unique_ptr<ReadFile> AbfsFileSystem::openFileForRead(
abfsfile->initialize();
return abfsfile;
}

std::unique_ptr<WriteFile> AbfsFileSystem::openFileForWrite(
std::string_view path,
const FileOptions& /*unused*/) {
auto abfsfile = std::make_unique<AbfsWriteFile>(
std::string(path), impl_->connectionString(std::string(path)));
abfsfile->initialize();
return abfsfile;
}
} // namespace facebook::velox::filesystems::abfs
4 changes: 1 addition & 3 deletions velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ class AbfsFileSystem : public FileSystem {

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
const FileOptions& options = {}) override {
VELOX_UNSUPPORTED("write for abfs not implemented");
}
const FileOptions& options = {}) override;

void rename(
std::string_view path,
Expand Down
90 changes: 90 additions & 0 deletions velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <folly/executors/IOThreadPoolExecutor.h>
#include <iostream>
#include "AbfsUtil.h"
#include "folly/io/Cursor.h"
#include "velox/common/file/File.h"

namespace Azure::Storage::Files::DataLake::Models {
class PathProperties;
}

namespace facebook::velox::filesystems::abfs {
using namespace Azure::Storage::Files::DataLake;
using namespace Azure::Storage::Files::DataLake::Models;

/*
* We are using the DFS (Data Lake Storage) endpoint for Azure Blob File write
* operations because the DFS endpoint is designed to be compatible with file
* operation semantics, such as `Append` to a file and file `Flush` operations.
* The legacy Blob endpoint can only be used for blob level append and flush
* operations. When using the Blob endpoint, we would need to manually manage
* the creation, appending, and committing of file-related blocks.
*
* However, the Azurite Simulator does not yet support the DFS endpoint.
* (For more information, see https://github.com/Azure/Azurite/issues/553 and
* https://github.com/Azure/Azurite/issues/409).
* You can find a comparison between DFS and Blob endpoints here:
* https://github.com/Azure/Azurite/wiki/ADLS-Gen2-Implementation-Guidance
*
* To facilitate unit testing of file write scenarios, we define the
* IBlobStorageFileClient here, which can be mocked during testing.
*/
class IBlobStorageFileClient {
 public:
  /// Virtual destructor so that implementations (including test mocks) are
  /// destroyed correctly when deleted through a pointer to this interface.
  virtual ~IBlobStorageFileClient() = default;

  /// Creates the file.
  virtual void Create() = 0;

  /// Returns the properties of the file.
  virtual PathProperties GetProperties() = 0;

  /// Uploads 'size' bytes starting at 'buffer' at file position 'offset'.
  virtual void Append(const uint8_t* buffer, size_t size, uint64_t offset) = 0;

  /// Flushes the file at 'position'. AbfsWriteFile passes its current write
  /// position, i.e. the total number of bytes appended so far.
  virtual void Flush(uint64_t position) = 0;

  /// Closes the client.
  virtual void Close() = 0;
};

/// Implementation of abfs write file. Nothing written to the file should be
/// read back until it is closed. All work is delegated to a private Impl
/// object, which talks to storage through an IBlobStorageFileClient.
class AbfsWriteFile : public WriteFile {
public:
/// 8 << 20 bytes, i.e. 8 MiB.
constexpr static uint64_t kNaturalWriteSize = 8 << 20; // 8M
/// The constructor.
/// @param path The file path to write.
/// @param connectStr the connection string used to auth the storage account.
AbfsWriteFile(const std::string& path, const std::string& connectStr);

/// Prepares the file for writing: builds the storage client unless one was
/// supplied via setFileClient(), fails if the file already exists, and then
/// creates it.
void initialize();

/// Get the file size as reported by the storage client's file properties.
uint64_t size() const override;

/// Flush the data at the current write position. No-op once closed.
void flush() override;

/// Write the data by append mode. Empty data is ignored; appending after
/// close() fails.
void append(std::string_view data) override;

/// Flush and close the file. Further calls after a successful close are
/// no-ops.
void close() override;

/// mainly for test purpose. Replaces the storage client used for all
/// operations.
void setFileClient(std::shared_ptr<IBlobStorageFileClient> fileClient);

protected:
// Defined in AbfsFileSystem.cpp.
class Impl;
std::shared_ptr<Impl> impl_;
};
} // namespace facebook::velox::filesystems::abfs
1 change: 1 addition & 0 deletions velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ if(VELOX_ENABLE_ABFS)
velox_hive_connector
velox_dwio_common_exception
Azure::azure-storage-blobs
Azure::azure-storage-files-datalake
Folly::folly
glog::glog
fmt::fmt)
Expand Down
Loading

0 comments on commit 568289d

Please sign in to comment.