diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index cc3243226625..1609a8adb62f 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -120,6 +120,16 @@ std::optional HiveConfig::s3MaxConnections() const { config_->get(kS3MaxConnections)); } +std::optional HiveConfig::s3MaxAttempts() const { + return static_cast>( + config_->get(kS3MaxAttempts)); +} + +std::optional HiveConfig::s3RetryMode() const { + return static_cast>( + config_->get(kS3RetryMode)); +} + std::string HiveConfig::gcsEndpoint() const { return config_->get(kGCSEndpoint, std::string("")); } diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 51f387909e95..6dd0b7669122 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -98,6 +98,12 @@ class HiveConfig { /// Maximum concurrent TCP connections for a single http client. static constexpr const char* kS3MaxConnections = "hive.s3.max-connections"; + /// Maximum retry attempts for a single http client. + static constexpr const char* kS3MaxAttempts = "hive.s3.max-attempts"; + + /// Retry mode for a single http client. + static constexpr const char* kS3RetryMode = "hive.s3.retry-mode"; + /// The GCS storage endpoint server. 
static constexpr const char* kGCSEndpoint = "hive.gcs.endpoint"; @@ -246,6 +252,10 @@ class HiveConfig { std::optional s3MaxConnections() const; + std::optional s3MaxAttempts() const; + + std::optional s3RetryMode() const; + std::string gcsEndpoint() const; std::string gcsScheme() const; diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index 5f3ed3891fc6..c8c50480dcc8 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -30,6 +30,8 @@ #include #include +#include +#include #include #include #include @@ -70,7 +72,6 @@ Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) { return [=]() { return Aws::New("", data, nbytes); }; } -// TODO: Implement retry on failure. class S3ReadFile final : public ReadFile { public: S3ReadFile(const std::string& path, Aws::S3::S3Client* client) @@ -562,6 +563,11 @@ class S3FileSystem::Impl { clientConfig.maxConnections = hiveConfig_->s3MaxConnections().value(); } + auto retryStrategy = getRetryStrategy(); + if (retryStrategy.has_value()) { + clientConfig.retryStrategy = retryStrategy.value(); + } + auto credentialsProvider = getCredentialsProvider(); client_ = std::make_shared( @@ -640,6 +646,58 @@ class S3FileSystem::Impl { return getDefaultCredentialsProvider(); } + // Return a client RetryStrategy based on the config. + std::optional> getRetryStrategy() + const { + auto retryMode = hiveConfig_->s3RetryMode(); + auto maxAttempts = hiveConfig_->s3MaxAttempts(); + if (retryMode.has_value()) { + if (retryMode.value() == "standard") { + if (maxAttempts.has_value()) { + VELOX_USER_CHECK_GE( + maxAttempts.value(), + 0, + "Invalid configuration: specified 'hive.s3.max-attempts' value {} is < 0.", + maxAttempts.value()); + return std::make_shared( + maxAttempts.value()); + } else { + // Otherwise, use default value 3. 
+ return std::make_shared(); + } + } else if (retryMode.value() == "adaptive") { + if (maxAttempts.has_value()) { + VELOX_USER_CHECK_GE( + maxAttempts.value(), + 0, + "Invalid configuration: specified 'hive.s3.max-attempts' value {} is < 0.", + maxAttempts.value()); + return std::make_shared( + maxAttempts.value()); + } else { + // Otherwise, use default value 3. + return std::make_shared(); + } + } else if (retryMode.value() == "legacy") { + if (maxAttempts.has_value()) { + VELOX_USER_CHECK_GE( + maxAttempts.value(), + 0, + "Invalid configuration: specified 'hive.s3.max-attempts' value {} is < 0.", + maxAttempts.value()); + return std::make_shared( + maxAttempts.value()); + } else { + // Otherwise, use default value maxRetries = 10, scaleFactor = 25 + return std::make_shared(); + } + } else { + VELOX_USER_FAIL("Invalid retry mode for S3: {}", retryMode.value()); + } + } + return std::nullopt; + } + // Make it clear that the S3FileSystem instance owns the S3Client. // Once the S3FileSystem is destroyed, the S3Client fails to work // due to the Aws::ShutdownAPI invocation in the destructor. diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Util.h b/velox/connectors/hive/storage_adapters/s3fs/S3Util.h index 3d24ba5c510d..9632dbf47b87 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3Util.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Util.h @@ -157,25 +157,31 @@ inline std::string getRequestID( } // namespace /// Only Amazon (amz) and Alibaba (oss) request IDs are supported. -#define VELOX_CHECK_AWS_OUTCOME(outcome, errorMsgPrefix, bucket, key) \ - { \ - if (!outcome.IsSuccess()) { \ - auto error = outcome.GetError(); \ - auto errMsg = fmt::format( \ - "{} due to: '{}'. 
Path:'{}', SDK Error Type:{}, HTTP Status Code:{}, S3 Service:'{}', Message:'{}', RequestID:'{}'", \ - errorMsgPrefix, \ - getErrorStringFromS3Error(error), \ - s3URI(bucket, key), \ - static_cast(error.GetErrorType()), \ - error.GetResponseCode(), \ - getS3BackendService(error.GetResponseHeaders()), \ - error.GetMessage(), \ - getRequestID(error.GetResponseHeaders())); \ - if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { \ - VELOX_FILE_NOT_FOUND_ERROR(errMsg); \ - } \ - VELOX_FAIL(errMsg) \ - } \ +#define VELOX_CHECK_AWS_OUTCOME(outcome, errorMsgPrefix, bucket, key) \ + { \ + if (!outcome.IsSuccess()) { \ + auto error = outcome.GetError(); \ + auto errMsg = fmt::format( \ + "{} due to: '{}'. Path:'{}', SDK Error Type:{}, HTTP Status Code:{}, S3 Service:'{}', Message:'{}', RequestID:'{}'.", \ + errorMsgPrefix, \ + getErrorStringFromS3Error(error), \ + s3URI(bucket, key), \ + static_cast(error.GetErrorType()), \ + error.GetResponseCode(), \ + getS3BackendService(error.GetResponseHeaders()), \ + error.GetMessage(), \ + getRequestID(error.GetResponseHeaders())); \ + if (IsRetryableHttpResponseCode(error.GetResponseCode())) { \ + auto retryHint = fmt::format( \ + " Request failed after retrying {} times. Try increasing the value of 'hive.s3.max-attempts'.", \ + outcome.GetRetryCount()); \ + errMsg.append(retryHint); \ + } \ + if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { \ + VELOX_FILE_NOT_FOUND_ERROR(errMsg); \ + } \ + VELOX_FAIL(errMsg) \ + } \ } bool isHostExcludedFromProxy( diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 4b8826c1a3e1..284816db37ca 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -559,7 +559,18 @@ Each query can override the config by setting corresponding query session proper - integer - - Maximum concurrent TCP connections for a single http client. 
- + * - hive.s3.max-attempts + - integer + - + - Maximum number of attempts for a single http client request; works together with 'hive.s3.retry-mode'. By default, it's 3 for standard/adaptive mode + and 10 for legacy mode. + * - hive.s3.retry-mode + - string + - + - **Allowed values:** "standard", "adaptive", "legacy". By default it's empty, and the S3 client is created with the AWS SDK's default retry strategy. + Legacy mode only enables throttled retry for transient errors. + Standard mode is built on top of legacy mode and has throttled retry enabled for throttling errors in addition to transient errors. + Adaptive retry mode dynamically limits the rate of AWS requests to maximize success rate. ``Google Cloud Storage Configuration`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. list-table::