Skip to content

Commit

Permalink
tcp_proxy: add per-connection idle timeout override by filter state (e…
Browse files Browse the repository at this point in the history
…nvoyproxy#32422)


---------

Signed-off-by: ohadvano <[email protected]>
  • Loading branch information
ohadvano authored Feb 16, 2024
1 parent e10d253 commit 678de63
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ message TcpProxy {
// is defined as the period in which there are no bytes sent or received on either
// the upstream or downstream connection. If not set, the default idle timeout is 1 hour. If set
// to 0s, the timeout will be disabled.
// It is possible to dynamically override this configuration by setting a per-connection filter
// state object for the key ``envoy.tcp_proxy.per_connection_idle_timeout_ms``.
//
// .. warning::
// Disabling this timeout has a highly likelihood of yielding connection leaks due to lost TCP
Expand Down
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ new_features:
- area: redis
change: |
Added support for the ``ECHO`` command.
- area: tcp_proxy
change: |
added an option to dynamically set a per downstream connection idle timeout period object under the key
``envoy.tcp_proxy.per_connection_idle_timeout_ms``. If this filter state value exists, it will override the idle timeout
specified in the filter configuration and the default idle timeout.
deprecated:
- area: listener
Expand Down
5 changes: 5 additions & 0 deletions docs/root/configuration/advanced/well_known_filter_state.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ The following lists the filter state object keys used by the Envoy extensions:
A special generic string object factory, to be used as a :ref:`factory lookup key
<envoy_v3_api_field_extensions.filters.common.set_filter_state.v3.FilterStateValue.factory_key>`.

``envoy.tcp_proxy.per_connection_idle_timeout_ms``
:ref:`TCP proxy idle timeout duration
<envoy_v3_api_field_extensions.filters.network.tcp_proxy.v3.TcpProxy.idle_timeout>` override on a per-connection
basis. Accepts a count of milliseconds number string as a constructor.

Filter state object fields
--------------------------

Expand Down
2 changes: 2 additions & 0 deletions source/common/stream_info/uint64_accessor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class UInt64AccessorImpl : public UInt64Accessor {
return message;
}

absl::optional<std::string> serializeAsString() const override { return std::to_string(value_); }

// From UInt64Accessor.
void increment() override { value_++; }
uint64_t value() const override { return value_; }
Expand Down
1 change: 1 addition & 0 deletions source/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ envoy_cc_library(
"//source/common/router:metadatamatchcriteria_lib",
"//source/common/stream_info:stream_id_provider_lib",
"//source/common/stream_info:stream_info_lib",
"//source/common/stream_info:uint64_accessor_lib",
"//source/common/upstream:load_balancer_lib",
"//source/extensions/upstreams/tcp/generic:config",
"@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto",
Expand Down
48 changes: 38 additions & 10 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "source/common/network/upstream_socket_options_filter_state.h"
#include "source/common/router/metadatamatchcriteria_impl.h"
#include "source/common/stream_info/stream_id_provider_impl.h"
#include "source/common/stream_info/uint64_accessor_impl.h"

namespace Envoy {
namespace TcpProxy {
Expand All @@ -53,6 +54,22 @@ class PerConnectionClusterFactory : public StreamInfo::FilterState::ObjectFactor

REGISTER_FACTORY(PerConnectionClusterFactory, StreamInfo::FilterState::ObjectFactory);

class PerConnectionIdleTimeoutMsObjectFactory : public StreamInfo::FilterState::ObjectFactory {
public:
std::string name() const override { return std::string(PerConnectionIdleTimeoutMs); }
std::unique_ptr<StreamInfo::FilterState::Object>
createFromBytes(absl::string_view data) const override {
uint64_t duration_in_milliseconds = 0;
if (absl::SimpleAtoi(data, &duration_in_milliseconds)) {
return std::make_unique<StreamInfo::UInt64AccessorImpl>(duration_in_milliseconds);
}

return nullptr;
}
};

REGISTER_FACTORY(PerConnectionIdleTimeoutMsObjectFactory, StreamInfo::FilterState::ObjectFactory);

Config::SimpleRouteImpl::SimpleRouteImpl(const Config& parent, absl::string_view cluster_name)
: parent_(parent), cluster_name_(cluster_name) {}

Expand Down Expand Up @@ -755,7 +772,7 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
conn_data->connection().state() != Network::Connection::State::Closed) {
config_->drainManager().add(config_->sharedConfig(), std::move(conn_data),
std::move(upstream_callbacks_), std::move(idle_timer_),
read_callbacks_->upstreamHost());
idle_timeout_, read_callbacks_->upstreamHost());
}
if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
Expand Down Expand Up @@ -829,7 +846,15 @@ void Filter::onUpstreamConnection() {
read_callbacks_->connection(),
getStreamInfo().downstreamAddressProvider().requestedServerName());

if (config_->idleTimeout()) {
idle_timeout_ = config_->idleTimeout();
if (const auto* per_connection_idle_timeout =
getStreamInfo().filterState()->getDataReadOnly<StreamInfo::UInt64Accessor>(
PerConnectionIdleTimeoutMs);
per_connection_idle_timeout != nullptr) {
idle_timeout_ = std::chrono::milliseconds(per_connection_idle_timeout->value());
}

if (idle_timeout_) {
// The idle_timer_ can be moved to a Drainer, so related callbacks call into
// the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch
// the call to either TcpProxy or to Drainer, depending on the current state.
Expand Down Expand Up @@ -905,8 +930,8 @@ void Filter::disableAccessLogFlushTimer() {

void Filter::resetIdleTimer() {
if (idle_timer_ != nullptr) {
ASSERT(config_->idleTimeout());
idle_timer_->enableTimer(config_->idleTimeout().value());
ASSERT(idle_timeout_);
idle_timer_->enableTimer(idle_timeout_.value());
}
}

Expand Down Expand Up @@ -943,9 +968,10 @@ void UpstreamDrainManager::add(const Config::SharedConfigSharedPtr& config,
Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data,
const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
Event::TimerPtr&& idle_timer,
absl::optional<std::chrono::milliseconds> idle_timeout,
const Upstream::HostDescriptionConstSharedPtr& upstream_host) {
DrainerPtr drainer(new Drainer(*this, config, callbacks, std::move(upstream_conn_data),
std::move(idle_timer), upstream_host));
std::move(idle_timer), idle_timeout, upstream_host));
callbacks->drain(*drainer);

// Use temporary to ensure we get the pointer before we move it out of drainer
Expand All @@ -963,9 +989,11 @@ void UpstreamDrainManager::remove(Drainer& drainer, Event::Dispatcher& dispatche
Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer,
absl::optional<std::chrono::milliseconds> idle_timeout,
const Upstream::HostDescriptionConstSharedPtr& upstream_host)
: parent_(parent), callbacks_(callbacks), upstream_conn_data_(std::move(conn_data)),
timer_(std::move(idle_timer)), upstream_host_(upstream_host), config_(config) {
idle_timer_(std::move(idle_timer)), idle_timeout_(idle_timeout),
upstream_host_(upstream_host), config_(config) {
ENVOY_CONN_LOG(trace, "draining the upstream connection", upstream_conn_data_->connection());
config_->stats().upstream_flush_total_.inc();
config_->stats().upstream_flush_active_.inc();
Expand All @@ -974,8 +1002,8 @@ Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedP
void Drainer::onEvent(Network::ConnectionEvent event) {
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
if (timer_ != nullptr) {
timer_->disableTimer();
if (idle_timer_ != nullptr) {
idle_timer_->disableTimer();
}
config_->stats().upstream_flush_active_.dec();
parent_.remove(*this, upstream_conn_data_->connection().dispatcher());
Expand All @@ -998,8 +1026,8 @@ void Drainer::onIdleTimeout() {
}

void Drainer::onBytesSent() {
if (timer_ != nullptr) {
timer_->enableTimer(config_->idleTimeout().value());
if (idle_timer_ != nullptr) {
idle_timer_->enableTimer(idle_timeout_.value());
}
}

Expand Down
10 changes: 8 additions & 2 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
namespace Envoy {
namespace TcpProxy {

constexpr absl::string_view PerConnectionIdleTimeoutMs =
"envoy.tcp_proxy.per_connection_idle_timeout_ms";

/**
* All tcp proxy stats. @see stats_macros.h
*/
Expand Down Expand Up @@ -547,6 +550,7 @@ class Filter : public Network::ReadFilter,
Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
Network::TransportSocketOptionsConstSharedPtr transport_socket_options_;
Network::Socket::OptionsSharedPtr upstream_options_;
absl::optional<std::chrono::milliseconds> idle_timeout_;
uint32_t connect_attempts_{};
bool connecting_{};
bool downstream_closed_{};
Expand All @@ -560,6 +564,7 @@ class Drainer : public Event::DeferredDeletable, protected Logger::Loggable<Logg
Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer,
absl::optional<std::chrono::milliseconds> idle_timeout,
const Upstream::HostDescriptionConstSharedPtr& upstream_host);

void onEvent(Network::ConnectionEvent event);
Expand All @@ -573,7 +578,8 @@ class Drainer : public Event::DeferredDeletable, protected Logger::Loggable<Logg
UpstreamDrainManager& parent_;
std::shared_ptr<Filter::UpstreamCallbacks> callbacks_;
Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_;
Event::TimerPtr timer_;
Event::TimerPtr idle_timer_;
absl::optional<std::chrono::milliseconds> idle_timeout_;
Upstream::HostDescriptionConstSharedPtr upstream_host_;
Config::SharedConfigSharedPtr config_;
};
Expand All @@ -586,7 +592,7 @@ class UpstreamDrainManager : public ThreadLocal::ThreadLocalObject {
void add(const Config::SharedConfigSharedPtr& config,
Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data,
const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
Event::TimerPtr&& idle_timer,
Event::TimerPtr&& idle_timer, absl::optional<std::chrono::milliseconds> idle_timeout,
const Upstream::HostDescriptionConstSharedPtr& upstream_host);
void remove(Drainer& drainer, Event::Dispatcher& dispatcher);

Expand Down
8 changes: 8 additions & 0 deletions test/common/stream_info/uint64_accessor_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ TEST(UInt64AccessorImplTest, TestProto) {
EXPECT_EQ(init_value, uint64_struct->value());
}

TEST(UInt64AccessorImplTest, TestString) {
uint64_t init_value = 0xdeadbeefdeadbeef;
UInt64AccessorImpl accessor(init_value);
absl::optional<std::string> value = accessor.serializeAsString();
ASSERT_TRUE(value.has_value());
EXPECT_EQ(value, std::to_string(init_value));
}

} // namespace
} // namespace StreamInfo
} // namespace Envoy
60 changes: 60 additions & 0 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "source/common/network/upstream_socket_options_filter_state.h"
#include "source/common/network/win32_redirect_records_option_impl.h"
#include "source/common/router/metadatamatchcriteria_impl.h"
#include "source/common/stream_info/uint64_accessor_impl.h"
#include "source/common/tcp_proxy/tcp_proxy.h"
#include "source/common/upstream/upstream_impl.h"

Expand Down Expand Up @@ -815,6 +816,65 @@ TEST_F(TcpProxyTest, UpstreamConnectionLimit) {
EXPECT_EQ(access_log_data_, "UO");
}

TEST_F(TcpProxyTest, IdleTimeoutObjectFactory) {
const std::string name = "envoy.tcp_proxy.per_connection_idle_timeout_ms";
auto* factory =
Registry::FactoryRegistry<StreamInfo::FilterState::ObjectFactory>::getFactory(name);
ASSERT_NE(nullptr, factory);
EXPECT_EQ(name, factory->name());
const std::string duration_in_milliseconds = std::to_string(1234);
auto object = factory->createFromBytes(duration_in_milliseconds);
ASSERT_NE(nullptr, object);
EXPECT_EQ(duration_in_milliseconds, object->serializeAsString());
}

TEST_F(TcpProxyTest, InvalidIdleTimeoutObjectFactory) {
const std::string name = "envoy.tcp_proxy.per_connection_idle_timeout_ms";
auto* factory =
Registry::FactoryRegistry<StreamInfo::FilterState::ObjectFactory>::getFactory(name);
ASSERT_NE(nullptr, factory);
EXPECT_EQ(name, factory->name());
ASSERT_EQ(nullptr, factory->createFromBytes("not_a_number"));
}

TEST_F(TcpProxyTest, IdleTimeoutWithFilterStateOverride) {
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy config = defaultConfig();
config.mutable_idle_timeout()->set_seconds(1);
setup(1, config);

uint64_t idle_timeout_override = 5000;

// Although the configured idle timeout is 1 second, overriding the value through filter state
// so the expected idle timeout is 5 seconds instead.
filter_callbacks_.connection_.streamInfo().filterState()->setData(
TcpProxy::PerConnectionIdleTimeoutMs,
std::make_unique<StreamInfo::UInt64AccessorImpl>(idle_timeout_override),
StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection);

Event::MockTimer* idle_timer = new Event::MockTimer(&filter_callbacks_.connection_.dispatcher_);
EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _));
raiseEventUpstreamConnected(0);

Buffer::OwnedImpl buffer("hello");
EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _));
filter_->onData(buffer, false);

buffer.add("hello2");
EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _));
upstream_callbacks_->onUpstreamData(buffer, false);

EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _));
filter_callbacks_.connection_.raiseBytesSentCallbacks(1);

EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _));
upstream_connections_.at(0)->raiseBytesSentCallbacks(2);

EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush, _));
EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush, _));
EXPECT_CALL(*idle_timer, disableTimer());
idle_timer->invokeCallback();
}

// Tests that the idle timer closes both connections, and gets updated when either
// connection has activity.
TEST_F(TcpProxyTest, IdleTimeout) {
Expand Down

0 comments on commit 678de63

Please sign in to comment.