From 79d5a6d96a9121da655593c19cb17185b03b047d Mon Sep 17 00:00:00 2001 From: ohadvano <49730675+ohadvano@users.noreply.github.com> Date: Tue, 19 Mar 2024 16:40:16 +0200 Subject: [PATCH] fluentd_access_logger: add retry and backoff options (#32682) Signed-off-by: ohadvano --- .../access_loggers/fluentd/v3/BUILD | 5 +- .../access_loggers/fluentd/v3/fluentd.proto | 20 +- changelogs/current.yaml | 5 + .../observability/access_log/stats.rst | 1 + source/common/common/BUILD | 2 + source/common/common/backoff_strategy.h | 21 ++ .../access_loggers/fluentd/config.cc | 11 + .../fluentd/fluentd_access_log_impl.cc | 103 +++++-- .../fluentd/fluentd_access_log_impl.h | 28 +- test/common/common/backoff_strategy_test.cc | 25 ++ .../fluentd/fluentd_access_log_impl_test.cc | 278 ++++++++++++++++-- .../fluentd_access_log_integration_test.cc | 81 ++++- test/mocks/common.h | 11 + 13 files changed, 529 insertions(+), 62 deletions(-) diff --git a/api/envoy/extensions/access_loggers/fluentd/v3/BUILD b/api/envoy/extensions/access_loggers/fluentd/v3/BUILD index 29ebf0741406..09a37ad16b83 100644 --- a/api/envoy/extensions/access_loggers/fluentd/v3/BUILD +++ b/api/envoy/extensions/access_loggers/fluentd/v3/BUILD @@ -5,5 +5,8 @@ load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") licenses(["notice"]) # Apache 2 api_proto_package( - deps = ["@com_github_cncf_xds//udpa/annotations:pkg"], + deps = [ + "//envoy/config/core/v3:pkg", + "@com_github_cncf_xds//udpa/annotations:pkg", + ], ) diff --git a/api/envoy/extensions/access_loggers/fluentd/v3/fluentd.proto b/api/envoy/extensions/access_loggers/fluentd/v3/fluentd.proto index e6b2adfdc9c0..ce68c79e9d91 100644 --- a/api/envoy/extensions/access_loggers/fluentd/v3/fluentd.proto +++ b/api/envoy/extensions/access_loggers/fluentd/v3/fluentd.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package envoy.extensions.access_loggers.fluentd.v3; +import "envoy/config/core/v3/backoff.proto"; + import "google/protobuf/duration.proto"; import 
"google/protobuf/struct.proto"; import "google/protobuf/wrappers.proto"; @@ -22,8 +24,19 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // the Fluentd Forward Protocol as described in: `Fluentd Forward Protocol Specification // `_. // [#extension: envoy.access_loggers.fluentd] -// [#next-free-field: 7] +// [#next-free-field: 8] message FluentdAccessLogConfig { + message RetryOptions { + // The number of times the logger will attempt to connect to the upstream during reconnects. + // By default, there is no limit. The logger will attempt to reconnect to the upstream each time + // connecting to the upstream failed or the upstream connection had been closed for any reason. + google.protobuf.UInt32Value max_connect_attempts = 1; + + // Sets the backoff strategy. If this value is not set, the default base backoff interval is 500 + // milliseconds and the default max backoff interval is 5 seconds (10 times the base interval). + config.core.v3.BackoffStrategy backoff_options = 2; + } + // The upstream cluster to connect to for streaming the Fluentd messages. string cluster = 1 [(validate.rules).string = {min_len: 1}]; @@ -67,4 +80,9 @@ message FluentdAccessLogConfig { // "message": "My error message" // } google.protobuf.Struct record = 6 [(validate.rules).message = {required: true}]; + + // Optional retry, in case upstream connection has failed. If this field is not set, the default values will be applied, + // as specified in the :ref:`RetryOptions ` + // configuration. + RetryOptions retry_options = 7; } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index aa29fe080592..e95e090f0ed4 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -313,6 +313,11 @@ new_features: added a :ref:`configuration option ` to add ``x-envoy-local-overloaded`` header when Overload Manager is triggered. 
+- area: access_loggers + change: | + Added :ref:`retry options + ` to Fluentd Access Logger to + support upstream reconnect options and backoff intervals. - area: tracing change: | Added support to configure a Dynatrace sampler for the OpenTelemetry tracer. diff --git a/docs/root/configuration/observability/access_log/stats.rst b/docs/root/configuration/observability/access_log/stats.rst index 0a61df4e4518..5bc23c23ef67 100644 --- a/docs/root/configuration/observability/access_log/stats.rst +++ b/docs/root/configuration/observability/access_log/stats.rst @@ -46,4 +46,5 @@ The Fluentd access log has statistics rooted at the *access_logs.fluentd.(proto_config), context.serverFactoryContext().threadLocal(), + context.serverFactoryContext().api().randomGenerator(), getAccessLoggerCacheSingleton(context.serverFactoryContext())); } diff --git a/source/extensions/access_loggers/fluentd/fluentd_access_log_impl.cc b/source/extensions/access_loggers/fluentd/fluentd_access_log_impl.cc index ce1a96694d90..a6f312d66d96 100644 --- a/source/extensions/access_loggers/fluentd/fluentd_access_log_impl.cc +++ b/source/extensions/access_loggers/fluentd/fluentd_access_log_impl.cc @@ -12,17 +12,26 @@ namespace Fluentd { using MessagePackBuffer = msgpack::sbuffer; using MessagePackPacker = msgpack::packer; -FluentdAccessLoggerImpl::FluentdAccessLoggerImpl(Tcp::AsyncTcpClientPtr client, +FluentdAccessLoggerImpl::FluentdAccessLoggerImpl(Upstream::ThreadLocalCluster& cluster, + Tcp::AsyncTcpClientPtr client, Event::Dispatcher& dispatcher, const FluentdAccessLogConfig& config, + BackOffStrategyPtr backoff_strategy, Stats::Scope& parent_scope) : tag_(config.tag()), id_(dispatcher.name()), + max_connect_attempts_( + config.has_retry_options() && config.retry_options().has_max_connect_attempts() + ? 
absl::optional(config.retry_options().max_connect_attempts().value()) + : absl::nullopt), stats_scope_(parent_scope.createScope(config.stat_prefix())), fluentd_stats_( {ACCESS_LOG_FLUENTD_STATS(POOL_COUNTER(*stats_scope_), POOL_GAUGE(*stats_scope_))}), - client_(std::move(client)), - buffer_flush_interval_msec_(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)), - max_buffer_size_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384)), + cluster_(cluster), backoff_strategy_(std::move(backoff_strategy)), client_(std::move(client)), + buffer_flush_interval_msec_( + PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, DefaultBufferFlushIntervalMs)), + max_buffer_size_bytes_( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, DefaultMaxBufferSize)), + retry_timer_(dispatcher.createTimer([this]() -> void { onBackoffCallback(); })), flush_timer_(dispatcher.createTimer([this]() { flush(); flush_timer_->enableTimer(buffer_flush_interval_msec_); @@ -35,27 +44,21 @@ void FluentdAccessLoggerImpl::onEvent(Network::ConnectionEvent event) { connecting_ = false; if (event == Network::ConnectionEvent::Connected) { + backoff_strategy_->reset(); + retry_timer_->disableTimer(); flush(); } else if (event == Network::ConnectionEvent::LocalClose || event == Network::ConnectionEvent::RemoteClose) { ENVOY_LOG(debug, "upstream connection was closed"); - // TODO(ohadvano): add an option to reconnect to the upstream, if configured - fluentd_stats_.connections_closed_.inc(); - disconnected_ = true; - clearBuffer(); - - ASSERT(flush_timer_ != nullptr); - flush_timer_->disableTimer(); + maybeReconnect(); } } void FluentdAccessLoggerImpl::log(EntryPtr&& entry) { - if (disconnected_) { + if (disconnected_ || approximate_message_size_bytes_ >= max_buffer_size_bytes_) { fluentd_stats_.entries_lost_.inc(); // We will lose the data deliberately so the buffer doesn't grow infinitely. 
- // Since the client is disconnected, there's nothing much we can do with the data anyway. - // TODO(ohadvano): add an option to reconnect to the upstream, if configured return; } @@ -63,6 +66,8 @@ void FluentdAccessLoggerImpl::log(EntryPtr&& entry) { entries_.push_back(std::move(entry)); fluentd_stats_.entries_buffered_.inc(); if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) { + // If we exceeded the buffer limit, immediately flush the logs instead of waiting for + // the next flush interval, to allow new logs to be buffered. flush(); } } @@ -76,8 +81,7 @@ void FluentdAccessLoggerImpl::flush() { } if (!client_->connected()) { - connecting_ = true; - client_->connect(); + connect(); return; } @@ -102,6 +106,42 @@ void FluentdAccessLoggerImpl::flush() { clearBuffer(); } +void FluentdAccessLoggerImpl::connect() { + connect_attempts_++; + if (!client_->connect()) { + ENVOY_LOG(debug, "no healthy upstream"); + maybeReconnect(); + return; + } + + connecting_ = true; +} + +void FluentdAccessLoggerImpl::maybeReconnect() { + if (max_connect_attempts_.has_value() && connect_attempts_ >= max_connect_attempts_) { + ENVOY_LOG(debug, "max connection attempts reached"); + cluster_.info()->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc(); + setDisconnected(); + return; + } + + uint64_t next_backoff_ms = backoff_strategy_->nextBackOffMs(); + retry_timer_->enableTimer(std::chrono::milliseconds(next_backoff_ms)); + ENVOY_LOG(debug, "reconnect attempt scheduled for {} ms", next_backoff_ms); +} + +void FluentdAccessLoggerImpl::onBackoffCallback() { + fluentd_stats_.reconnect_attempts_.inc(); + this->connect(); +} + +void FluentdAccessLoggerImpl::setDisconnected() { + disconnected_ = true; + clearBuffer(); + ASSERT(flush_timer_ != nullptr); + flush_timer_->disableTimer(); +} + void FluentdAccessLoggerImpl::clearBuffer() { entries_.clear(); approximate_message_size_bytes_ = 0; @@ -117,7 +157,8 @@ FluentdAccessLoggerCacheImpl::FluentdAccessLoggerCacheImpl( } 
FluentdAccessLoggerSharedPtr -FluentdAccessLoggerCacheImpl::getOrCreateLogger(const FluentdAccessLogConfigSharedPtr config) { +FluentdAccessLoggerCacheImpl::getOrCreateLogger(const FluentdAccessLogConfigSharedPtr config, + Random::RandomGenerator& random) { auto& cache = tls_slot_->getTyped(); const auto cache_key = MessageUtil::hash(*config); const auto it = cache.access_loggers_.find(cache_key); @@ -125,25 +166,41 @@ FluentdAccessLoggerCacheImpl::getOrCreateLogger(const FluentdAccessLogConfigShar return it->second.lock(); } + auto* cluster = cluster_manager_.getThreadLocalCluster(config->cluster()); auto client = - cluster_manager_.getThreadLocalCluster(config->cluster()) - ->tcpAsyncClient(nullptr, std::make_shared(false)); + cluster->tcpAsyncClient(nullptr, std::make_shared(false)); + + uint64_t base_interval_ms = DefaultBaseBackoffIntervalMs; + uint64_t max_interval_ms = base_interval_ms * DefaultMaxBackoffIntervalFactor; + + if (config->has_retry_options() && config->retry_options().has_backoff_options()) { + base_interval_ms = PROTOBUF_GET_MS_OR_DEFAULT(config->retry_options().backoff_options(), + base_interval, DefaultBaseBackoffIntervalMs); + max_interval_ms = + PROTOBUF_GET_MS_OR_DEFAULT(config->retry_options().backoff_options(), max_interval, + base_interval_ms * DefaultMaxBackoffIntervalFactor); + } + + BackOffStrategyPtr backoff_strategy = std::make_unique( + base_interval_ms, max_interval_ms, random); const auto logger = std::make_shared( - std::move(client), cache.dispatcher_, *config, *stats_scope_); + *cluster, std::move(client), cache.dispatcher_, *config, std::move(backoff_strategy), + *stats_scope_); cache.access_loggers_.emplace(cache_key, logger); return logger; } FluentdAccessLog::FluentdAccessLog(AccessLog::FilterPtr&& filter, FluentdFormatterPtr&& formatter, const FluentdAccessLogConfigSharedPtr config, - ThreadLocal::SlotAllocator& tls, + ThreadLocal::SlotAllocator& tls, Random::RandomGenerator& random, FluentdAccessLoggerCacheSharedPtr 
access_logger_cache) : ImplBase(std::move(filter)), formatter_(std::move(formatter)), tls_slot_(tls.allocateSlot()), config_(config), access_logger_cache_(access_logger_cache) { tls_slot_->set( - [config = config_, access_logger_cache = access_logger_cache_](Event::Dispatcher&) { - return std::make_shared(access_logger_cache->getOrCreateLogger(config)); + [config = config_, &random, access_logger_cache = access_logger_cache_](Event::Dispatcher&) { + return std::make_shared( + access_logger_cache->getOrCreateLogger(config, random)); }); } diff --git a/source/extensions/access_loggers/fluentd/fluentd_access_log_impl.h b/source/extensions/access_loggers/fluentd/fluentd_access_log_impl.h index 11a03530d32d..95e84632177f 100644 --- a/source/extensions/access_loggers/fluentd/fluentd_access_log_impl.h +++ b/source/extensions/access_loggers/fluentd/fluentd_access_log_impl.h @@ -18,6 +18,11 @@ using FluentdAccessLogConfig = envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig; using FluentdAccessLogConfigSharedPtr = std::shared_ptr; +static constexpr uint64_t DefaultBaseBackoffIntervalMs = 500; +static constexpr uint64_t DefaultMaxBackoffIntervalFactor = 10; +static constexpr uint64_t DefaultBufferFlushIntervalMs = 1000; +static constexpr uint64_t DefaultMaxBufferSize = 16384; + // Entry represents a single Fluentd message, msgpack format based, as specified in: // https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#entry class Entry { @@ -49,6 +54,7 @@ using FluentdAccessLoggerSharedPtr = std::shared_ptr; COUNTER(entries_lost) \ COUNTER(entries_buffered) \ COUNTER(events_sent) \ + COUNTER(reconnect_attempts) \ COUNTER(connections_closed) struct AccessLogFluentdStats { @@ -59,8 +65,9 @@ class FluentdAccessLoggerImpl : public Tcp::AsyncTcpClientCallbacks, public FluentdAccessLogger, public Logger::Loggable { public: - FluentdAccessLoggerImpl(Tcp::AsyncTcpClientPtr client, Event::Dispatcher& dispatcher, - const FluentdAccessLogConfig& 
config, Stats::Scope& parent_scope); + FluentdAccessLoggerImpl(Upstream::ThreadLocalCluster& cluster, Tcp::AsyncTcpClientPtr client, + Event::Dispatcher& dispatcher, const FluentdAccessLogConfig& config, + BackOffStrategyPtr backoff_strategy, Stats::Scope& parent_scope); // Tcp::AsyncTcpClientCallbacks void onEvent(Network::ConnectionEvent event) override; @@ -73,19 +80,28 @@ class FluentdAccessLoggerImpl : public Tcp::AsyncTcpClientCallbacks, private: void flush(); + void connect(); + void maybeReconnect(); + void onBackoffCallback(); + void setDisconnected(); void clearBuffer(); bool disconnected_ = false; bool connecting_ = false; std::string tag_; std::string id_; + uint32_t connect_attempts_{0}; + absl::optional max_connect_attempts_{}; const Stats::ScopeSharedPtr stats_scope_; AccessLogFluentdStats fluentd_stats_; std::vector entries_; uint64_t approximate_message_size_bytes_ = 0; + Upstream::ThreadLocalCluster& cluster_; + const BackOffStrategyPtr backoff_strategy_; const Tcp::AsyncTcpClientPtr client_; const std::chrono::milliseconds buffer_flush_interval_msec_; const uint64_t max_buffer_size_bytes_; + const Event::TimerPtr retry_timer_; const Event::TimerPtr flush_timer_; }; @@ -98,7 +114,8 @@ class FluentdAccessLoggerCache { * @return FluentdAccessLoggerSharedPtr ready for logging requests. 
*/ virtual FluentdAccessLoggerSharedPtr - getOrCreateLogger(const FluentdAccessLogConfigSharedPtr config) PURE; + getOrCreateLogger(const FluentdAccessLogConfigSharedPtr config, + Random::RandomGenerator& random) PURE; }; using FluentdAccessLoggerCacheSharedPtr = std::shared_ptr; @@ -108,8 +125,8 @@ class FluentdAccessLoggerCacheImpl : public Singleton::Instance, public FluentdA FluentdAccessLoggerCacheImpl(Upstream::ClusterManager& cluster_manager, Stats::Scope& parent_scope, ThreadLocal::SlotAllocator& tls); - FluentdAccessLoggerSharedPtr - getOrCreateLogger(const FluentdAccessLogConfigSharedPtr config) override; + FluentdAccessLoggerSharedPtr getOrCreateLogger(const FluentdAccessLogConfigSharedPtr config, + Random::RandomGenerator& random) override; private: /** @@ -135,6 +152,7 @@ class FluentdAccessLog : public Common::ImplBase { public: FluentdAccessLog(AccessLog::FilterPtr&& filter, FluentdFormatterPtr&& formatter, const FluentdAccessLogConfigSharedPtr config, ThreadLocal::SlotAllocator& tls, + Random::RandomGenerator& random, FluentdAccessLoggerCacheSharedPtr access_logger_cache); private: diff --git a/test/common/common/backoff_strategy_test.cc b/test/common/common/backoff_strategy_test.cc index a5111809bf44..6dcb6a04aeaf 100644 --- a/test/common/common/backoff_strategy_test.cc +++ b/test/common/common/backoff_strategy_test.cc @@ -114,4 +114,29 @@ TEST(FixedBackOffStrategyTest, FixedBackOffBasicReset) { EXPECT_EQ(20, fixed_back_off.nextBackOffMs()); } +TEST(BackOffStrategyUtilsTest, InvalidConfig) { + { + // Valid config. + envoy::config::core::v3::BackoffStrategy backoff_strategy; + backoff_strategy.mutable_base_interval()->set_seconds(2); + backoff_strategy.mutable_max_interval()->set_seconds(3); + EXPECT_TRUE(BackOffStrategyUtils::validateBackOffStrategyConfig(backoff_strategy, 1, 10).ok()); + } + + { + // Max interval is lower than base interval. 
+ envoy::config::core::v3::BackoffStrategy backoff_strategy; + backoff_strategy.mutable_base_interval()->set_seconds(3); + backoff_strategy.mutable_max_interval()->set_seconds(2); + EXPECT_TRUE(!BackOffStrategyUtils::validateBackOffStrategyConfig(backoff_strategy, 1, 10).ok()); + } + + { + // Max interval is lower than base interval. + envoy::config::core::v3::BackoffStrategy backoff_strategy; + backoff_strategy.mutable_max_interval()->set_nanos(2000000); + EXPECT_TRUE(!BackOffStrategyUtils::validateBackOffStrategyConfig(backoff_strategy, 3, 10).ok()); + } +} + } // namespace Envoy diff --git a/test/extensions/access_loggers/fluentd/fluentd_access_log_impl_test.cc b/test/extensions/access_loggers/fluentd/fluentd_access_log_impl_test.cc index a058c22dcf07..14c7bd089fca 100644 --- a/test/extensions/access_loggers/fluentd/fluentd_access_log_impl_test.cc +++ b/test/extensions/access_loggers/fluentd/fluentd_access_log_impl_test.cc @@ -31,16 +31,25 @@ class FluentdAccessLoggerImplTest : public testing::Test { public: FluentdAccessLoggerImplTest() : async_client_(new Tcp::AsyncClient::MockAsyncTcpClient()), - timer_(new Event::MockTimer(&dispatcher_)) {} + backoff_strategy_(new MockBackOffStrategy()), + flush_timer_(new Event::MockTimer(&dispatcher_)), + retry_timer_(new Event::MockTimer(&dispatcher_)) {} - void init(int buffer_size_bytes = 0) { + void init(int buffer_size_bytes = 1, absl::optional max_connect_attempts = absl::nullopt) { EXPECT_CALL(*async_client_, setAsyncTcpClientCallbacks(_)); - EXPECT_CALL(*timer_, enableTimer(_, _)); + EXPECT_CALL(*flush_timer_, enableTimer(_, _)); config_.set_tag(tag_); + + if (max_connect_attempts.has_value()) { + config_.mutable_retry_options()->mutable_max_connect_attempts()->set_value( + max_connect_attempts.value()); + } + config_.mutable_buffer_size_bytes()->set_value(buffer_size_bytes); logger_ = std::make_unique( - Tcp::AsyncTcpClientPtr{async_client_}, dispatcher_, config_, *stats_store_.rootScope()); + cluster_, 
Tcp::AsyncTcpClientPtr{async_client_}, dispatcher_, config_, + BackOffStrategyPtr{backoff_strategy_}, *stats_store_.rootScope()); } std::string getExpectedMsgpackPayload(int entries_count) { @@ -62,10 +71,13 @@ class FluentdAccessLoggerImplTest : public testing::Test { std::string tag_ = "test.tag"; uint64_t time_ = 123; std::vector data_ = {10, 20}; + NiceMock cluster_; Tcp::AsyncClient::MockAsyncTcpClient* async_client_; + MockBackOffStrategy* backoff_strategy_; Stats::IsolatedStoreImpl stats_store_; Event::MockDispatcher dispatcher_; - Event::MockTimer* timer_; + Event::MockTimer* flush_timer_; + Event::MockTimer* retry_timer_; std::unique_ptr logger_; envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig config_; }; @@ -87,8 +99,8 @@ TEST_F(FluentdAccessLoggerImplTest, NoWriteOnLogIfBufferLimitNotPassed) { } TEST_F(FluentdAccessLoggerImplTest, NoWriteOnLogIfDisconnectedByRemote) { - init(); - EXPECT_CALL(*timer_, disableTimer()); + init(1, 1); + EXPECT_CALL(*flush_timer_, disableTimer()); EXPECT_CALL(*async_client_, write(_, _)).Times(0); EXPECT_CALL(*async_client_, connected()).WillOnce(Return(false)); EXPECT_CALL(*async_client_, connect()).WillOnce(Invoke([this]() -> bool { @@ -100,8 +112,8 @@ TEST_F(FluentdAccessLoggerImplTest, NoWriteOnLogIfDisconnectedByRemote) { } TEST_F(FluentdAccessLoggerImplTest, NoWriteOnLogIfDisconnectedByLocal) { - init(); - EXPECT_CALL(*timer_, disableTimer()); + init(1, 1); + EXPECT_CALL(*flush_timer_, disableTimer()); EXPECT_CALL(*async_client_, write(_, _)).Times(0); EXPECT_CALL(*async_client_, connected()).WillOnce(Return(false)); EXPECT_CALL(*async_client_, connect()).WillOnce(Invoke([this]() -> bool { @@ -114,6 +126,8 @@ TEST_F(FluentdAccessLoggerImplTest, NoWriteOnLogIfDisconnectedByLocal) { TEST_F(FluentdAccessLoggerImplTest, LogSingleEntry) { init(); // Default buffer limit is 0 so single entry should be flushed immediately. 
+ EXPECT_CALL(*backoff_strategy_, reset()); + EXPECT_CALL(*retry_timer_, disableTimer()); EXPECT_CALL(*async_client_, connected()).WillOnce(Return(false)).WillOnce(Return(true)); EXPECT_CALL(*async_client_, connect()).WillOnce(Invoke([this]() -> bool { logger_->onEvent(Network::ConnectionEvent::Connected); @@ -133,6 +147,8 @@ TEST_F(FluentdAccessLoggerImplTest, LogTwoEntries) { init(12); // First entry is 10 bytes, so first entry should not cause the logger to flush. // First log should not be flushed. + EXPECT_CALL(*backoff_strategy_, reset()); + EXPECT_CALL(*retry_timer_, disableTimer()); EXPECT_CALL(*async_client_, connected()).Times(0); EXPECT_CALL(*async_client_, write(_, _)).Times(0); logger_->log(std::make_unique(time_, std::move(data_))); @@ -164,11 +180,123 @@ TEST_F(FluentdAccessLoggerImplTest, CallbacksTest) { EXPECT_NO_THROW(logger_->onData(buffer, false)); } +TEST_F(FluentdAccessLoggerImplTest, SuccessfulReconnect) { + init(1, 2); + EXPECT_CALL(*backoff_strategy_, nextBackOffMs()).WillOnce(Return(1)); + EXPECT_CALL(*async_client_, write(_, _)).Times(0); + EXPECT_CALL(*async_client_, connected()).WillOnce(Return(false)).WillOnce(Return(true)); + EXPECT_CALL(*async_client_, connect()) + .WillOnce(Invoke([this]() -> bool { + EXPECT_CALL(*backoff_strategy_, reset()).Times(0); + EXPECT_CALL(*retry_timer_, enableTimer(std::chrono::milliseconds(1), _)); + EXPECT_CALL(*retry_timer_, disableTimer()).Times(0); + logger_->onEvent(Network::ConnectionEvent::LocalClose); + return true; + })) + .WillOnce(Invoke([this]() -> bool { + EXPECT_CALL(*backoff_strategy_, reset()); + EXPECT_CALL(*retry_timer_, enableTimer(_, _)).Times(0); + EXPECT_CALL(*retry_timer_, disableTimer()); + logger_->onEvent(Network::ConnectionEvent::Connected); + return true; + })); + EXPECT_CALL(*async_client_, write(_, _)) + .WillOnce(Invoke([&](Buffer::Instance& buffer, bool end_stream) { + EXPECT_FALSE(end_stream); + std::string expected_payload = getExpectedMsgpackPayload(1); + 
EXPECT_EQ(expected_payload, buffer.toString()); + })); + + logger_->log(std::make_unique(time_, std::move(data_))); + retry_timer_->invokeCallback(); +} + +TEST_F(FluentdAccessLoggerImplTest, ReconnectFailure) { + init(1, 2); + + EXPECT_CALL(*backoff_strategy_, nextBackOffMs()).WillOnce(Return(1)); + EXPECT_CALL(*backoff_strategy_, reset()).Times(0); + EXPECT_CALL(*retry_timer_, enableTimer(std::chrono::milliseconds(1), _)); + EXPECT_CALL(*retry_timer_, disableTimer()).Times(0); + + EXPECT_CALL(*flush_timer_, disableTimer()); + EXPECT_CALL(*async_client_, write(_, _)).Times(0); + EXPECT_CALL(*async_client_, connected()).WillOnce(Return(false)); + EXPECT_CALL(*async_client_, connect()) + .WillOnce(Invoke([this]() -> bool { + logger_->onEvent(Network::ConnectionEvent::LocalClose); + return true; + })) + .WillOnce(Invoke([this]() -> bool { + logger_->onEvent(Network::ConnectionEvent::LocalClose); + return true; + })); + + logger_->log(std::make_unique(time_, std::move(data_))); + retry_timer_->invokeCallback(); +} + +TEST_F(FluentdAccessLoggerImplTest, TwoReconnects) { + init(1, 3); + + EXPECT_CALL(*backoff_strategy_, nextBackOffMs()).WillOnce(Return(1)).WillOnce(Return(1)); + EXPECT_CALL(*backoff_strategy_, reset()).Times(0); + EXPECT_CALL(*retry_timer_, enableTimer(std::chrono::milliseconds(1), _)).Times(2); + EXPECT_CALL(*retry_timer_, disableTimer()).Times(0); + + EXPECT_CALL(*flush_timer_, disableTimer()); + EXPECT_CALL(*async_client_, write(_, _)).Times(0); + EXPECT_CALL(*async_client_, connected()).WillOnce(Return(false)); + EXPECT_CALL(*async_client_, connect()) + .WillOnce(Invoke([this]() -> bool { + logger_->onEvent(Network::ConnectionEvent::LocalClose); + return true; + })) + .WillOnce(Invoke([this]() -> bool { + logger_->onEvent(Network::ConnectionEvent::LocalClose); + return true; + })) + .WillOnce(Invoke([this]() -> bool { + logger_->onEvent(Network::ConnectionEvent::LocalClose); + return true; + })); + + logger_->log(std::make_unique(time_, 
std::move(data_))); + retry_timer_->invokeCallback(); + retry_timer_->invokeCallback(); +} + +TEST_F(FluentdAccessLoggerImplTest, RetryOnNoHealthyUpstream) { + init(); + + EXPECT_CALL(*backoff_strategy_, nextBackOffMs()).WillOnce(Return(1)); + EXPECT_CALL(*backoff_strategy_, reset()).Times(0); + EXPECT_CALL(*retry_timer_, enableTimer(std::chrono::milliseconds(1), _)); + EXPECT_CALL(*retry_timer_, disableTimer()).Times(0); + + EXPECT_CALL(*async_client_, write(_, _)).Times(0); + EXPECT_CALL(*async_client_, connected()).WillOnce(Return(false)); + EXPECT_CALL(*async_client_, connect()).WillOnce(Return(false)); + logger_->log(std::make_unique(time_, std::move(data_))); +} + +TEST_F(FluentdAccessLoggerImplTest, NoWriteOnBufferFull) { + // Set the buffer size to 0 so that any new log entry is dropped (counted as lost) instead of being buffered. + init(0); + EXPECT_CALL(*async_client_, write(_, _)).Times(0); + EXPECT_CALL(*async_client_, connect()).Times(0); + EXPECT_CALL(*async_client_, connected()).Times(0); + logger_->log(std::make_unique(time_, std::move(data_))); +} + class FluentdAccessLoggerCacheImplTest : public testing::Test { public: - FluentdAccessLoggerCacheImplTest() : logger_cache_(cluster_manager_, scope_, tls_) {} - void init(bool second_logger = false) { + tls_.setDispatcher(&dispatcher_); + flush_timer_ = new Event::MockTimer(&dispatcher_); + retry_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*flush_timer_, enableTimer(_, _)); + async_client1_ = new Tcp::AsyncClient::MockAsyncTcpClient(); EXPECT_CALL(*async_client1_, setAsyncTcpClientCallbacks(_)); @@ -176,9 +304,16 @@ class FluentdAccessLoggerCacheImplTest : public testing::Test { async_client2_ = new Tcp::AsyncClient::MockAsyncTcpClient(); EXPECT_CALL(*async_client2_, setAsyncTcpClientCallbacks(_)); } + + logger_cache_ = std::make_unique(cluster_manager_, scope_, tls_); } std::string cluster_name_ = "test_cluster"; + uint64_t time_ = 123; + std::vector data_ = {10, 20}; + Event::MockTimer* flush_timer_; + Event::MockTimer* retry_timer_; + 
Event::MockDispatcher dispatcher_; NiceMock cluster_; NiceMock cluster_manager_; Tcp::AsyncClient::MockAsyncTcpClient* async_client1_; @@ -186,7 +321,8 @@ class FluentdAccessLoggerCacheImplTest : public testing::Test { NiceMock store_; Stats::Scope& scope_{*store_.rootScope()}; NiceMock tls_; - FluentdAccessLoggerCacheImpl logger_cache_; + NiceMock random_; + std::unique_ptr logger_cache_; }; TEST_F(FluentdAccessLoggerCacheImplTest, CreateNonExistingLogger) { @@ -200,7 +336,8 @@ TEST_F(FluentdAccessLoggerCacheImplTest, CreateNonExistingLogger) { config.set_cluster(cluster_name_); config.set_tag("test.tag"); config.mutable_buffer_size_bytes()->set_value(123); - auto logger = logger_cache_.getOrCreateLogger(std::make_shared(config)); + auto logger = + logger_cache_->getOrCreateLogger(std::make_shared(config), random_); EXPECT_TRUE(logger != nullptr); } @@ -215,14 +352,16 @@ TEST_F(FluentdAccessLoggerCacheImplTest, CreateTwoLoggersSameHash) { config1.set_cluster(cluster_name_); config1.set_tag("test.tag"); config1.mutable_buffer_size_bytes()->set_value(123); - auto logger1 = logger_cache_.getOrCreateLogger(std::make_shared(config1)); + auto logger1 = + logger_cache_->getOrCreateLogger(std::make_shared(config1), random_); EXPECT_TRUE(logger1 != nullptr); envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig config2; config2.set_cluster(cluster_name_); // config hash will be different than config1 config2.set_tag("test.tag"); config2.mutable_buffer_size_bytes()->set_value(123); - auto logger2 = logger_cache_.getOrCreateLogger(std::make_shared(config2)); + auto logger2 = + logger_cache_->getOrCreateLogger(std::make_shared(config2), random_); EXPECT_TRUE(logger2 != nullptr); // Make sure we got the same logger @@ -243,20 +382,60 @@ TEST_F(FluentdAccessLoggerCacheImplTest, CreateTwoLoggersDifferentHash) { config1.set_cluster(cluster_name_); config1.set_tag("test.tag"); config1.mutable_buffer_size_bytes()->set_value(123); - auto logger1 = 
logger_cache_.getOrCreateLogger(std::make_shared(config1)); + auto logger1 = + logger_cache_->getOrCreateLogger(std::make_shared(config1), random_); EXPECT_TRUE(logger1 != nullptr); + Event::MockTimer* flush_timer2 = new Event::MockTimer(&dispatcher_); + Event::MockTimer* retry_timer2 = new Event::MockTimer(&dispatcher_); + UNREFERENCED_PARAMETER(retry_timer2); + EXPECT_CALL(*flush_timer2, enableTimer(_, _)); + envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig config2; config2.set_cluster("different_cluster"); // config hash will be different than config1 config2.set_tag("test.tag"); config2.mutable_buffer_size_bytes()->set_value(123); - auto logger2 = logger_cache_.getOrCreateLogger(std::make_shared(config2)); + auto logger2 = + logger_cache_->getOrCreateLogger(std::make_shared(config2), random_); EXPECT_TRUE(logger2 != nullptr); // Make sure we got two different loggers EXPECT_NE(logger1, logger2); } +TEST_F(FluentdAccessLoggerCacheImplTest, JitteredExponentialBackOffStrategyConfig) { + init(); + + EXPECT_CALL(cluster_manager_, getThreadLocalCluster(cluster_name_)).WillOnce(Return(&cluster_)); + EXPECT_CALL(*async_client1_, connected()).WillOnce(Return(false)); + EXPECT_CALL(*async_client1_, connect()).WillRepeatedly(Return(false)); + EXPECT_CALL(cluster_, tcpAsyncClient(_, _)).WillOnce(Invoke([&] { + return Tcp::AsyncTcpClientPtr{async_client1_}; + })); + + envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig config; + config.set_cluster(cluster_name_); + config.set_tag("test.tag"); + config.mutable_buffer_size_bytes()->set_value(1); + config.mutable_retry_options()->mutable_backoff_options()->mutable_base_interval()->set_nanos( + 7000000); + config.mutable_retry_options()->mutable_backoff_options()->mutable_max_interval()->set_nanos( + 20000000); + + auto logger = + logger_cache_->getOrCreateLogger(std::make_shared(config), random_); + ASSERT_TRUE(logger != nullptr); + + // Setting random so it doesn't add jitter + 
EXPECT_CALL(random_, random()).WillOnce(Return(6)).WillOnce(Return(13)).WillOnce(Return(19)); + EXPECT_CALL(*retry_timer_, enableTimer(std::chrono::milliseconds(6), _)); + EXPECT_CALL(*retry_timer_, enableTimer(std::chrono::milliseconds(13), _)); + EXPECT_CALL(*retry_timer_, enableTimer(std::chrono::milliseconds(19), _)); + logger->log(std::make_unique(time_, std::move(data_))); + retry_timer_->invokeCallback(); + retry_timer_->invokeCallback(); +} + class MockFluentdAccessLogger : public FluentdAccessLogger { public: MOCK_METHOD(void, log, (EntryPtr &&)); @@ -265,7 +444,7 @@ class MockFluentdAccessLogger : public FluentdAccessLogger { class MockFluentdAccessLoggerCache : public FluentdAccessLoggerCache { public: MOCK_METHOD(FluentdAccessLoggerSharedPtr, getOrCreateLogger, - (const FluentdAccessLogConfigSharedPtr)); + (const FluentdAccessLogConfigSharedPtr, Random::RandomGenerator&)); }; class MockFluentdFormatter : public FluentdFormatter { @@ -280,31 +459,32 @@ using FilterPtr = Envoy::AccessLog::FilterPtr; class FluentdAccessLogTest : public testing::Test { public: - FluentdAccessLogTest() { - ON_CALL(*filter_, evaluate(_, _)).WillByDefault(Return(true)); - EXPECT_CALL(*logger_cache_, getOrCreateLogger(_)).WillOnce(Return(logger_)); - } + FluentdAccessLogTest() { ON_CALL(*filter_, evaluate(_, _)).WillByDefault(Return(true)); } AccessLog::MockFilter* filter_{new NiceMock()}; NiceMock tls_; + NiceMock random_; + NiceMock context_; envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig config_; - MockFluentdFormatter* formatter_{new NiceMock()}; - std::shared_ptr logger_{new MockFluentdAccessLogger()}; - std::shared_ptr logger_cache_{new MockFluentdAccessLoggerCache()}; }; TEST_F(FluentdAccessLogTest, CreateAndLog) { - auto access_log = - FluentdAccessLog(AccessLog::FilterPtr{filter_}, FluentdFormatterPtr{formatter_}, - std::make_shared(config_), tls_, logger_cache_); + auto* formatter = new NiceMock(); + auto logger = std::make_shared(); + auto 
logger_cache = std::make_shared(); + + EXPECT_CALL(*logger_cache, getOrCreateLogger(_, _)).WillOnce(Return(logger)); + auto access_log = FluentdAccessLog(AccessLog::FilterPtr{filter_}, FluentdFormatterPtr{formatter}, + std::make_shared(config_), tls_, + random_, logger_cache); MockTimeSystem time_system; EXPECT_CALL(time_system, systemTime).WillOnce(Return(SystemTime(std::chrono::seconds(200)))); NiceMock stream_info; EXPECT_CALL(stream_info, timeSource()).WillOnce(ReturnRef(time_system)); - EXPECT_CALL(*formatter_, format(_, _)).WillOnce(Return(std::vector{10, 20})); - EXPECT_CALL(*logger_, log(_)).WillOnce(Invoke([](EntryPtr&& entry) { + EXPECT_CALL(*formatter, format(_, _)).WillOnce(Return(std::vector{10, 20})); + EXPECT_CALL(*logger, log(_)).WillOnce(Invoke([](EntryPtr&& entry) { EXPECT_EQ(200, entry->time_); ASSERT_EQ(2, entry->record_.size()); EXPECT_EQ(uint8_t(10), entry->record_[0]); @@ -314,6 +494,44 @@ TEST_F(FluentdAccessLogTest, CreateAndLog) { access_log.log({}, stream_info); } +TEST_F(FluentdAccessLogTest, UnknownCluster) { + FluentdAccessLogFactory factory; + + config_.set_cluster("unknown"); + config_.set_tag("tag"); + config_.set_stat_prefix("prefix"); + auto* record = config_.mutable_record(); + (*record->mutable_fields())["Message"].set_string_value("SomeValue"); + + EXPECT_CALL(context_.server_factory_context_.cluster_manager_, + checkActiveStaticCluster("unknown")) + .WillOnce(Return(absl::InvalidArgumentError("no cluster"))); + + EXPECT_THROW_WITH_MESSAGE( + factory.createAccessLogInstance(config_, AccessLog::FilterPtr{filter_}, context_), + EnvoyException, "cluster 'unknown' was not found"); +} + +TEST_F(FluentdAccessLogTest, InvalidBackoffConfig) { + FluentdAccessLogFactory factory; + + config_.set_cluster("unknown"); + config_.set_tag("tag"); + config_.set_stat_prefix("prefix"); + auto* record = config_.mutable_record(); + (*record->mutable_fields())["Message"].set_string_value("SomeValue"); + auto* retry_options = 
config_.mutable_retry_options(); + retry_options->mutable_backoff_options()->mutable_base_interval()->set_seconds(3); + retry_options->mutable_backoff_options()->mutable_max_interval()->set_seconds(2); + + EXPECT_CALL(context_.server_factory_context_.cluster_manager_, checkActiveStaticCluster(_)) + .WillOnce(Return(absl::OkStatus())); + + EXPECT_THROW_WITH_MESSAGE( + factory.createAccessLogInstance(config_, AccessLog::FilterPtr{filter_}, context_), + EnvoyException, "max_backoff_interval must be greater or equal to base_backoff_interval"); +} + } // namespace } // namespace Fluentd } // namespace AccessLoggers diff --git a/test/extensions/access_loggers/fluentd/fluentd_access_log_integration_test.cc b/test/extensions/access_loggers/fluentd/fluentd_access_log_integration_test.cc index 933ac3fe1369..6c386c9b979b 100644 --- a/test/extensions/access_loggers/fluentd/fluentd_access_log_integration_test.cc +++ b/test/extensions/access_loggers/fluentd/fluentd_access_log_integration_test.cc @@ -32,7 +32,10 @@ class FluentdAccessLogIntegrationTest : public testing::Test, public BaseIntegra void init(const std::string cluster_name = default_cluster_name, bool flush_access_log_on_connected = false, - absl::optional buffer_size_bytes = absl::nullopt) { + absl::optional buffer_size_bytes = absl::nullopt, + absl::optional max_connect_attempts = 1, + absl::optional base_backoff_interval = absl::nullopt, + absl::optional max_backoff_interval = absl::nullopt) { setUpstreamCount(2); config_helper_.renameListener("tcp_proxy"); config_helper_.addConfigModifier( @@ -64,6 +67,25 @@ class FluentdAccessLogIntegrationTest : public testing::Test, public BaseIntegra access_log_config.mutable_buffer_size_bytes()->set_value(buffer_size_bytes.value()); } + if (max_connect_attempts.has_value()) { + access_log_config.mutable_retry_options()->mutable_max_connect_attempts()->set_value( + max_connect_attempts.value()); + } + + if (base_backoff_interval.has_value()) { + 
access_log_config.mutable_retry_options() + ->mutable_backoff_options() + ->mutable_base_interval() + ->set_nanos(base_backoff_interval.value() * 1000000); + } + + if (max_backoff_interval.has_value()) { + access_log_config.mutable_retry_options() + ->mutable_backoff_options() + ->mutable_max_interval() + ->set_nanos(max_backoff_interval.value() * 1000000); + } + auto* record = access_log_config.mutable_record(); (*record->mutable_fields())["Message"].set_string_value("SomeValue"); (*record->mutable_fields())["LogType"].set_string_value("%ACCESS_LOG_TYPE%"); @@ -137,6 +159,18 @@ TEST_F(FluentdAccessLogIntegrationTest, UnknownCluster) { EXPECT_DEATH(init("unknown_cluster"), ""); } +TEST_F(FluentdAccessLogIntegrationTest, InvalidBackoffConfig) { + // Invalid config: min interval set to 30, max interval is set to 20. + EXPECT_DEATH(init(default_cluster_name, false, 1, 1, 30, 20), ""); +} + +TEST_F(FluentdAccessLogIntegrationTest, LogLostOnBufferFull) { + init(default_cluster_name, false, /* max_buffer_size = */ 0); + sendBidirectionalData(); + + test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.entries_lost", 1); +} + TEST_F(FluentdAccessLogIntegrationTest, SingleEntrySingleRecord) { init(); sendBidirectionalData(); @@ -145,6 +179,9 @@ TEST_F(FluentdAccessLogIntegrationTest, SingleEntrySingleRecord) { test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.events_sent", 1); ASSERT_TRUE(fake_upstreams_[1]->waitForRawConnection(fake_access_log_connection_)); + test_server_->waitForCounterEq("cluster.fluentd_cluster.upstream_cx_total", 1); + test_server_->waitForGaugeEq("cluster.fluentd_cluster.upstream_cx_active", 1); + EXPECT_TRUE(fake_access_log_connection_->waitForData([&](const std::string& tcp_data) -> bool { bool validated = false; validateFluentdPayload(tcp_data, &validated, @@ -161,6 +198,9 @@ TEST_F(FluentdAccessLogIntegrationTest, SingleEntryTwoRecords) { test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.events_sent", 1); 
ASSERT_TRUE(fake_upstreams_[1]->waitForRawConnection(fake_access_log_connection_)); + test_server_->waitForCounterEq("cluster.fluentd_cluster.upstream_cx_total", 1); + test_server_->waitForGaugeEq("cluster.fluentd_cluster.upstream_cx_active", 1); + EXPECT_TRUE(fake_access_log_connection_->waitForData([&](const std::string& tcp_data) -> bool { bool validated = false; validateFluentdPayload(tcp_data, &validated, @@ -171,13 +211,16 @@ TEST_F(FluentdAccessLogIntegrationTest, SingleEntryTwoRecords) { } TEST_F(FluentdAccessLogIntegrationTest, TwoEntries) { - init(default_cluster_name, /*flush_access_log_on_connected = */ true, /*buffer_size_bytes = */ 0); + init(default_cluster_name, /*flush_access_log_on_connected = */ true, /*buffer_size_bytes = */ 1); sendBidirectionalData(); test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.entries_buffered", 2); test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.events_sent", 2); ASSERT_TRUE(fake_upstreams_[1]->waitForRawConnection(fake_access_log_connection_)); + test_server_->waitForCounterEq("cluster.fluentd_cluster.upstream_cx_total", 1); + test_server_->waitForGaugeEq("cluster.fluentd_cluster.upstream_cx_active", 1); + EXPECT_TRUE(fake_access_log_connection_->waitForData([&](const std::string& tcp_data) -> bool { bool validated = false; validateFluentdPayload(tcp_data, &validated, @@ -195,6 +238,9 @@ TEST_F(FluentdAccessLogIntegrationTest, UpstreamConnectionClosed) { test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.events_sent", 1); ASSERT_TRUE(fake_upstreams_[1]->waitForRawConnection(fake_access_log_connection_)); + test_server_->waitForCounterEq("cluster.fluentd_cluster.upstream_cx_total", 1); + test_server_->waitForGaugeEq("cluster.fluentd_cluster.upstream_cx_active", 1); + EXPECT_TRUE(fake_access_log_connection_->waitForData([&](const std::string& tcp_data) -> bool { bool validated = false; validateFluentdPayload(tcp_data, &validated, @@ -204,11 +250,42 @@ 
TEST_F(FluentdAccessLogIntegrationTest, UpstreamConnectionClosed) { ASSERT_TRUE(fake_access_log_connection_->close()); test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.connections_closed", 1); + test_server_->waitForGaugeEq("cluster.fluentd_cluster.upstream_cx_active", 0); // New access log would be discarded because the connection is closed. sendBidirectionalData(); test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.entries_lost", 1); } +TEST_F(FluentdAccessLogIntegrationTest, UpstreamConnectionClosedWithMultipleReconnects) { + init(default_cluster_name, false, {}, /* max_reconnect_attempts = */ 3); + sendBidirectionalData(); + + test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.entries_buffered", 1); + test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.events_sent", 1); + + ASSERT_TRUE(fake_upstreams_[1]->waitForRawConnection(fake_access_log_connection_)); + test_server_->waitForCounterEq("cluster.fluentd_cluster.upstream_cx_total", 1); + ASSERT_TRUE(fake_access_log_connection_->close()); + + test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.connections_closed", 1); + test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.reconnect_attempts", 1); + FakeRawConnectionPtr fake_access_log_connection_2; + ASSERT_TRUE(fake_upstreams_[1]->waitForRawConnection(fake_access_log_connection_2)); + test_server_->waitForCounterEq("cluster.fluentd_cluster.upstream_cx_total", 2); + ASSERT_TRUE(fake_access_log_connection_2->close()); + + test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.connections_closed", 2); + test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.reconnect_attempts", 2); + FakeRawConnectionPtr fake_access_log_connection_3; + ASSERT_TRUE(fake_upstreams_[1]->waitForRawConnection(fake_access_log_connection_3)); + test_server_->waitForCounterEq("cluster.fluentd_cluster.upstream_cx_total", 3); + ASSERT_TRUE(fake_access_log_connection_3->close()); + + 
test_server_->waitForCounterEq("access_logs.fluentd.fluentd_1.connections_closed", 3); + test_server_->waitForCounterEq("cluster.fluentd_cluster.upstream_cx_connect_attempts_exceeded", + 1); +} + } // namespace } // namespace Envoy diff --git a/test/mocks/common.h b/test/mocks/common.h index 455dc2f59523..246aec0f0068 100644 --- a/test/mocks/common.h +++ b/test/mocks/common.h @@ -2,6 +2,7 @@ #include +#include "envoy/common/backoff_strategy.h" #include "envoy/common/conn_pool.h" #include "envoy/common/key_value_store.h" #include "envoy/common/random_generator.h" @@ -67,6 +68,16 @@ class MockTimeSystem : public Event::TestTimeSystem { Event::TestRealTimeSystem real_time_; // NO_CHECK_FORMAT(real_time) }; +class MockBackOffStrategy : public BackOffStrategy { +public: + ~MockBackOffStrategy() override = default; + + MOCK_METHOD(uint64_t, nextBackOffMs, ()); + MOCK_METHOD(void, reset, ()); + MOCK_METHOD(void, reset, (uint64_t base_interval)); + MOCK_METHOD(bool, isOverTimeLimit, (uint64_t interval_ms), (const)); +}; + // Captures absl::string_view parameters into temp strings, for use // with gmock's SaveArg. Providing an absl::string_view compiles, // but fails because by the time you examine the saved value, its