Skip to content

Commit

Permalink
use common backoff
Browse files Browse the repository at this point in the history
Signed-off-by: ohadvano <[email protected]>
  • Loading branch information
ohadvano committed Mar 15, 2024
1 parent ecee2bf commit d2491cd
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 114 deletions.
5 changes: 4 additions & 1 deletion api/envoy/extensions/access_loggers/fluentd/v3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")
licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["@com_github_cncf_xds//udpa/annotations:pkg"],
deps = [
"//envoy/config/core/v3:pkg",
"@com_github_cncf_xds//udpa/annotations:pkg",
],
)
31 changes: 5 additions & 26 deletions api/envoy/extensions/access_loggers/fluentd/v3/fluentd.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto3";

package envoy.extensions.access_loggers.fluentd.v3;

import "envoy/config/core/v3/backoff.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/wrappers.proto";
Expand All @@ -24,38 +26,15 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#extension: envoy.access_loggers.fluentd]
// [#next-free-field: 8]
message FluentdAccessLogConfig {
enum BackOffStrategy {
// Constructs fully jittered exponential backoff strategy.
// The backoff interval value will be in the range of [base_backoff_interval, max_backoff_interval).
JitteredExponential = 0;

// Constructs fully jittered backoff strategy based on the base_backoff_interval value.
// The backoff interval value will be in the range of [base_backoff_interval, 1.5 * base_backoff_interval).
JitteredLowerBound = 1;

// Constructs fixed backoff strategy based on the base_backoff_interval value.
Fixed = 2;
}

message RetryOptions {
// The number of times the logger will attempt to connect to the upstream during reconnects.
// By default, there is no limit. The logger will attempt to reconnect to the upstream each time
// connecting to the upstream failed or the upstream connection had been closed for any reason.
google.protobuf.UInt32Value max_connect_attempts = 1;

// Sets the backoff strategy. Default value is JitteredExponential.
BackOffStrategy backoff_strategy = 2;

// The base interval to be used for the next back off computation. It should be greater than
// 1 millisecond and less than or equal to max_backoff_interval. The default value is 1 millisecond.
google.protobuf.Duration base_backoff_interval = 3
[(validate.rules).duration = {gte {nanos: 1000000}}];

// Specifies the maximum interval between retries. This parameter is optional,
// but must be greater than or equal to the base_backoff_interval if set. The default
// is 10 times the base_backoff_interval. This field only has effect if JitteredExponential backoff
// strategy is configured.
google.protobuf.Duration max_backoff_interval = 4 [(validate.rules).duration = {gt {}}];
// Sets the backoff strategy. If this value is not set, the default base backoff interval is 500
// milliseconds and the default max backoff interval is 5 seconds (10 times the base interval).
config.core.v3.BackoffStrategy backoff_options = 2;
}

// The upstream cluster to connect to for streaming the Fluentd messages.
Expand Down
9 changes: 3 additions & 6 deletions source/extensions/access_loggers/fluentd/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ namespace Extensions {
namespace AccessLoggers {
namespace Fluentd {

using BackOffStrategyType =
envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig::BackOffStrategy;

// Singleton registration via macro defined in envoy/singleton/manager.h
SINGLETON_MANAGER_REGISTRATION(fluentd_access_logger_cache);

Expand Down Expand Up @@ -48,11 +45,11 @@ FluentdAccessLogFactory::createAccessLogInstance(const Protobuf::Message& config
throw EnvoyException(fmt::format("cluster '{}' was not found", proto_config.cluster()));
}

if (proto_config.has_retry_options()) {
if (proto_config.has_retry_options() && proto_config.retry_options().has_backoff_options()) {
uint64_t base_interval_ms = PROTOBUF_GET_MS_OR_DEFAULT(
proto_config.retry_options(), base_backoff_interval, DefaultBaseBackoffIntervalMs);
proto_config.retry_options().backoff_options(), base_interval, DefaultBaseBackoffIntervalMs);
uint64_t max_interval_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.retry_options(), max_backoff_interval,
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.retry_options().backoff_options(), max_interval,
base_interval_ms * DefaultMaxBackoffIntervalFactor);

if (max_interval_ms < base_interval_ms) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,33 +170,19 @@ FluentdAccessLoggerCacheImpl::getOrCreateLogger(const FluentdAccessLogConfigShar
auto client =
cluster->tcpAsyncClient(nullptr, std::make_shared<const Tcp::AsyncTcpClientOptions>(false));

// Default backoff strategy configurations
auto strategy_type = FluentdAccessLogConfig::JitteredExponential;
if (config->has_retry_options()) {
strategy_type = config->retry_options().backoff_strategy();
}

uint64_t base_interval_ms = DefaultBaseBackoffIntervalMs;
uint64_t max_interval_ms = base_interval_ms * DefaultMaxBackoffIntervalFactor;

if (config->has_retry_options()) {
base_interval_ms = PROTOBUF_GET_MS_OR_DEFAULT(config->retry_options(), base_backoff_interval,
DefaultBaseBackoffIntervalMs);
if (config->has_retry_options() && config->retry_options().has_backoff_options()) {
base_interval_ms = PROTOBUF_GET_MS_OR_DEFAULT(
config->retry_options().backoff_options(), base_interval, DefaultBaseBackoffIntervalMs);
max_interval_ms =
PROTOBUF_GET_MS_OR_DEFAULT(config->retry_options(), max_backoff_interval,
PROTOBUF_GET_MS_OR_DEFAULT(config->retry_options().backoff_options(), max_interval,
base_interval_ms * DefaultMaxBackoffIntervalFactor);
}

BackOffStrategyPtr backoff_strategy;
if (strategy_type == FluentdAccessLogConfig::JitteredExponential) {
backoff_strategy = std::make_unique<JitteredExponentialBackOffStrategy>(
BackOffStrategyPtr backoff_strategy = std::make_unique<JitteredExponentialBackOffStrategy>(
base_interval_ms, max_interval_ms, random);
} else if (strategy_type == FluentdAccessLogConfig::JitteredLowerBound) {
backoff_strategy =
std::make_unique<JitteredLowerBoundBackOffStrategy>(base_interval_ms, random);
} else if (strategy_type == FluentdAccessLogConfig::Fixed) {
backoff_strategy = std::make_unique<FixedBackOffStrategy>(base_interval_ms);
}

const auto logger = std::make_shared<FluentdAccessLoggerImpl>(
*cluster, std::move(client), cache.dispatcher_, *config, std::move(backoff_strategy),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ using FluentdAccessLogConfig =
envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig;
using FluentdAccessLogConfigSharedPtr = std::shared_ptr<FluentdAccessLogConfig>;

static constexpr uint64_t DefaultBaseBackoffIntervalMs = 1;
static constexpr uint64_t DefaultBaseBackoffIntervalMs = 500;
static constexpr uint64_t DefaultMaxBackoffIntervalFactor = 10;
static constexpr uint64_t DefaultBufferFlushIntervalMs = 1000;
static constexpr uint64_t DefaultMaxBufferSize = 16384;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,62 +403,6 @@ TEST_F(FluentdAccessLoggerCacheImplTest, CreateTwoLoggersDifferentHash) {
EXPECT_NE(logger1, logger2);
}

TEST_F(FluentdAccessLoggerCacheImplTest, FixedBackOffStrategyConfig) {
init();

EXPECT_CALL(cluster_manager_, getThreadLocalCluster(cluster_name_)).WillOnce(Return(&cluster_));
EXPECT_CALL(*async_client1_, connected()).WillOnce(Return(false));
EXPECT_CALL(*async_client1_, connect()).WillOnce(Return(false)).WillOnce(Return(false));
EXPECT_CALL(cluster_, tcpAsyncClient(_, _)).WillOnce(Invoke([&] {
return Tcp::AsyncTcpClientPtr{async_client1_};
}));

envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig config;
config.set_cluster(cluster_name_);
config.set_tag("test.tag");
config.mutable_buffer_size_bytes()->set_value(1);
config.mutable_retry_options()->set_backoff_strategy(FluentdAccessLogConfig::Fixed);
config.mutable_retry_options()->mutable_base_backoff_interval()->set_nanos(2000000);

auto logger =
logger_cache_->getOrCreateLogger(std::make_shared<FluentdAccessLogConfig>(config), random_);
ASSERT_TRUE(logger != nullptr);

// Since the strategy is fixed, we expect the timer to be enabled twice with the same duration.
EXPECT_CALL(*retry_timer_, enableTimer(std::chrono::milliseconds(2), _)).Times(2);
logger->log(std::make_unique<Entry>(time_, std::move(data_)));
retry_timer_->invokeCallback();
}

TEST_F(FluentdAccessLoggerCacheImplTest, JitteredLowerBoundBackOffStrategyConfig) {
init();

EXPECT_CALL(cluster_manager_, getThreadLocalCluster(cluster_name_)).WillOnce(Return(&cluster_));
EXPECT_CALL(*async_client1_, connected()).WillOnce(Return(false));
EXPECT_CALL(*async_client1_, connect()).WillOnce(Return(false)).WillOnce(Return(false));
EXPECT_CALL(cluster_, tcpAsyncClient(_, _)).WillOnce(Invoke([&] {
return Tcp::AsyncTcpClientPtr{async_client1_};
}));

envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig config;
config.set_cluster(cluster_name_);
config.set_tag("test.tag");
config.mutable_buffer_size_bytes()->set_value(1);
config.mutable_retry_options()->set_backoff_strategy(FluentdAccessLogConfig::JitteredLowerBound);
config.mutable_retry_options()->mutable_base_backoff_interval()->set_nanos(4000000);

auto logger =
logger_cache_->getOrCreateLogger(std::make_shared<FluentdAccessLogConfig>(config), random_);
ASSERT_TRUE(logger != nullptr);

// Since the strategy is JitteredLowerBound, we expect the timer to be enabled twice with a
// duration that is in the range of [4, 6).
EXPECT_CALL(random_, random()).WillRepeatedly(Return(123)); // Will add jitter of 1
EXPECT_CALL(*retry_timer_, enableTimer(std::chrono::milliseconds(5), _)).Times(2);
logger->log(std::make_unique<Entry>(time_, std::move(data_)));
retry_timer_->invokeCallback();
}

TEST_F(FluentdAccessLoggerCacheImplTest, JitteredExponentialBackOffStrategyConfig) {
init();

Expand All @@ -473,9 +417,8 @@ TEST_F(FluentdAccessLoggerCacheImplTest, JitteredExponentialBackOffStrategyConfi
config.set_cluster(cluster_name_);
config.set_tag("test.tag");
config.mutable_buffer_size_bytes()->set_value(1);
config.mutable_retry_options()->set_backoff_strategy(FluentdAccessLogConfig::JitteredExponential);
config.mutable_retry_options()->mutable_base_backoff_interval()->set_nanos(7000000);
config.mutable_retry_options()->mutable_max_backoff_interval()->set_nanos(20000000);
config.mutable_retry_options()->mutable_backoff_options()->mutable_base_interval()->set_nanos(7000000);
config.mutable_retry_options()->mutable_backoff_options()->mutable_max_interval()->set_nanos(20000000);

auto logger =
logger_cache_->getOrCreateLogger(std::make_shared<FluentdAccessLogConfig>(config), random_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ class FluentdAccessLogIntegrationTest : public testing::Test, public BaseIntegra
}

if (base_backoff_interval.has_value()) {
access_log_config.mutable_retry_options()->mutable_base_backoff_interval()->set_nanos(
access_log_config.mutable_retry_options()->mutable_backoff_options()->mutable_base_interval()->set_nanos(
base_backoff_interval.value() * 1000000);
}

if (max_backoff_interval.has_value()) {
access_log_config.mutable_retry_options()->mutable_max_backoff_interval()->set_nanos(
access_log_config.mutable_retry_options()->mutable_backoff_options()->mutable_max_interval()->set_nanos(
max_backoff_interval.value() * 1000000);
}

Expand Down

0 comments on commit d2491cd

Please sign in to comment.