Skip to content

Commit

Permalink
Add RetryStrategy for S3 file system
Browse files Browse the repository at this point in the history
  • Loading branch information
yma11 committed May 7, 2024
1 parent 0c4dad1 commit 78a948c
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 3 deletions.
2 changes: 1 addition & 1 deletion scripts/setup-adapters.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ MACHINE=$(uname -m)

function install_aws_deps {
local AWS_REPO_NAME="aws/aws-sdk-cpp"
local AWS_SDK_VERSION="1.11.169"
local AWS_SDK_VERSION="1.11.321"

github_checkout $AWS_REPO_NAME $AWS_SDK_VERSION --depth 1 --recurse-submodules
cmake_install -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DBUILD_SHARED_LIBS:BOOL=OFF -DMINIMIZE_SIZE:BOOL=ON -DENABLE_TESTING:BOOL=OFF -DBUILD_ONLY:STRING="s3;identity-management"
Expand Down
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ std::optional<uint32_t> HiveConfig::s3MaxConnections() const {
config_->get<uint32_t>(kS3MaxConnections));
}

// Reads 'hive.s3.max-attempts' from the underlying config and hands it back
// as a std::optional; the optional is empty when the lookup yields no value.
std::optional<uint32_t> HiveConfig::s3MaxAttempts() const {
  using MaxAttempts = std::optional<std::uint32_t>;
  return static_cast<MaxAttempts>(config_->get<uint32_t>(kS3MaxAttempts));
}

// Reads 'hive.s3.retry-mode' from the underlying config and hands it back
// as a std::optional; the optional is empty when the lookup yields no value.
std::optional<std::string> HiveConfig::s3RetryMode() const {
  using RetryMode = std::optional<std::string>;
  return static_cast<RetryMode>(config_->get<std::string>(kS3RetryMode));
}

// Returns the configured GCS endpoint ('hive.gcs.endpoint'), falling back to
// an empty string when the key is not set.
std::string HiveConfig::gcsEndpoint() const {
  const std::string fallbackEndpoint("");
  return config_->get<std::string>(kGCSEndpoint, fallbackEndpoint);
}
Expand Down
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ class HiveConfig {
/// Maximum concurrent TCP connections for a single http client.
static constexpr const char* kS3MaxConnections = "hive.s3.max-connections";

/// Maximum retry attempts for a single http client. Must be greater than 0.
/// Only consulted when 'hive.s3.retry-mode' is 'standard' or 'adaptive';
/// when unset, the selected retry strategy uses its own default.
static constexpr const char* kS3MaxAttempts = "hive.s3.max-attempts";

/// Retry mode for a single http client: 'standard' or 'adaptive'. When
/// unset, the AWS DefaultRetryStrategy is used.
static constexpr const char* kS3RetryMode = "hive.s3.retry-mode";

/// The GCS storage endpoint server.
static constexpr const char* kGCSEndpoint = "hive.gcs.endpoint";

Expand Down Expand Up @@ -224,6 +230,10 @@ class HiveConfig {

std::optional<uint32_t> s3MaxConnections() const;

/// Returns the configured 'hive.s3.max-attempts' value, if any.
std::optional<uint32_t> s3MaxAttempts() const;

/// Returns the configured 'hive.s3.retry-mode' value, if any.
std::optional<std::string> s3RetryMode() const;

std::string gcsEndpoint() const;

std::string gcsScheme() const;
Expand Down
39 changes: 38 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/client/AdaptiveRetryStrategy.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
Expand Down Expand Up @@ -70,7 +72,6 @@ Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
return [=]() { return Aws::New<StringViewStream>("", data, nbytes); };
}

// TODO: Implement retry on failure.
class S3ReadFile final : public ReadFile {
public:
S3ReadFile(const std::string& path, Aws::S3::S3Client* client)
Expand Down Expand Up @@ -562,6 +563,9 @@ class S3FileSystem::Impl {
clientConfig.maxConnections = hiveConfig_->s3MaxConnections().value();
}

auto retryStrategy = getRetryStrategy();
clientConfig.retryStrategy = retryStrategy;

auto credentialsProvider = getCredentialsProvider();

client_ = std::make_shared<Aws::S3::S3Client>(
Expand Down Expand Up @@ -640,6 +644,39 @@ class S3FileSystem::Impl {
return getDefaultCredentialsProvider();
}

// Builds the AWS client retry strategy selected by the Hive S3 config.
//
// 'hive.s3.retry-mode' picks the strategy:
//   - unset      -> Aws::Client::DefaultRetryStrategy
//   - "standard" -> Aws::Client::StandardRetryStrategy
//   - "adaptive" -> Aws::Client::AdaptiveRetryStrategy
// Any other value raises a user error rather than silently falling back to
// the default strategy, so a misspelled mode is reported instead of ignored.
//
// 'hive.s3.max-attempts' is only consulted for "standard" and "adaptive".
// When set it must be > 0 (user error otherwise) and is forwarded to the
// strategy; when unset the strategy is default-constructed and uses its own
// built-in attempt count.
std::shared_ptr<Aws::Client::RetryStrategy> getRetryStrategy() const {
  const auto retryMode = hiveConfig_->s3RetryMode();
  const auto maxAttempts = hiveConfig_->s3MaxAttempts();

  // No retry mode configured: keep the SDK's default strategy.
  if (!retryMode.has_value()) {
    return std::make_shared<Aws::Client::DefaultRetryStrategy>();
  }

  const bool isStandard = retryMode.value() == "standard";
  const bool isAdaptive = retryMode.value() == "adaptive";
  VELOX_USER_CHECK(
      isStandard || isAdaptive,
      "Invalid configuration: 'retry-mode' must be 'standard' or 'adaptive', got '{}'.",
      retryMode.value());

  if (!maxAttempts.has_value()) {
    // Let the chosen strategy apply its own default attempt count.
    if (isStandard) {
      return std::make_shared<Aws::Client::StandardRetryStrategy>();
    }
    return std::make_shared<Aws::Client::AdaptiveRetryStrategy>();
  }

  // Validate once for both modes; zero attempts would never issue a request.
  VELOX_USER_CHECK(
      (maxAttempts.value() > 0),
      "Invalid configuration: specify 'max-attempts' > 0.");
  if (isStandard) {
    return std::make_shared<Aws::Client::StandardRetryStrategy>(
        maxAttempts.value());
  }
  return std::make_shared<Aws::Client::AdaptiveRetryStrategy>(
      maxAttempts.value());
}

// Make it clear that the S3FileSystem instance owns the S3Client.
// Once the S3FileSystem is destroyed, the S3Client fails to work
// due to the Aws::ShutdownAPI invocation in the destructor.
Expand Down
9 changes: 8 additions & 1 deletion velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,14 @@ Each query can override the config by setting corresponding query session proper
- integer
-
- Maximum concurrent TCP connections for a single http client.

* - hive.s3.max-attempts
- integer
-
- Maximum retry attempts for a single http client. Must be greater than 0. Only used when ``hive.s3.retry-mode`` is set.
* - hive.s3.retry-mode
- string
-
- Retry mode for a single http client: 'standard' or 'adaptive'. If unset, the AWS SDK DefaultRetryStrategy is used.

``Google Cloud Storage Configuration``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. list-table::
Expand Down

0 comments on commit 78a948c

Please sign in to comment.