Skip to content

Commit

Permalink
address review comments and Gcs naming
Browse files Browse the repository at this point in the history
  • Loading branch information
majetideepak committed Oct 27, 2024
1 parent 2c7c5d8 commit 731f492
Show file tree
Hide file tree
Showing 19 changed files with 141 additions and 140 deletions.
4 changes: 2 additions & 2 deletions velox/benchmarks/filesystem/ReadBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h"
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"

Expand Down Expand Up @@ -105,7 +105,7 @@ void ReadBenchmark::initialize() {
} else {
filesystems::registerLocalFileSystem();
filesystems::registerS3FileSystem();
filesystems::registerGCSFileSystem();
filesystems::registerGcsFileSystem();
filesystems::registerHdfsFileSystem();
filesystems::abfs::registerAbfsFileSystem();
std::shared_ptr<config::ConfigBase> config;
Expand Down
8 changes: 4 additions & 4 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,20 @@ std::optional<std::string> HiveConfig::s3RetryMode() const {
}

std::string HiveConfig::gcsEndpoint() const {
return config_->get<std::string>(kGCSEndpoint, std::string(""));
return config_->get<std::string>(kGcsEndpoint, std::string(""));
}

std::string HiveConfig::gcsCredentialsPath() const {
return config_->get<std::string>(kGCSCredentialsPath, std::string(""));
return config_->get<std::string>(kGcsCredentialsPath, std::string(""));
}

std::optional<int> HiveConfig::gcsMaxRetryCount() const {
return static_cast<std::optional<int>>(config_->get<int>(kGCSMaxRetryCount));
return static_cast<std::optional<int>>(config_->get<int>(kGcsMaxRetryCount));
}

std::optional<std::string> HiveConfig::gcsMaxRetryTime() const {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kGCSMaxRetryTime));
config_->get<std::string>(kGcsMaxRetryTime));
}

bool HiveConfig::isOrcUseColumnNames(const config::ConfigBase* session) const {
Expand Down
8 changes: 4 additions & 4 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,17 @@ class HiveConfig {
static constexpr const char* kS3RetryMode = "hive.s3.retry-mode";

/// The GCS storage endpoint server.
static constexpr const char* kGCSEndpoint = "hive.gcs.endpoint";
static constexpr const char* kGcsEndpoint = "hive.gcs.endpoint";

/// The GCS service account configuration JSON key file.
static constexpr const char* kGCSCredentialsPath =
static constexpr const char* kGcsCredentialsPath =
"hive.gcs.json-key-file-path";

/// The GCS maximum retry counter of transient errors.
static constexpr const char* kGCSMaxRetryCount = "hive.gcs.max-retry-count";
static constexpr const char* kGcsMaxRetryCount = "hive.gcs.max-retry-count";

/// The GCS maximum time allowed to retry transient errors.
static constexpr const char* kGCSMaxRetryTime = "hive.gcs.max-retry-time";
static constexpr const char* kGcsMaxRetryTime = "hive.gcs.max-retry-time";

/// Maps table field names to file field names using names, not indices.
// TODO: remove hive_orc_use_column_names since it doesn't exist in presto,
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/storage_adapters/gcs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

# for generated headers

velox_add_library(velox_gcs RegisterGCSFileSystem.cpp)
velox_add_library(velox_gcs RegisterGcsFileSystem.cpp)

if(VELOX_ENABLE_GCS)
velox_sources(velox_gcs PRIVATE GCSFileSystem.cpp GCSUtil.cpp)
velox_sources(velox_gcs PRIVATE GcsFileSystem.cpp GcsUtil.cpp)
velox_link_libraries(velox_gcs velox_dwio_common Folly::folly
google-cloud-cpp::storage)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

#include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h"
#include "velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h"
#include "velox/connectors/hive/storage_adapters/gcs/GcsUtil.h"
#include "velox/core/QueryConfig.h"

#include <fmt/format.h>
Expand All @@ -43,7 +43,7 @@ namespace gc = ::google::cloud;
// upload this buffer with zero copies if possible.
auto constexpr kUploadBufferSize = 256 * 1024;

inline void checkGCSStatus(
inline void checkGcsStatus(
const gc::Status outcome,
const std::string_view& errorMsgPrefix,
const std::string& bucket,
Expand All @@ -54,7 +54,7 @@ inline void checkGCSStatus(
errorMsgPrefix,
gcsURI(bucket, key),
outcome.error_info().domain(),
getErrorStringFromGCSError(outcome.code()),
getErrorStringFromGcsError(outcome.code()),
outcome.message());
if (outcome.code() == gc::StatusCode::kNotFound) {
VELOX_FILE_NOT_FOUND_ERROR(errMsg);
Expand All @@ -63,12 +63,12 @@ inline void checkGCSStatus(
}
}

class GCSReadFile final : public ReadFile {
class GcsReadFile final : public ReadFile {
public:
GCSReadFile(const std::string& path, std::shared_ptr<gcs::Client> client)
GcsReadFile(const std::string& path, std::shared_ptr<gcs::Client> client)
: client_(std::move(client)) {
// assumption it's a proper path
setBucketAndKeyFromGCSPath(path, bucket_, key_);
setBucketAndKeyFromGcsPath(path, bucket_, key_);
}

// Gets the length of the file.
Expand All @@ -87,7 +87,7 @@ class GCSReadFile final : public ReadFile {
// get metadata and initialize length
auto metadata = client_->GetObjectMetadata(bucket_, key_);
if (!metadata.ok()) {
checkGCSStatus(
checkGcsStatus(
metadata.status(),
"Failed to get metadata for GCS object",
bucket_,
Expand Down Expand Up @@ -138,7 +138,7 @@ class GCSReadFile final : public ReadFile {
}

uint64_t memoryUsage() const override {
return sizeof(GCSReadFile) // this class
return sizeof(GcsReadFile) // this class
+ sizeof(gcs::Client) // pointee
+ kUploadBufferSize; // buffer size
}
Expand All @@ -162,13 +162,13 @@ class GCSReadFile final : public ReadFile {
gcs::ObjectReadStream stream = client_->ReadObject(
bucket_, key_, gcs::ReadRange(offset, offset + length));
if (!stream) {
checkGCSStatus(
checkGcsStatus(
stream.status(), "Failed to get GCS object", bucket_, key_);
}

stream.read(position, length);
if (!stream) {
checkGCSStatus(
checkGcsStatus(
stream.status(), "Failed to get read object", bucket_, key_);
}
bytesRead_ += length;
Expand All @@ -180,16 +180,16 @@ class GCSReadFile final : public ReadFile {
std::atomic<int64_t> length_ = -1;
};

class GCSWriteFile final : public WriteFile {
class GcsWriteFile final : public WriteFile {
public:
explicit GCSWriteFile(
explicit GcsWriteFile(
const std::string& path,
std::shared_ptr<gcs::Client> client)
: client_(client) {
setBucketAndKeyFromGCSPath(path, bucket_, key_);
setBucketAndKeyFromGcsPath(path, bucket_, key_);
}

~GCSWriteFile() {
~GcsWriteFile() {
close();
}

Expand All @@ -204,7 +204,7 @@ class GCSWriteFile final : public WriteFile {
VELOX_CHECK(!object_metadata.ok(), "File already exists");

auto stream = client_->WriteObject(bucket_, key_);
checkGCSStatus(
checkGcsStatus(
stream.last_status(),
"Failed to open GCS object for writing",
bucket_,
Expand Down Expand Up @@ -254,17 +254,17 @@ class GCSWriteFile final : public WriteFile {
namespace filesystems {
using namespace connector::hive;

auto constexpr kGCSInvalidPath = "File {} is not a valid gcs file";
auto constexpr kGcsInvalidPath = "File {} is not a valid gcs file";

class GCSFileSystem::Impl {
class GcsFileSystem::Impl {
public:
Impl(const config::ConfigBase* config)
: hiveConfig_(std::make_shared<HiveConfig>(
std::make_shared<config::ConfigBase>(config->rawConfigsCopy()))) {}

~Impl() = default;

// Use the input Config parameters and initialize the GCSClient.
// Use the input Config parameters and initialize the GcsClient.
void initializeClient() {
constexpr std::string_view kHttpsScheme{"https://"};
auto options = gc::Options{};
Expand Down Expand Up @@ -329,48 +329,48 @@ class GCSFileSystem::Impl {
std::shared_ptr<gcs::Client> client_;
};

GCSFileSystem::GCSFileSystem(std::shared_ptr<const config::ConfigBase> config)
GcsFileSystem::GcsFileSystem(std::shared_ptr<const config::ConfigBase> config)
: FileSystem(config) {
impl_ = std::make_shared<Impl>(config.get());
}

void GCSFileSystem::initializeClient() {
void GcsFileSystem::initializeClient() {
impl_->initializeClient();
}

std::unique_ptr<ReadFile> GCSFileSystem::openFileForRead(
std::unique_ptr<ReadFile> GcsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const auto gcspath = gcsPath(path);
auto gcsfile = std::make_unique<GCSReadFile>(gcspath, impl_->getClient());
auto gcsfile = std::make_unique<GcsReadFile>(gcspath, impl_->getClient());
gcsfile->initialize(options);
return gcsfile;
}

std::unique_ptr<WriteFile> GCSFileSystem::openFileForWrite(
std::unique_ptr<WriteFile> GcsFileSystem::openFileForWrite(
std::string_view path,
const FileOptions& /*unused*/) {
const auto gcspath = gcsPath(path);
auto gcsfile = std::make_unique<GCSWriteFile>(gcspath, impl_->getClient());
auto gcsfile = std::make_unique<GcsWriteFile>(gcspath, impl_->getClient());
gcsfile->initialize();
return gcsfile;
}

void GCSFileSystem::remove(std::string_view path) {
if (!isGCSFile(path)) {
VELOX_FAIL(kGCSInvalidPath, path);
void GcsFileSystem::remove(std::string_view path) {
if (!isGcsFile(path)) {
VELOX_FAIL(kGcsInvalidPath, path);
}

// We assume 'path' is well-formed here.
std::string bucket;
std::string object;
const auto file = gcsPath(path);
setBucketAndKeyFromGCSPath(file, bucket, object);
setBucketAndKeyFromGcsPath(file, bucket, object);

if (!object.empty()) {
auto stat = impl_->getClient()->GetObjectMetadata(bucket, object);
if (!stat.ok()) {
checkGCSStatus(
checkGcsStatus(
stat.status(),
"Failed to get metadata for GCS object",
bucket,
Expand All @@ -379,41 +379,41 @@ void GCSFileSystem::remove(std::string_view path) {
}
auto ret = impl_->getClient()->DeleteObject(bucket, object);
if (!ret.ok()) {
checkGCSStatus(
checkGcsStatus(
ret, "Failed to get metadata for GCS object", bucket, object);
}
}

bool GCSFileSystem::exists(std::string_view path) {
bool GcsFileSystem::exists(std::string_view path) {
std::vector<std::string> result;
if (!isGCSFile(path))
VELOX_FAIL(kGCSInvalidPath, path);
if (!isGcsFile(path))
VELOX_FAIL(kGcsInvalidPath, path);

// We assume 'path' is well-formed here.
const auto file = gcsPath(path);
std::string bucket;
std::string object;
setBucketAndKeyFromGCSPath(file, bucket, object);
setBucketAndKeyFromGcsPath(file, bucket, object);
using ::google::cloud::StatusOr;
StatusOr<gcs::BucketMetadata> metadata =
impl_->getClient()->GetBucketMetadata(bucket);

return metadata.ok();
}

std::vector<std::string> GCSFileSystem::list(std::string_view path) {
std::vector<std::string> GcsFileSystem::list(std::string_view path) {
std::vector<std::string> result;
if (!isGCSFile(path))
VELOX_FAIL(kGCSInvalidPath, path);
if (!isGcsFile(path))
VELOX_FAIL(kGcsInvalidPath, path);

// We assume 'path' is well-formed here.
const auto file = gcsPath(path);
std::string bucket;
std::string object;
setBucketAndKeyFromGCSPath(file, bucket, object);
setBucketAndKeyFromGcsPath(file, bucket, object);
for (auto&& metadata : impl_->getClient()->ListObjects(bucket)) {
if (!metadata.ok()) {
checkGCSStatus(
checkGcsStatus(
metadata.status(),
"Failed to get metadata for GCS object",
bucket,
Expand All @@ -425,19 +425,19 @@ std::vector<std::string> GCSFileSystem::list(std::string_view path) {
return result;
}

std::string GCSFileSystem::name() const {
std::string GcsFileSystem::name() const {
return "GCS";
}

void GCSFileSystem::rename(std::string_view, std::string_view, bool) {
void GcsFileSystem::rename(std::string_view, std::string_view, bool) {
VELOX_UNSUPPORTED("rename for GCS not implemented");
}

void GCSFileSystem::mkdir(std::string_view path) {
void GcsFileSystem::mkdir(std::string_view path) {
VELOX_UNSUPPORTED("mkdir for GCS not implemented");
}

void GCSFileSystem::rmdir(std::string_view path) {
void GcsFileSystem::rmdir(std::string_view path) {
VELOX_UNSUPPORTED("rmdir for GCS not implemented");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ namespace facebook::velox::filesystems {
/// We provide a registration method for read and write files so the appropriate
/// type of file can be constructed based on a filename. See the
/// (register|generate)ReadFile and (register|generate)WriteFile functions.
class GCSFileSystem : public FileSystem {
class GcsFileSystem : public FileSystem {
public:
explicit GCSFileSystem(std::shared_ptr<const config::ConfigBase> config);
explicit GcsFileSystem(std::shared_ptr<const config::ConfigBase> config);

/// Initialize the google::cloud::storage::Client from the input Config
/// parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

#include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h"
#include "velox/connectors/hive/storage_adapters/gcs/GcsUtil.h"

namespace facebook::velox {

std::string getErrorStringFromGCSError(const google::cloud::StatusCode& code) {
std::string getErrorStringFromGcsError(const google::cloud::StatusCode& code) {
using ::google::cloud::StatusCode;

switch (code) {
Expand Down
Loading

0 comments on commit 731f492

Please sign in to comment.