Skip to content

Commit

Permalink
Add RetryStrategy for S3 file system (#9736)
Browse files Browse the repository at this point in the history
Summary:
This PR adds `RetryStrategy` support for the S3 file system, so it will retry when a connection fails. It also upgrades the AWS SDK to `1.11.321`, which supports the `AdaptiveRetryStrategy` that users may choose to use.

For `RetryStrategy`, 2 configs are added:

`hive.s3.max-attempts`: Maximum number of connection attempts for a single HTTP client.
`hive.s3.retry-mode`: 'standard', 'adaptive', or 'legacy'; the client will be created without a retry strategy if this config is empty.

Pull Request resolved: #9736

Reviewed By: pedroerp

Differential Revision: D58368804

Pulled By: kgpai

fbshipit-source-id: 7aaea7f0cdc9cd6149fda28bbe317cba3357a9d4
  • Loading branch information
yma11 authored and facebook-github-bot committed Jun 15, 2024
1 parent c98e7de commit 8c3630b
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 21 deletions.
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ std::optional<uint32_t> HiveConfig::s3MaxConnections() const {
config_->get<uint32_t>(kS3MaxConnections));
}

std::optional<int32_t> HiveConfig::s3MaxAttempts() const {
  // Value of "hive.s3.max-attempts"; empty when the config is unset.
  const auto raw = config_->get<int32_t>(kS3MaxAttempts);
  return static_cast<std::optional<int32_t>>(raw);
}

std::optional<std::string> HiveConfig::s3RetryMode() const {
  // Value of "hive.s3.retry-mode"; empty when the config is unset.
  const auto raw = config_->get<std::string>(kS3RetryMode);
  return static_cast<std::optional<std::string>>(raw);
}

std::string HiveConfig::gcsEndpoint() const {
  // Falls back to an empty endpoint when "hive.gcs.endpoint" is unset.
  const std::string defaultEndpoint("");
  return config_->get<std::string>(kGCSEndpoint, defaultEndpoint);
}
Expand Down
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ class HiveConfig {
/// Maximum concurrent TCP connections for a single http client.
static constexpr const char* kS3MaxConnections = "hive.s3.max-connections";

/// Maximum retry attempts for a single http client. Used together with
/// kS3RetryMode; negative values are rejected at client creation.
static constexpr const char* kS3MaxAttempts = "hive.s3.max-attempts";

/// Retry mode for a single http client. Allowed values: "standard",
/// "adaptive", "legacy"; no retry strategy is configured when unset.
static constexpr const char* kS3RetryMode = "hive.s3.retry-mode";

/// The GCS storage endpoint server.
static constexpr const char* kGCSEndpoint = "hive.gcs.endpoint";

Expand Down Expand Up @@ -252,6 +258,10 @@ class HiveConfig {

std::optional<uint32_t> s3MaxConnections() const;

/// Maximum retry attempts for the S3 client ("hive.s3.max-attempts").
std::optional<int32_t> s3MaxAttempts() const;

/// Retry mode for the S3 client ("hive.s3.retry-mode").
std::optional<std::string> s3RetryMode() const;

std::string gcsEndpoint() const;

std::string gcsScheme() const;
Expand Down
60 changes: 59 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/client/AdaptiveRetryStrategy.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
Expand Down Expand Up @@ -70,7 +72,6 @@ Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
return [=]() { return Aws::New<StringViewStream>("", data, nbytes); };
}

// TODO: Implement retry on failure.
class S3ReadFile final : public ReadFile {
public:
S3ReadFile(const std::string& path, Aws::S3::S3Client* client)
Expand Down Expand Up @@ -562,6 +563,11 @@ class S3FileSystem::Impl {
clientConfig.maxConnections = hiveConfig_->s3MaxConnections().value();
}

auto retryStrategy = getRetryStrategy();
if (retryStrategy.has_value()) {
clientConfig.retryStrategy = retryStrategy.value();
}

auto credentialsProvider = getCredentialsProvider();

client_ = std::make_shared<Aws::S3::S3Client>(
Expand Down Expand Up @@ -640,6 +646,58 @@ class S3FileSystem::Impl {
return getDefaultCredentialsProvider();
}

// Returns a client RetryStrategy based on the config, or std::nullopt when
// "hive.s3.retry-mode" is unset (the client is then created without an
// explicit retry strategy).
//
// "hive.s3.max-attempts" is optional; when unset the AWS SDK defaults apply
// (3 attempts for standard/adaptive, maxRetries = 10 with scaleFactor = 25
// for legacy). A negative value is a user configuration error.
std::optional<std::shared_ptr<Aws::Client::RetryStrategy>> getRetryStrategy()
    const {
  auto retryMode = hiveConfig_->s3RetryMode();
  auto maxAttempts = hiveConfig_->s3MaxAttempts();
  if (!retryMode.has_value()) {
    return std::nullopt;
  }
  const auto& mode = retryMode.value();
  // Reject unknown modes up front instead of in a deeply nested else.
  if (mode != "standard" && mode != "adaptive" && mode != "legacy") {
    VELOX_USER_FAIL("Invalid retry mode for S3: {}", mode);
  }
  // Validate max-attempts once for all modes instead of repeating the
  // identical check in every branch.
  if (maxAttempts.has_value()) {
    VELOX_USER_CHECK_GE(
        maxAttempts.value(),
        0,
        "Invalid configuration: specified 'hive.s3.max-attempts' value {} is < 0.",
        maxAttempts.value());
  }
  if (mode == "standard") {
    if (maxAttempts.has_value()) {
      return std::make_shared<Aws::Client::StandardRetryStrategy>(
          maxAttempts.value());
    }
    // SDK default: 3 attempts.
    return std::make_shared<Aws::Client::StandardRetryStrategy>();
  }
  if (mode == "adaptive") {
    if (maxAttempts.has_value()) {
      return std::make_shared<Aws::Client::AdaptiveRetryStrategy>(
          maxAttempts.value());
    }
    // SDK default: 3 attempts.
    return std::make_shared<Aws::Client::AdaptiveRetryStrategy>();
  }
  // "legacy" mode.
  if (maxAttempts.has_value()) {
    return std::make_shared<Aws::Client::DefaultRetryStrategy>(
        maxAttempts.value());
  }
  // SDK default: maxRetries = 10, scaleFactor = 25.
  return std::make_shared<Aws::Client::DefaultRetryStrategy>();
}

// Make it clear that the S3FileSystem instance owns the S3Client.
// Once the S3FileSystem is destroyed, the S3Client fails to work
// due to the Aws::ShutdownAPI invocation in the destructor.
Expand Down
44 changes: 25 additions & 19 deletions velox/connectors/hive/storage_adapters/s3fs/S3Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,25 +157,31 @@ inline std::string getRequestID(
} // namespace

/// Only Amazon (amz) and Alibaba (oss) request IDs are supported.
#define VELOX_CHECK_AWS_OUTCOME(outcome, errorMsgPrefix, bucket, key) \
{ \
if (!outcome.IsSuccess()) { \
auto error = outcome.GetError(); \
auto errMsg = fmt::format( \
"{} due to: '{}'. Path:'{}', SDK Error Type:{}, HTTP Status Code:{}, S3 Service:'{}', Message:'{}', RequestID:'{}'", \
errorMsgPrefix, \
getErrorStringFromS3Error(error), \
s3URI(bucket, key), \
static_cast<int>(error.GetErrorType()), \
error.GetResponseCode(), \
getS3BackendService(error.GetResponseHeaders()), \
error.GetMessage(), \
getRequestID(error.GetResponseHeaders())); \
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { \
VELOX_FILE_NOT_FOUND_ERROR(errMsg); \
} \
VELOX_FAIL(errMsg) \
} \
/// Checks an AWS SDK outcome; no-op on success. On failure, builds a message
/// with the error string, S3 path, SDK error type, HTTP status, backend
/// service, message and request ID; appends a retry hint (with the observed
/// retry count) for retryable HTTP response codes, then throws
/// VELOX_FILE_NOT_FOUND_ERROR for HTTP 404 and VELOX_FAIL otherwise.
#define VELOX_CHECK_AWS_OUTCOME(outcome, errorMsgPrefix, bucket, key) \
{ \
if (!outcome.IsSuccess()) { \
auto error = outcome.GetError(); \
auto errMsg = fmt::format( \
"{} due to: '{}'. Path:'{}', SDK Error Type:{}, HTTP Status Code:{}, S3 Service:'{}', Message:'{}', RequestID:'{}'.", \
errorMsgPrefix, \
getErrorStringFromS3Error(error), \
s3URI(bucket, key), \
static_cast<int>(error.GetErrorType()), \
error.GetResponseCode(), \
getS3BackendService(error.GetResponseHeaders()), \
error.GetMessage(), \
getRequestID(error.GetResponseHeaders())); \
if (IsRetryableHttpResponseCode(error.GetResponseCode())) { \
auto retryHint = fmt::format( \
" Request failed after retrying {} times. Try increasing the value of 'hive.s3.max-attempts'.", \
outcome.GetRetryCount()); \
errMsg.append(retryHint); \
} \
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { \
VELOX_FILE_NOT_FOUND_ERROR(errMsg); \
} \
VELOX_FAIL(errMsg) \
} \
}

bool isHostExcludedFromProxy(
Expand Down
13 changes: 12 additions & 1 deletion velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,18 @@ Each query can override the config by setting corresponding query session proper
- integer
-
- Maximum concurrent TCP connections for a single http client.

* - hive.s3.max-attempts
- integer
-
- Maximum number of connection attempts for a single http client; works together with hive.s3.retry-mode. Defaults to 3 for standard/adaptive mode
and 10 for legacy mode.
* - hive.s3.retry-mode
- string
-
- **Allowed values:** "standard", "adaptive", "legacy". By default it's empty, and the S3 client will be created without a RetryStrategy.
Legacy mode only enables throttled retry for transient errors.
Standard mode is built on top of legacy mode and has throttled retry enabled for throttling errors apart from transient errors.
Adaptive retry mode dynamically limits the rate of AWS requests to maximize success rate.
``Google Cloud Storage Configuration``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. list-table::
Expand Down

0 comments on commit 8c3630b

Please sign in to comment.