Add RetryStrategy for S3 file system #9736

Closed
wants to merge 4 commits into from
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
@@ -120,6 +120,16 @@ std::optional<uint32_t> HiveConfig::s3MaxConnections() const {
      config_->get<uint32_t>(kS3MaxConnections));
}

std::optional<int32_t> HiveConfig::s3MaxAttempts() const {
  return static_cast<std::optional<std::int32_t>>(
      config_->get<int32_t>(kS3MaxAttempts));
}

std::optional<std::string> HiveConfig::s3RetryMode() const {
  return static_cast<std::optional<std::string>>(
      config_->get<std::string>(kS3RetryMode));
}

std::string HiveConfig::gcsEndpoint() const {
  return config_->get<std::string>(kGCSEndpoint, std::string(""));
}
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.h
@@ -98,6 +98,12 @@ class HiveConfig {
  /// Maximum concurrent TCP connections for a single http client.
  static constexpr const char* kS3MaxConnections = "hive.s3.max-connections";

  /// Maximum retry attempts for a single http client.
  static constexpr const char* kS3MaxAttempts = "hive.s3.max-attempts";

  /// Retry mode for a single http client.
  static constexpr const char* kS3RetryMode = "hive.s3.retry-mode";

  /// The GCS storage endpoint server.
  static constexpr const char* kGCSEndpoint = "hive.gcs.endpoint";

@@ -246,6 +252,10 @@ class HiveConfig {

  std::optional<uint32_t> s3MaxConnections() const;

  std::optional<int32_t> s3MaxAttempts() const;

  std::optional<std::string> s3RetryMode() const;

  std::string gcsEndpoint() const;

  std::string gcsScheme() const;
60 changes: 59 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
@@ -30,6 +30,8 @@

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/client/AdaptiveRetryStrategy.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
@@ -70,7 +72,6 @@ Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
  return [=]() { return Aws::New<StringViewStream>("", data, nbytes); };
}

// TODO: Implement retry on failure.
class S3ReadFile final : public ReadFile {
 public:
  S3ReadFile(const std::string& path, Aws::S3::S3Client* client)
@@ -562,6 +563,11 @@ class S3FileSystem::Impl {
      clientConfig.maxConnections = hiveConfig_->s3MaxConnections().value();
    }

    auto retryStrategy = getRetryStrategy();
    if (retryStrategy.has_value()) {
      clientConfig.retryStrategy = retryStrategy.value();
    }

    auto credentialsProvider = getCredentialsProvider();

    client_ = std::make_shared<Aws::S3::S3Client>(
@@ -640,6 +646,58 @@ class S3FileSystem::Impl {
    return getDefaultCredentialsProvider();
  }

  // Return a client RetryStrategy based on the config.
  std::optional<std::shared_ptr<Aws::Client::RetryStrategy>> getRetryStrategy()
      const {
    auto retryMode = hiveConfig_->s3RetryMode();
    auto maxAttempts = hiveConfig_->s3MaxAttempts();
    if (retryMode.has_value()) {

If the user configures maxAttempts but not retryMode, does maxAttempts take effect? It should take effect, since we have a default retry mode.

Contributor Author

No. In Velox, maxAttempts needs to work together with retryMode. If retryMode isn't configured, the S3 client is created without a RetryStrategy. For Gluten, however, we've set a default value for retryMode. I updated the doc in this PR.
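
For readers following this thread, the rule described in the reply can be sketched in isolation. This is a minimal sketch, not the PR's code: `RetrySettings` and `resolveRetrySettings` are hypothetical names introduced here, and the real function returns an `Aws::Client::RetryStrategy` rather than a plain struct.

```cpp
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the resolved retry settings; the PR itself
// builds an Aws::Client::RetryStrategy instead of this struct.
struct RetrySettings {
  std::string mode;
  // Empty means "let the chosen strategy use its own default".
  std::optional<int32_t> maxAttempts;
};

// Mirrors the rule from the reply: without a retry mode, max-attempts is
// ignored and no explicit strategy is configured.
std::optional<RetrySettings> resolveRetrySettings(
    const std::optional<std::string>& retryMode,
    const std::optional<int32_t>& maxAttempts) {
  if (!retryMode.has_value()) {
    return std::nullopt;
  }
  if (maxAttempts.has_value() && maxAttempts.value() < 0) {
    throw std::invalid_argument("'hive.s3.max-attempts' must be >= 0");
  }
  return RetrySettings{retryMode.value(), maxAttempts};
}
```

With this shape, `resolveRetrySettings(std::nullopt, 5)` yields an empty optional, which corresponds to the "maxAttempts alone has no effect" behaviour discussed above.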

      if (retryMode.value() == "standard") {
        if (maxAttempts.has_value()) {
          VELOX_USER_CHECK_GE(
              maxAttempts.value(),
              0,
              "Invalid configuration: specified 'hive.s3.max-attempts' value {} is < 0.",
              maxAttempts.value());
          return std::make_shared<Aws::Client::StandardRetryStrategy>(
              maxAttempts.value());
        } else {
          // Otherwise, use default value 3.
          return std::make_shared<Aws::Client::StandardRetryStrategy>();
        }
      } else if (retryMode.value() == "adaptive") {
        if (maxAttempts.has_value()) {
          VELOX_USER_CHECK_GE(
              maxAttempts.value(),
              0,
              "Invalid configuration: specified 'hive.s3.max-attempts' value {} is < 0.",
              maxAttempts.value());
          return std::make_shared<Aws::Client::AdaptiveRetryStrategy>(
              maxAttempts.value());
        } else {
          // Otherwise, use default value 3.
          return std::make_shared<Aws::Client::AdaptiveRetryStrategy>();
        }
      } else if (retryMode.value() == "legacy") {
        if (maxAttempts.has_value()) {
          VELOX_USER_CHECK_GE(
              maxAttempts.value(),
              0,
              "Invalid configuration: specified 'hive.s3.max-attempts' value {} is < 0.",
              maxAttempts.value());
          return std::make_shared<Aws::Client::DefaultRetryStrategy>(
              maxAttempts.value());
        } else {
          // Otherwise, use default value maxRetries = 10, scaleFactor = 25
          return std::make_shared<Aws::Client::DefaultRetryStrategy>();
        }
      } else {
        VELOX_USER_FAIL("Invalid retry mode for S3: {}", retryMode.value());
      }

Can you add a check for invalid retry mode values?

Contributor Author

Updated.
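
One way to picture the requested check, separate from the AWS types: a minimal sketch that maps the mode string to an enum and rejects anything else. `RetryMode` and `parseRetryMode` are hypothetical names used only for illustration; the PR reports the error through `VELOX_USER_FAIL`, whereas this sketch throws a standard exception so that it stays self-contained.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical enum covering the three values the PR accepts.
enum class RetryMode { kStandard, kAdaptive, kLegacy };

// Exact, case-sensitive match on the configured string; any other value
// is reported as a user error.
RetryMode parseRetryMode(const std::string& value) {
  if (value == "standard") {
    return RetryMode::kStandard;
  }
  if (value == "adaptive") {
    return RetryMode::kAdaptive;
  }
  if (value == "legacy") {
    return RetryMode::kLegacy;
  }
  throw std::invalid_argument("Invalid retry mode for S3: " + value);
}
```

Parsing the mode once up front would also let the max-attempts validation be written a single time instead of once per branch, though that is a design suggestion rather than something this PR does.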

    }
    return std::nullopt;
  }

  // Make it clear that the S3FileSystem instance owns the S3Client.
  // Once the S3FileSystem is destroyed, the S3Client fails to work
  // due to the Aws::ShutdownAPI invocation in the destructor.
44 changes: 25 additions & 19 deletions velox/connectors/hive/storage_adapters/s3fs/S3Util.h
@@ -157,25 +157,31 @@ inline std::string getRequestID(
} // namespace

/// Only Amazon (amz) and Alibaba (oss) request IDs are supported.
#define VELOX_CHECK_AWS_OUTCOME(outcome, errorMsgPrefix, bucket, key) \
{ \
if (!outcome.IsSuccess()) { \
auto error = outcome.GetError(); \
auto errMsg = fmt::format( \
"{} due to: '{}'. Path:'{}', SDK Error Type:{}, HTTP Status Code:{}, S3 Service:'{}', Message:'{}', RequestID:'{}'", \
errorMsgPrefix, \
getErrorStringFromS3Error(error), \
s3URI(bucket, key), \
static_cast<int>(error.GetErrorType()), \
error.GetResponseCode(), \
getS3BackendService(error.GetResponseHeaders()), \
error.GetMessage(), \
getRequestID(error.GetResponseHeaders())); \
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { \
VELOX_FILE_NOT_FOUND_ERROR(errMsg); \
} \
VELOX_FAIL(errMsg) \
} \
#define VELOX_CHECK_AWS_OUTCOME(outcome, errorMsgPrefix, bucket, key) \
{ \
if (!outcome.IsSuccess()) { \
auto error = outcome.GetError(); \
auto errMsg = fmt::format( \
"{} due to: '{}'. Path:'{}', SDK Error Type:{}, HTTP Status Code:{}, S3 Service:'{}', Message:'{}', RequestID:'{}'.", \
errorMsgPrefix, \
getErrorStringFromS3Error(error), \
s3URI(bucket, key), \
static_cast<int>(error.GetErrorType()), \
error.GetResponseCode(), \
getS3BackendService(error.GetResponseHeaders()), \
error.GetMessage(), \
getRequestID(error.GetResponseHeaders())); \
if (IsRetryableHttpResponseCode(error.GetResponseCode())) { \
auto retryHint = fmt::format( \
" Request failed after retrying {} times. Try increasing the value of 'hive.s3.max-attempts'.", \
outcome.GetRetryCount()); \
errMsg.append(retryHint); \
} \
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { \
VELOX_FILE_NOT_FOUND_ERROR(errMsg); \
} \
VELOX_FAIL(errMsg) \
} \
}
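
The effect of the new branch in the macro can be illustrated outside the SDK. This is a minimal sketch under stated assumptions: `isRetryableStatus` is a hypothetical stand-in with an illustrative subset of status codes (the macro itself calls the SDK's `IsRetryableHttpResponseCode`), and `withRetryHint` is a hypothetical helper name.

```cpp
#include <string>

// Hypothetical stand-in for the SDK's retryable-status check. The set of
// codes here is an illustrative assumption, not the SDK's actual list.
bool isRetryableStatus(int httpStatus) {
  return httpStatus == 429 || httpStatus == 500 || httpStatus == 503;
}

// Appends the same hint the macro adds when the failure was retryable,
// and leaves the message untouched otherwise.
std::string withRetryHint(
    std::string errMsg,
    int httpStatus,
    long retryCount) {
  if (isRetryableStatus(httpStatus)) {
    errMsg += " Request failed after retrying " + std::to_string(retryCount) +
        " times. Try increasing the value of 'hive.s3.max-attempts'.";
  }
  return errMsg;
}
```

A 404 therefore keeps its original message, while a throttling or server-side failure gains the suggestion to raise 'hive.s3.max-attempts'.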

bool isHostExcludedFromProxy(
13 changes: 12 additions & 1 deletion velox/docs/configs.rst
@@ -559,7 +559,18 @@ Each query can override the config by setting corresponding query session proper
- integer
-
- Maximum concurrent TCP connections for a single http client.

* - hive.s3.max-attempts

By default, the AWS SDK uses 3, right? Let's mention this here.

- integer
-
- Maximum attempts for connections to a single http client, work together with retry-mode. By default, it's 3 for standard/adaptive mode
Contributor

According to this document, the default is 4 for legacy mode:

"A default value of 4 for maximum retry attempts"

https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-retries.html

Contributor Author

image

I took this number from the code; maybe that is more reliable?


Their C++ SDK and their documentation differ. Let's follow the SDK.

Contributor

Maybe different AWS versions have different default values.

and 10 for legacy mode.
* - hive.s3.retry-mode
- string
-
- **Allowed values:** "standard", "adaptive", "legacy". By default it's empty, S3 client will be created with RetryStrategy.
Legacy mode only enables throttled retry for transient errors.
Standard mode is built on top of legacy mode and has throttled retry enabled for throttling errors apart from transient errors.
Adaptive retry mode dynamically limits the rate of AWS requests to maximize success rate.
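
As an illustration of the two documented keys used together, here is a minimal sketch that builds a plain key/value map. `makeS3RetryConfig` is a hypothetical helper, the values are examples only, and how such a map is handed to `HiveConfig` is outside this sketch.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical helper: returns example settings for the two keys added in
// this PR. Both are set because max-attempts only applies when retry-mode
// is configured.
std::unordered_map<std::string, std::string> makeS3RetryConfig() {
  return {
      {"hive.s3.retry-mode", "standard"},
      {"hive.s3.max-attempts", "5"},
  };
}
```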
``Google Cloud Storage Configuration``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. list-table::