Skip to content

Commit

Permalink
fluentd_access_logger: add retry and backoff options (envoyproxy#32682)
Browse files Browse the repository at this point in the history
Signed-off-by: ohadvano <[email protected]>
  • Loading branch information
ohadvano authored Mar 19, 2024
1 parent aa19504 commit 79d5a6d
Show file tree
Hide file tree
Showing 13 changed files with 529 additions and 62 deletions.
5 changes: 4 additions & 1 deletion api/envoy/extensions/access_loggers/fluentd/v3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")
licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["@com_github_cncf_xds//udpa/annotations:pkg"],
deps = [
"//envoy/config/core/v3:pkg",
"@com_github_cncf_xds//udpa/annotations:pkg",
],
)
20 changes: 19 additions & 1 deletion api/envoy/extensions/access_loggers/fluentd/v3/fluentd.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto3";

package envoy.extensions.access_loggers.fluentd.v3;

import "envoy/config/core/v3/backoff.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/wrappers.proto";
Expand All @@ -22,8 +24,19 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// the Fluentd Forward Protocol as described in: `Fluentd Forward Protocol Specification
// <https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1>`_.
// [#extension: envoy.access_loggers.fluentd]
// [#next-free-field: 7]
// [#next-free-field: 8]
message FluentdAccessLogConfig {
message RetryOptions {
// The number of times the logger will attempt to connect to the upstream during reconnects.
// By default, there is no limit. The logger will attempt to reconnect to the upstream each time
// connecting to the upstream failed or the upstream connection had been closed for any reason.
google.protobuf.UInt32Value max_connect_attempts = 1;

// Sets the backoff strategy. If this value is not set, the default base backoff interval is 500
// milliseconds and the default max backoff interval is 5 seconds (10 times the base interval).
config.core.v3.BackoffStrategy backoff_options = 2;
}

// The upstream cluster to connect to for streaming the Fluentd messages.
string cluster = 1 [(validate.rules).string = {min_len: 1}];

Expand Down Expand Up @@ -67,4 +80,9 @@ message FluentdAccessLogConfig {
// "message": "My error message"
// }
google.protobuf.Struct record = 6 [(validate.rules).message = {required: true}];

// Optional retry options, applied when the upstream connection has failed or has been closed.
// If this field is not set, the default values will be applied, as specified in the
// :ref:`RetryOptions <envoy_v3_api_msg_extensions.access_loggers.fluentd.v3.FluentdAccessLogConfig.RetryOptions>`
// configuration.
RetryOptions retry_options = 7;
}
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ new_features:
added a :ref:`configuration option
<envoy_v3_api_field_extensions.filters.network.http_connection_manager.v3.HttpConnectionManager.append_local_overload>` to add
``x-envoy-local-overloaded`` header when Overload Manager is triggered.
- area: access_loggers
change: |
Added :ref:`retry options
<envoy_v3_api_msg_extensions.access_loggers.fluentd.v3.FluentdAccessLogConfig.RetryOptions>` to Fluentd Access Logger to
support configuring upstream reconnect attempts and backoff intervals.
- area: tracing
change: |
Added support to configure a Dynatrace sampler for the OpenTelemetry tracer.
Expand Down
1 change: 1 addition & 0 deletions docs/root/configuration/observability/access_log/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ The Fluentd access log has statistics rooted at the *access_logs.fluentd.<stat_p
entries_lost, Counter, Total number of times an access log entry was discarded due to unavailable connection.
entries_buffered, Counter, Total number of entries (access log records) that were buffered.
events_sent, Counter, Total number of events (Fluentd Forward Mode events) sent to the upstream.
reconnect_attempts, Counter, Total number of times an attempt to reconnect to the upstream has been made.
connections_closed, Counter, Total number of times a connection to the upstream cluster was closed.
2 changes: 2 additions & 0 deletions source/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ envoy_cc_library(
":assert_lib",
"//envoy/common:backoff_strategy_interface",
"//envoy/common:random_generator_interface",
"//source/common/protobuf:utility_lib_header",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

Expand Down
21 changes: 21 additions & 0 deletions source/common/common/backoff_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

#include "envoy/common/backoff_strategy.h"
#include "envoy/common/random_generator.h"
#include "envoy/config/core/v3/backoff.pb.h"
#include "envoy/config/core/v3/backoff.pb.validate.h"

#include "source/common/common/assert.h"
#include "source/common/protobuf/utility.h"

namespace Envoy {

Expand Down Expand Up @@ -98,4 +101,22 @@ class FixedBackOffStrategy : public BackOffStrategy {
uint64_t interval_ms_;
};

/**
 * Stateless helpers for working with BackoffStrategy proto configuration.
 */
class BackOffStrategyUtils {
public:
  /**
   * Checks that the effective max interval of a backoff configuration is not smaller than its
   * effective base interval.
   *
   * @param backoff_strategy the proto configuration to validate. Taken by const reference so the
   *        message is not copied on every call.
   * @param default_base_interval_ms base interval, in milliseconds, used as the fallback when
   *        base_interval is not present in the configuration.
   * @param max_interval_factor multiplier applied to the effective base interval to derive the
   *        fallback max interval when max_interval is not present in the configuration.
   * @return absl::OkStatus() when the effective max interval is greater than or equal to the
   *         effective base interval, absl::InvalidArgumentError otherwise.
   */
  static absl::Status
  validateBackOffStrategyConfig(const envoy::config::core::v3::BackoffStrategy& backoff_strategy,
                                uint64_t default_base_interval_ms, uint64_t max_interval_factor) {
    const uint64_t base_interval_ms =
        PROTOBUF_GET_MS_OR_DEFAULT(backoff_strategy, base_interval, default_base_interval_ms);
    // The fallback max is derived from the *effective* base interval, so a configuration that only
    // sets base_interval still yields a max that is max_interval_factor times that base.
    const uint64_t max_interval_ms = PROTOBUF_GET_MS_OR_DEFAULT(
        backoff_strategy, max_interval, base_interval_ms * max_interval_factor);

    if (max_interval_ms < base_interval_ms) {
      return absl::InvalidArgumentError("max_interval must be greater or equal to base_interval");
    }

    return absl::OkStatus();
  }
};

} // namespace Envoy
11 changes: 11 additions & 0 deletions source/extensions/access_loggers/fluentd/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ FluentdAccessLogFactory::createAccessLogInstance(const Protobuf::Message& config
throw EnvoyException(fmt::format("cluster '{}' was not found", proto_config.cluster()));
}

if (proto_config.has_retry_options() && proto_config.retry_options().has_backoff_options()) {
status = BackOffStrategyUtils::validateBackOffStrategyConfig(
proto_config.retry_options().backoff_options(), DefaultBaseBackoffIntervalMs,
DefaultMaxBackoffIntervalFactor);
if (!status.ok()) {
throw EnvoyException(
"max_backoff_interval must be greater or equal to base_backoff_interval");
}
}

// Supporting nested object serialization is more complex with MessagePack.
// Using an already existing JSON formatter, and later converting the JSON string to a msgpack
// payload.
Expand All @@ -60,6 +70,7 @@ FluentdAccessLogFactory::createAccessLogInstance(const Protobuf::Message& config
std::move(filter), std::move(fluentd_formatter),
std::make_shared<FluentdAccessLogConfig>(proto_config),
context.serverFactoryContext().threadLocal(),
context.serverFactoryContext().api().randomGenerator(),
getAccessLoggerCacheSingleton(context.serverFactoryContext()));
}

Expand Down
103 changes: 80 additions & 23 deletions source/extensions/access_loggers/fluentd/fluentd_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@ namespace Fluentd {
using MessagePackBuffer = msgpack::sbuffer;
using MessagePackPacker = msgpack::packer<msgpack::sbuffer>;

FluentdAccessLoggerImpl::FluentdAccessLoggerImpl(Tcp::AsyncTcpClientPtr client,
FluentdAccessLoggerImpl::FluentdAccessLoggerImpl(Upstream::ThreadLocalCluster& cluster,
Tcp::AsyncTcpClientPtr client,
Event::Dispatcher& dispatcher,
const FluentdAccessLogConfig& config,
BackOffStrategyPtr backoff_strategy,
Stats::Scope& parent_scope)
: tag_(config.tag()), id_(dispatcher.name()),
max_connect_attempts_(
config.has_retry_options() && config.retry_options().has_max_connect_attempts()
? absl::optional<uint32_t>(config.retry_options().max_connect_attempts().value())
: absl::nullopt),
stats_scope_(parent_scope.createScope(config.stat_prefix())),
fluentd_stats_(
{ACCESS_LOG_FLUENTD_STATS(POOL_COUNTER(*stats_scope_), POOL_GAUGE(*stats_scope_))}),
client_(std::move(client)),
buffer_flush_interval_msec_(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
max_buffer_size_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384)),
cluster_(cluster), backoff_strategy_(std::move(backoff_strategy)), client_(std::move(client)),
buffer_flush_interval_msec_(
PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, DefaultBufferFlushIntervalMs)),
max_buffer_size_bytes_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, DefaultMaxBufferSize)),
retry_timer_(dispatcher.createTimer([this]() -> void { onBackoffCallback(); })),
flush_timer_(dispatcher.createTimer([this]() {
flush();
flush_timer_->enableTimer(buffer_flush_interval_msec_);
Expand All @@ -35,34 +44,30 @@ void FluentdAccessLoggerImpl::onEvent(Network::ConnectionEvent event) {
connecting_ = false;

if (event == Network::ConnectionEvent::Connected) {
backoff_strategy_->reset();
retry_timer_->disableTimer();
flush();
} else if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
ENVOY_LOG(debug, "upstream connection was closed");
// TODO(ohadvano): add an option to reconnect to the upstream, if configured

fluentd_stats_.connections_closed_.inc();
disconnected_ = true;
clearBuffer();

ASSERT(flush_timer_ != nullptr);
flush_timer_->disableTimer();
maybeReconnect();
}
}

void FluentdAccessLoggerImpl::log(EntryPtr&& entry) {
if (disconnected_) {
if (disconnected_ || approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
fluentd_stats_.entries_lost_.inc();
// We will lose the data deliberately so the buffer doesn't grow infinitely.
// Since the client is disconnected, there's nothing much we can do with the data anyway.
// TODO(ohadvano): add an option to reconnect to the upstream, if configured
return;
}

approximate_message_size_bytes_ += sizeof(entry->time_) + entry->record_.size();
entries_.push_back(std::move(entry));
fluentd_stats_.entries_buffered_.inc();
if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
// If we exceeded the buffer limit, immediately flush the logs instead of waiting for
// the next flush interval, to allow new logs to be buffered.
flush();
}
}
Expand All @@ -76,8 +81,7 @@ void FluentdAccessLoggerImpl::flush() {
}

if (!client_->connected()) {
connecting_ = true;
client_->connect();
connect();
return;
}

Expand All @@ -102,6 +106,42 @@ void FluentdAccessLoggerImpl::flush() {
clearBuffer();
}

// Starts one connection attempt towards the upstream. Every call counts as an attempt, whether or
// not the client manages to start connecting.
void FluentdAccessLoggerImpl::connect() {
  ++connect_attempts_;

  const bool connect_started = client_->connect();
  if (connect_started) {
    // The outcome of the attempt is reported later through onEvent().
    connecting_ = true;
    return;
  }

  // The client could not start connecting; fall back to the reconnect/backoff path.
  ENVOY_LOG(debug, "no healthy upstream");
  maybeReconnect();
}

// Schedules the next reconnect attempt after a backoff delay, or gives up and marks the logger as
// disconnected once the configured maximum number of connect attempts has been reached. When no
// maximum is configured, a reconnect is always scheduled.
void FluentdAccessLoggerImpl::maybeReconnect() {
  const bool attempts_exhausted =
      max_connect_attempts_.has_value() && connect_attempts_ >= max_connect_attempts_.value();

  if (!attempts_exhausted) {
    const uint64_t delay_ms = backoff_strategy_->nextBackOffMs();
    retry_timer_->enableTimer(std::chrono::milliseconds(delay_ms));
    ENVOY_LOG(debug, "reconnect attempt scheduled for {} ms", delay_ms);
    return;
  }

  ENVOY_LOG(debug, "max connection attempts reached");
  cluster_.info()->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
  setDisconnected();
}

void FluentdAccessLoggerImpl::onBackoffCallback() {
fluentd_stats_.reconnect_attempts_.inc();
this->connect();
}

// Puts the logger into the disconnected state: sets the disconnected flag, drops everything that
// is currently buffered and stops the periodic flush timer.
void FluentdAccessLoggerImpl::setDisconnected() {
disconnected_ = true;
// Buffered entries are discarded rather than kept around.
clearBuffer();
ASSERT(flush_timer_ != nullptr);
// Stop the periodic flush timer.
flush_timer_->disableTimer();
}

void FluentdAccessLoggerImpl::clearBuffer() {
entries_.clear();
approximate_message_size_bytes_ = 0;
Expand All @@ -117,33 +157,50 @@ FluentdAccessLoggerCacheImpl::FluentdAccessLoggerCacheImpl(
}

FluentdAccessLoggerSharedPtr
FluentdAccessLoggerCacheImpl::getOrCreateLogger(const FluentdAccessLogConfigSharedPtr config) {
FluentdAccessLoggerCacheImpl::getOrCreateLogger(const FluentdAccessLogConfigSharedPtr config,
Random::RandomGenerator& random) {
auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
const auto cache_key = MessageUtil::hash(*config);
const auto it = cache.access_loggers_.find(cache_key);
if (it != cache.access_loggers_.end() && !it->second.expired()) {
return it->second.lock();
}

auto* cluster = cluster_manager_.getThreadLocalCluster(config->cluster());
auto client =
cluster_manager_.getThreadLocalCluster(config->cluster())
->tcpAsyncClient(nullptr, std::make_shared<const Tcp::AsyncTcpClientOptions>(false));
cluster->tcpAsyncClient(nullptr, std::make_shared<const Tcp::AsyncTcpClientOptions>(false));

uint64_t base_interval_ms = DefaultBaseBackoffIntervalMs;
uint64_t max_interval_ms = base_interval_ms * DefaultMaxBackoffIntervalFactor;

if (config->has_retry_options() && config->retry_options().has_backoff_options()) {
base_interval_ms = PROTOBUF_GET_MS_OR_DEFAULT(config->retry_options().backoff_options(),
base_interval, DefaultBaseBackoffIntervalMs);
max_interval_ms =
PROTOBUF_GET_MS_OR_DEFAULT(config->retry_options().backoff_options(), max_interval,
base_interval_ms * DefaultMaxBackoffIntervalFactor);
}

BackOffStrategyPtr backoff_strategy = std::make_unique<JitteredExponentialBackOffStrategy>(
base_interval_ms, max_interval_ms, random);

const auto logger = std::make_shared<FluentdAccessLoggerImpl>(
std::move(client), cache.dispatcher_, *config, *stats_scope_);
*cluster, std::move(client), cache.dispatcher_, *config, std::move(backoff_strategy),
*stats_scope_);
cache.access_loggers_.emplace(cache_key, logger);
return logger;
}

FluentdAccessLog::FluentdAccessLog(AccessLog::FilterPtr&& filter, FluentdFormatterPtr&& formatter,
const FluentdAccessLogConfigSharedPtr config,
ThreadLocal::SlotAllocator& tls,
ThreadLocal::SlotAllocator& tls, Random::RandomGenerator& random,
FluentdAccessLoggerCacheSharedPtr access_logger_cache)
: ImplBase(std::move(filter)), formatter_(std::move(formatter)), tls_slot_(tls.allocateSlot()),
config_(config), access_logger_cache_(access_logger_cache) {
tls_slot_->set(
[config = config_, access_logger_cache = access_logger_cache_](Event::Dispatcher&) {
return std::make_shared<ThreadLocalLogger>(access_logger_cache->getOrCreateLogger(config));
[config = config_, &random, access_logger_cache = access_logger_cache_](Event::Dispatcher&) {
return std::make_shared<ThreadLocalLogger>(
access_logger_cache->getOrCreateLogger(config, random));
});
}

Expand Down
Loading

0 comments on commit 79d5a6d

Please sign in to comment.