From 678de631676e30a86f330b9d5aa9305a22c733b4 Mon Sep 17 00:00:00 2001 From: ohadvano <49730675+ohadvano@users.noreply.github.com> Date: Sat, 17 Feb 2024 01:25:14 +0200 Subject: [PATCH] tcp_proxy: add per-connection idle timeout override by filter state (#32422) --------- Signed-off-by: ohadvano --- .../network/tcp_proxy/v3/tcp_proxy.proto | 2 + changelogs/current.yaml | 5 ++ .../advanced/well_known_filter_state.rst | 5 ++ .../common/stream_info/uint64_accessor_impl.h | 2 + source/common/tcp_proxy/BUILD | 1 + source/common/tcp_proxy/tcp_proxy.cc | 48 +++++++++++---- source/common/tcp_proxy/tcp_proxy.h | 10 +++- .../stream_info/uint64_accessor_impl_test.cc | 8 +++ test/common/tcp_proxy/tcp_proxy_test.cc | 60 +++++++++++++++++++ 9 files changed, 129 insertions(+), 12 deletions(-) diff --git a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto index d9f0db325373..b0f7602a8e08 100644 --- a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto +++ b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto @@ -184,6 +184,8 @@ message TcpProxy { // is defined as the period in which there are no bytes sent or received on either // the upstream or downstream connection. If not set, the default idle timeout is 1 hour. If set // to 0s, the timeout will be disabled. + // It is possible to dynamically override this configuration by setting a per-connection filter + // state object for the key ``envoy.tcp_proxy.per_connection_idle_timeout_ms``. // // .. warning:: // Disabling this timeout has a highly likelihood of yielding connection leaks due to lost TCP diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 9e35431cbdd9..e7f8bc3329f5 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -198,6 +198,11 @@ new_features: - area: redis change: | Added support for the ``ECHO`` command. 
+- area: tcp_proxy + change: | + added an option to dynamically set a per downstream connection idle timeout period by setting a filter state object under the key + ``envoy.tcp_proxy.per_connection_idle_timeout_ms``. If this filter state value exists, it will override the idle timeout + specified in the filter configuration and the default idle timeout. deprecated: - area: listener diff --git a/docs/root/configuration/advanced/well_known_filter_state.rst b/docs/root/configuration/advanced/well_known_filter_state.rst index 50c4f3476077..f238dd207854 100644 --- a/docs/root/configuration/advanced/well_known_filter_state.rst +++ b/docs/root/configuration/advanced/well_known_filter_state.rst @@ -63,6 +63,11 @@ The following lists the filter state object keys used by the Envoy extensions: A special generic string object factory, to be used as a :ref:`factory lookup key `. +``envoy.tcp_proxy.per_connection_idle_timeout_ms`` + :ref:`TCP proxy idle timeout duration + ` override on a per-connection + basis. Accepts a string containing the number of milliseconds as a constructor. + Filter state object fields -------------------------- diff --git a/source/common/stream_info/uint64_accessor_impl.h b/source/common/stream_info/uint64_accessor_impl.h index 5859263135ab..ff14bbe7844c 100644 --- a/source/common/stream_info/uint64_accessor_impl.h +++ b/source/common/stream_info/uint64_accessor_impl.h @@ -19,6 +19,8 @@ class UInt64AccessorImpl : public UInt64Accessor { return message; } + absl::optional serializeAsString() const override { return std::to_string(value_); } + // From UInt64Accessor. 
void increment() override { value_++; } uint64_t value() const override { return value_; } diff --git a/source/common/tcp_proxy/BUILD b/source/common/tcp_proxy/BUILD index 07d5c5995d29..4d1309a5a4b7 100644 --- a/source/common/tcp_proxy/BUILD +++ b/source/common/tcp_proxy/BUILD @@ -78,6 +78,7 @@ envoy_cc_library( "//source/common/router:metadatamatchcriteria_lib", "//source/common/stream_info:stream_id_provider_lib", "//source/common/stream_info:stream_info_lib", + "//source/common/stream_info:uint64_accessor_lib", "//source/common/upstream:load_balancer_lib", "//source/extensions/upstreams/tcp/generic:config", "@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto", diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 4f87dd6ae37a..4e49e16597a8 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -34,6 +34,7 @@ #include "source/common/network/upstream_socket_options_filter_state.h" #include "source/common/router/metadatamatchcriteria_impl.h" #include "source/common/stream_info/stream_id_provider_impl.h" +#include "source/common/stream_info/uint64_accessor_impl.h" namespace Envoy { namespace TcpProxy { @@ -53,6 +54,22 @@ class PerConnectionClusterFactory : public StreamInfo::FilterState::ObjectFactor REGISTER_FACTORY(PerConnectionClusterFactory, StreamInfo::FilterState::ObjectFactory); +class PerConnectionIdleTimeoutMsObjectFactory : public StreamInfo::FilterState::ObjectFactory { +public: + std::string name() const override { return std::string(PerConnectionIdleTimeoutMs); } + std::unique_ptr + createFromBytes(absl::string_view data) const override { + uint64_t duration_in_milliseconds = 0; + if (absl::SimpleAtoi(data, &duration_in_milliseconds)) { + return std::make_unique(duration_in_milliseconds); + } + + return nullptr; + } +}; + +REGISTER_FACTORY(PerConnectionIdleTimeoutMsObjectFactory, StreamInfo::FilterState::ObjectFactory); + Config::SimpleRouteImpl::SimpleRouteImpl(const 
Config& parent, absl::string_view cluster_name) : parent_(parent), cluster_name_(cluster_name) {} @@ -755,7 +772,7 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) { conn_data->connection().state() != Network::Connection::State::Closed) { config_->drainManager().add(config_->sharedConfig(), std::move(conn_data), std::move(upstream_callbacks_), std::move(idle_timer_), - read_callbacks_->upstreamHost()); + idle_timeout_, read_callbacks_->upstreamHost()); } if (event == Network::ConnectionEvent::LocalClose || event == Network::ConnectionEvent::RemoteClose) { @@ -829,7 +846,15 @@ void Filter::onUpstreamConnection() { read_callbacks_->connection(), getStreamInfo().downstreamAddressProvider().requestedServerName()); - if (config_->idleTimeout()) { + idle_timeout_ = config_->idleTimeout(); + if (const auto* per_connection_idle_timeout = + getStreamInfo().filterState()->getDataReadOnly( + PerConnectionIdleTimeoutMs); + per_connection_idle_timeout != nullptr) { + idle_timeout_ = std::chrono::milliseconds(per_connection_idle_timeout->value()); + } + + if (idle_timeout_) { // The idle_timer_ can be moved to a Drainer, so related callbacks call into // the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch // the call to either TcpProxy or to Drainer, depending on the current state. 
@@ -905,8 +930,8 @@ void Filter::disableAccessLogFlushTimer() { void Filter::resetIdleTimer() { if (idle_timer_ != nullptr) { - ASSERT(config_->idleTimeout()); - idle_timer_->enableTimer(config_->idleTimeout().value()); + ASSERT(idle_timeout_); + idle_timer_->enableTimer(idle_timeout_.value()); } } @@ -943,9 +968,10 @@ void UpstreamDrainManager::add(const Config::SharedConfigSharedPtr& config, Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data, const std::shared_ptr& callbacks, Event::TimerPtr&& idle_timer, + absl::optional idle_timeout, const Upstream::HostDescriptionConstSharedPtr& upstream_host) { DrainerPtr drainer(new Drainer(*this, config, callbacks, std::move(upstream_conn_data), - std::move(idle_timer), upstream_host)); + std::move(idle_timer), idle_timeout, upstream_host)); callbacks->drain(*drainer); // Use temporary to ensure we get the pointer before we move it out of drainer @@ -963,9 +989,11 @@ void UpstreamDrainManager::remove(Drainer& drainer, Event::Dispatcher& dispatche Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config, const std::shared_ptr& callbacks, Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer, + absl::optional idle_timeout, const Upstream::HostDescriptionConstSharedPtr& upstream_host) : parent_(parent), callbacks_(callbacks), upstream_conn_data_(std::move(conn_data)), - timer_(std::move(idle_timer)), upstream_host_(upstream_host), config_(config) { + idle_timer_(std::move(idle_timer)), idle_timeout_(idle_timeout), + upstream_host_(upstream_host), config_(config) { ENVOY_CONN_LOG(trace, "draining the upstream connection", upstream_conn_data_->connection()); config_->stats().upstream_flush_total_.inc(); config_->stats().upstream_flush_active_.inc(); @@ -974,8 +1002,8 @@ Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedP void Drainer::onEvent(Network::ConnectionEvent event) { if (event == Network::ConnectionEvent::RemoteClose || 
event == Network::ConnectionEvent::LocalClose) { - if (timer_ != nullptr) { - timer_->disableTimer(); + if (idle_timer_ != nullptr) { + idle_timer_->disableTimer(); } config_->stats().upstream_flush_active_.dec(); parent_.remove(*this, upstream_conn_data_->connection().dispatcher()); @@ -998,8 +1026,8 @@ void Drainer::onIdleTimeout() { } void Drainer::onBytesSent() { - if (timer_ != nullptr) { - timer_->enableTimer(config_->idleTimeout().value()); + if (idle_timer_ != nullptr) { + idle_timer_->enableTimer(idle_timeout_.value()); } } diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 82ebcb8fb9d9..a7b5e491191d 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -38,6 +38,9 @@ namespace Envoy { namespace TcpProxy { +constexpr absl::string_view PerConnectionIdleTimeoutMs = + "envoy.tcp_proxy.per_connection_idle_timeout_ms"; + /** * All tcp proxy stats. @see stats_macros.h */ @@ -547,6 +550,7 @@ class Filter : public Network::ReadFilter, Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_; Network::TransportSocketOptionsConstSharedPtr transport_socket_options_; Network::Socket::OptionsSharedPtr upstream_options_; + absl::optional idle_timeout_; uint32_t connect_attempts_{}; bool connecting_{}; bool downstream_closed_{}; @@ -560,6 +564,7 @@ class Drainer : public Event::DeferredDeletable, protected Logger::Loggable& callbacks, Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer, + absl::optional idle_timeout, const Upstream::HostDescriptionConstSharedPtr& upstream_host); void onEvent(Network::ConnectionEvent event); @@ -573,7 +578,8 @@ class Drainer : public Event::DeferredDeletable, protected Logger::Loggable callbacks_; Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_; - Event::TimerPtr timer_; + Event::TimerPtr idle_timer_; + absl::optional idle_timeout_; Upstream::HostDescriptionConstSharedPtr upstream_host_; 
Config::SharedConfigSharedPtr config_; }; @@ -586,7 +592,7 @@ class UpstreamDrainManager : public ThreadLocal::ThreadLocalObject { void add(const Config::SharedConfigSharedPtr& config, Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data, const std::shared_ptr& callbacks, - Event::TimerPtr&& idle_timer, + Event::TimerPtr&& idle_timer, absl::optional idle_timeout, const Upstream::HostDescriptionConstSharedPtr& upstream_host); void remove(Drainer& drainer, Event::Dispatcher& dispatcher); diff --git a/test/common/stream_info/uint64_accessor_impl_test.cc b/test/common/stream_info/uint64_accessor_impl_test.cc index 8f143429339e..876700191938 100644 --- a/test/common/stream_info/uint64_accessor_impl_test.cc +++ b/test/common/stream_info/uint64_accessor_impl_test.cc @@ -31,6 +31,14 @@ TEST(UInt64AccessorImplTest, TestProto) { EXPECT_EQ(init_value, uint64_struct->value()); } +TEST(UInt64AccessorImplTest, TestString) { + uint64_t init_value = 0xdeadbeefdeadbeef; + UInt64AccessorImpl accessor(init_value); + absl::optional value = accessor.serializeAsString(); + ASSERT_TRUE(value.has_value()); + EXPECT_EQ(value, std::to_string(init_value)); +} + } // namespace } // namespace StreamInfo } // namespace Envoy diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index 41a2ce805c99..32376a18b31d 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -21,6 +21,7 @@ #include "source/common/network/upstream_socket_options_filter_state.h" #include "source/common/network/win32_redirect_records_option_impl.h" #include "source/common/router/metadatamatchcriteria_impl.h" +#include "source/common/stream_info/uint64_accessor_impl.h" #include "source/common/tcp_proxy/tcp_proxy.h" #include "source/common/upstream/upstream_impl.h" @@ -815,6 +816,65 @@ TEST_F(TcpProxyTest, UpstreamConnectionLimit) { EXPECT_EQ(access_log_data_, "UO"); } +TEST_F(TcpProxyTest, IdleTimeoutObjectFactory) { + const 
std::string name = "envoy.tcp_proxy.per_connection_idle_timeout_ms"; + auto* factory = + Registry::FactoryRegistry::getFactory(name); + ASSERT_NE(nullptr, factory); + EXPECT_EQ(name, factory->name()); + const std::string duration_in_milliseconds = std::to_string(1234); + auto object = factory->createFromBytes(duration_in_milliseconds); + ASSERT_NE(nullptr, object); + EXPECT_EQ(duration_in_milliseconds, object->serializeAsString()); +} + +TEST_F(TcpProxyTest, InvalidIdleTimeoutObjectFactory) { + const std::string name = "envoy.tcp_proxy.per_connection_idle_timeout_ms"; + auto* factory = + Registry::FactoryRegistry::getFactory(name); + ASSERT_NE(nullptr, factory); + EXPECT_EQ(name, factory->name()); + ASSERT_EQ(nullptr, factory->createFromBytes("not_a_number")); +} + +TEST_F(TcpProxyTest, IdleTimeoutWithFilterStateOverride) { + envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy config = defaultConfig(); + config.mutable_idle_timeout()->set_seconds(1); + setup(1, config); + + uint64_t idle_timeout_override = 5000; + + // Although the configured idle timeout is 1 second, overriding the value through filter state + // so the expected idle timeout is 5 seconds instead. 
+ filter_callbacks_.connection_.streamInfo().filterState()->setData( + TcpProxy::PerConnectionIdleTimeoutMs, + std::make_unique(idle_timeout_override), + StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection); + + Event::MockTimer* idle_timer = new Event::MockTimer(&filter_callbacks_.connection_.dispatcher_); + EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _)); + raiseEventUpstreamConnected(0); + + Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _)); + filter_->onData(buffer, false); + + buffer.add("hello2"); + EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _)); + upstream_callbacks_->onUpstreamData(buffer, false); + + EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _)); + filter_callbacks_.connection_.raiseBytesSentCallbacks(1); + + EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(idle_timeout_override), _)); + upstream_connections_.at(0)->raiseBytesSentCallbacks(2); + + EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush, _)); + EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush, _)); + EXPECT_CALL(*idle_timer, disableTimer()); + idle_timer->invokeCallback(); +} + // Tests that the idle timer closes both connections, and gets updated when either // connection has activity. TEST_F(TcpProxyTest, IdleTimeout) {