Skip to content

Commit

Permalink
udp_session_filters: add API for session filters and apply in UDP pro…
Browse files Browse the repository at this point in the history
…xy (envoyproxy#29366)

Signed-off-by: ohadvano <[email protected]>
  • Loading branch information
ohadvano authored Sep 7, 2023
1 parent e6ee1cf commit 83a7d93
Show file tree
Hide file tree
Showing 15 changed files with 749 additions and 67 deletions.
18 changes: 17 additions & 1 deletion api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package envoy.extensions.filters.udp.udp_proxy.v3;
import "envoy/config/accesslog/v3/accesslog.proto";
import "envoy/config/core/v3/udp_socket_config.proto";

import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";

import "xds/annotations/v3/status.proto";
Expand All @@ -26,7 +27,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#extension: envoy.filters.udp_listener.udp_proxy]

// Configuration for the UDP proxy filter.
// [#next-free-field: 11]
// [#next-free-field: 12]
message UdpProxyConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig";
Expand All @@ -49,6 +50,18 @@ message UdpProxyConfig {
}
}

// Configuration for UDP session filters.
message SessionFilter {
// The name of the filter configuration.
string name = 1 [(validate.rules).string = {min_len: 1}];

oneof config_type {
// Filter specific configuration which depends on the filter being
// instantiated. See the supported filters for further documentation.
google.protobuf.Any typed_config = 2;
}
}

// The stat prefix used when emitting UDP proxy filter stats.
string stat_prefix = 1 [(validate.rules).string = {min_len: 1}];

Expand Down Expand Up @@ -110,4 +123,7 @@ message UdpProxyConfig {

// Configuration for proxy access logs emitted by the UDP proxy. Note that certain UDP specific data is emitted as :ref:`Dynamic Metadata <config_access_log_format_dynamic_metadata>`.
repeated config.accesslog.v3.AccessLog proxy_access_log = 10;

// Optional session filters that will run for each UDP session.
repeated SessionFilter session_filters = 11;
}
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ new_features:
change: |
added :ref:`custom_sink <envoy_v3_api_field_config.tap.v3.OutputSink.custom_sink>` type to enable writing tap data
out to a custom sink extension.
- area: udp_proxy
change: |
added :ref:`session_filters <envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.session_filters>` config to
support optional filters that will run for each upstream UDP session. More information can be found in the UDP proxy documentation.
- area: otlp_stats_sink
change: |
added :ref:` stats prefix option<envoy_v3_api_field_extensions.stat_sinks.open_telemetry.v3.SinkConfig.stats_prefix>`
Expand Down
8 changes: 8 additions & 0 deletions docs/root/configuration/listeners/udp_filters/udp_proxy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ remaining datagrams to different clusters according to their source ports.
:lines: 14-53
:caption: :download:`udp-proxy-router.yaml <_include/udp-proxy-router.yaml>`

Session filters
---------------

The UDP proxy is able to apply :ref:`session filters <envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.session_filters>`.
These kinds of filters run seprately on each upstream UDP session and support a more granular API that allows running operations only
at the start of an upstream UDP session, when a UDP datagram is received from the downstream and when a UDP datagram is received from the
upstream, similar to network filters.

Example configuration
---------------------

Expand Down
3 changes: 3 additions & 0 deletions source/extensions/filters/udp/udp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ envoy_cc_library(
"//source/common/access_log:access_log_lib",
"//source/common/api:os_sys_calls_lib",
"//source/common/common:empty_string",
"//source/common/common:linked_object",
"//source/common/common:random_generator_lib",
"//source/common/network:socket_lib",
"//source/common/network:socket_option_factory_lib",
"//source/common/network:utility_lib",
"//source/common/stream_info:stream_info_lib",
"//source/common/upstream:load_balancer_lib",
"//source/extensions/filters/udp/udp_proxy/router:router_lib",
"//source/extensions/filters/udp/udp_proxy/session_filters:filter_config_interface",
"//source/extensions/filters/udp/udp_proxy/session_filters:filter_interface",
"@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/udp/udp_proxy/v3:pkg_cc_proto",
],
Expand Down
14 changes: 14 additions & 0 deletions source/extensions/filters/udp/udp_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ UdpProxyFilterConfigImpl::UdpProxyFilterConfigImpl(
if (!config.hash_policies().empty()) {
hash_policy_ = std::make_unique<HashPolicyImpl>(config.hash_policies());
}

for (const auto& filter : config.session_filters()) {
ENVOY_LOG(debug, " UDP session filter #{}", filter_factories_.size());
ENVOY_LOG(debug, " name: {}", filter.name());
ENVOY_LOG(debug, " config: {}",
MessageUtil::getJsonStringFromMessageOrError(
static_cast<const Protobuf::Message&>(filter.typed_config()), true));

auto& factory = Config::Utility::getAndCheckFactory<NamedUdpSessionFilterConfigFactory>(filter);
ProtobufTypes::MessagePtr message = Envoy::Config::Utility::translateToFactoryConfig(
filter, context.messageValidationVisitor(), factory);
FilterFactoryCb callback = factory.createFilterFactoryFromProto(*message, context);
filter_factories_.push_back(callback);
}
}

static Registry::RegisterFactory<UdpProxyFilterConfigFactory,
Expand Down
13 changes: 12 additions & 1 deletion source/extensions/filters/udp/udp_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ namespace Extensions {
namespace UdpFilters {
namespace UdpProxy {

class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig, Logger::Loggable<Logger::Id::config> {
class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig,
public FilterChainFactory,
Logger::Loggable<Logger::Id::config> {
public:
UdpProxyFilterConfigImpl(
Server::Configuration::ListenerFactoryContext& context,
Expand Down Expand Up @@ -42,6 +44,14 @@ class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig, Logger::Loggable<L
const std::vector<AccessLog::InstanceSharedPtr>& proxyAccessLogs() const override {
return proxy_access_logs_;
}
const FilterChainFactory& sessionFilterFactory() const override { return *this; };

// FilterChainFactory
void createFilterChain(FilterChainFactoryCallbacks& callbacks) const override {
for (const FilterFactoryCb& factory : filter_factories_) {
factory(callbacks);
}
};

private:
static UdpProxyDownstreamStats generateStats(const std::string& stat_prefix,
Expand All @@ -63,6 +73,7 @@ class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig, Logger::Loggable<L
std::vector<AccessLog::InstanceSharedPtr> session_access_logs_;
std::vector<AccessLog::InstanceSharedPtr> proxy_access_logs_;
Random::RandomGenerator& random_;
std::list<SessionFilters::FilterFactoryCb> filter_factories_;
};

/**
Expand Down
18 changes: 14 additions & 4 deletions source/extensions/filters/udp/udp_proxy/session_filters/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,17 @@ class FilterCallbacks {
virtual StreamInfo::StreamInfo& streamInfo() PURE;
};

class ReadFilterCallbacks : public FilterCallbacks {};
class ReadFilterCallbacks : public FilterCallbacks {
public:
~ReadFilterCallbacks() override = default;

/**
* If a read filter stopped filter iteration, continueFilterChain() can be called to continue the
* filter chain. It will have onNewSession() called if it was not previously called.
*/
virtual void continueFilterChain() PURE;
};

class WriteFilterCallbacks : public FilterCallbacks {};

/**
Expand All @@ -50,7 +60,7 @@ class ReadFilter {

/**
* Called when a new UDP session is first established. Filters should do one time long term
* processing that needs to be done when a connection is established. Filter chain iteration
* processing that needs to be done when a session is established. Filter chain iteration
* can be stopped if needed.
* @return status used by the filter manager to manage further filter iteration.
*/
Expand All @@ -68,7 +78,7 @@ class ReadFilter {
* called by the filter manager a single time when the filter is first registered.
*
* IMPORTANT: No outbound networking or complex processing should be done in this function.
* That should be done in the context of onNewConnection() if needed.
* That should be done in the context of onNewSession() if needed.
*
* @param callbacks supplies the callbacks.
*/
Expand Down Expand Up @@ -106,7 +116,7 @@ class WriteFilter {
* called by the filter manager a single time when the filter is first registered.
*
* IMPORTANT: No outbound networking or complex processing should be done in this function.
* That should be done in the context of ReadFilter::onNewConnection() if needed.
* That should be done in the context of ReadFilter::onNewSession() if needed.
*
* @param callbacks supplies the callbacks.
*/
Expand Down
122 changes: 98 additions & 24 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithHos
const Upstream::HostConstSharedPtr& host) {
ASSERT(host);
auto new_session = std::make_unique<ActiveSession>(*this, std::move(addresses), host);
new_session->createFilterChain();
new_session->onNewSession();
auto new_session_ptr = new_session.get();
sessions_.emplace(std::move(new_session));
host_to_sessions_[host.get()].emplace(new_session_ptr);
Expand Down Expand Up @@ -202,7 +204,7 @@ Network::FilterStatus UdpProxyFilter::StickySessionClusterInfo::onData(Network::
}
}

active_session->write(*data.buffer_);
active_session->onData(data);

return Network::FilterStatus::StopIteration;
}
Expand Down Expand Up @@ -238,11 +240,13 @@ UdpProxyFilter::PerPacketLoadBalancingClusterInfo::onData(Network::UdpRecvData&
active_session->host().address()->asStringView());
}

active_session->write(*data.buffer_);
active_session->onData(data);

return Network::FilterStatus::StopIteration;
}

std::atomic<uint64_t> UdpProxyFilter::ActiveSession::next_global_session_id_;

UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster,
Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host)
Expand All @@ -254,7 +258,8 @@ UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster,
// is bound until the first packet is sent to the upstream host.
socket_(cluster.filter_.createSocket(host)),
udp_session_info_(
StreamInfo::StreamInfoImpl(cluster_.filter_.config_->timeSource(), nullptr)) {
StreamInfo::StreamInfoImpl(cluster_.filter_.config_->timeSource(), nullptr)),
session_id_(next_global_session_id_++) {

socket_->ioHandle().initializeFileEvent(
cluster.filter_.read_callbacks_->udpListener().dispatcher(),
Expand Down Expand Up @@ -375,13 +380,30 @@ void UdpProxyFilter::ActiveSession::onReadReady() {
cluster_.filter_.read_callbacks_->udpListener().flush();
}

void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) {
ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}",
buffer.length(), addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());
const uint64_t buffer_length = buffer.length();
cluster_.filter_.config_->stats().downstream_sess_rx_bytes_.add(buffer_length);
session_stats_.downstream_sess_rx_bytes_ += buffer_length;
void UdpProxyFilter::ActiveSession::onNewSession() {
for (auto& active_read_filter : read_filters_) {
if (active_read_filter->initialized_) {
// The filter may call continueFilterChain() in onNewSession(), causing next
// filters to iterate onNewSession(), so check that it was not called before.
continue;
}

active_read_filter->initialized_ = true;
auto status = active_read_filter->read_filter_->onNewSession();
if (status == ReadFilterStatus::StopIteration) {
return;
}
}
}

void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) {
ENVOY_LOG(trace, "received {} byte datagram from downstream: downstream={} local={} upstream={}",
data.buffer_->length(), addresses_.peer_->asStringView(),
addresses_.local_->asStringView(), host_->address()->asStringView());

const uint64_t rx_buffer_length = data.buffer_->length();
cluster_.filter_.config_->stats().downstream_sess_rx_bytes_.add(rx_buffer_length);
session_stats_.downstream_sess_rx_bytes_ += rx_buffer_length;
cluster_.filter_.config_->stats().downstream_sess_rx_datagrams_.inc();
++session_stats_.downstream_sess_rx_datagrams_;

Expand All @@ -395,7 +417,6 @@ void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) {
// NOTE: We do not specify the local IP to use for the sendmsg call if use_original_src_ip_ is not
// set. We allow the OS to select the right IP based on outbound routing rules if
// use_original_src_ip_ is not set, else use downstream peer IP as local IP.
const Network::Address::Ip* local_ip = use_original_src_ip_ ? addresses_.peer_->ip() : nullptr;
if (!use_original_src_ip_ && !connected_) {
Api::SysCallIntResult rc = socket_->ioHandle().connect(host_->address());
if (SOCKET_FAILURE(rc.return_value_)) {
Expand All @@ -406,35 +427,88 @@ void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) {

connected_ = true;
}
Api::IoCallUint64Result rc =
Network::Utility::writeToSocket(socket_->ioHandle(), buffer, local_ip, *host_->address());

for (auto& active_read_filter : read_filters_) {
auto status = active_read_filter->read_filter_->onData(data);
if (status == ReadFilterStatus::StopIteration) {
return;
}
}

writeUpstream(data);
}

void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) {
ASSERT(connected_ || use_original_src_ip_);

const uint64_t tx_buffer_length = data.buffer_->length();
ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}",
tx_buffer_length, addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());

const Network::Address::Ip* local_ip = use_original_src_ip_ ? addresses_.peer_->ip() : nullptr;
Api::IoCallUint64Result rc = Network::Utility::writeToSocket(socket_->ioHandle(), *data.buffer_,
local_ip, *host_->address());

if (!rc.ok()) {
cluster_.cluster_stats_.sess_tx_errors_.inc();
} else {
cluster_.cluster_stats_.sess_tx_datagrams_.inc();
cluster_.cluster_.info()->trafficStats()->upstream_cx_tx_bytes_total_.add(buffer_length);
cluster_.cluster_.info()->trafficStats()->upstream_cx_tx_bytes_total_.add(tx_buffer_length);
}
}

void UdpProxyFilter::ActiveSession::processPacket(Network::Address::InstanceConstSharedPtr,
Network::Address::InstanceConstSharedPtr,
Buffer::InstancePtr buffer, MonotonicTime) {
ENVOY_LOG(trace, "writing {} byte datagram downstream: downstream={} local={} upstream={}",
buffer->length(), addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
void UdpProxyFilter::ActiveSession::onContinueFilterChain(ActiveReadFilter* filter) {
ASSERT(filter != nullptr);

std::list<ActiveReadFilterPtr>::iterator entry = std::next(filter->entry());
for (; entry != read_filters_.end(); entry++) {
if (!(*entry)->read_filter_ || (*entry)->initialized_) {
continue;
}

(*entry)->initialized_ = true;
auto status = (*entry)->read_filter_->onNewSession();
if (status == ReadFilterStatus::StopIteration) {
break;
}
}
}

void UdpProxyFilter::ActiveSession::processPacket(
Network::Address::InstanceConstSharedPtr local_address,
Network::Address::InstanceConstSharedPtr peer_address, Buffer::InstancePtr buffer,
MonotonicTime receive_time) {
const uint64_t rx_buffer_length = buffer->length();
ENVOY_LOG(trace, "received {} byte datagram from upstream: downstream={} local={} upstream={}",
rx_buffer_length, addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());
const uint64_t buffer_length = buffer->length();

cluster_.cluster_stats_.sess_rx_datagrams_.inc();
cluster_.cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(buffer_length);
cluster_.cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);

Network::UdpRecvData recv_data{
{std::move(local_address), std::move(peer_address)}, std::move(buffer), receive_time};
for (auto& active_write_filter : write_filters_) {
auto status = active_write_filter->write_filter_->onWrite(recv_data);
if (status == WriteFilterStatus::StopIteration) {
return;
}
}

const uint64_t tx_buffer_length = recv_data.buffer_->length();
ENVOY_LOG(trace, "writing {} byte datagram downstream: downstream={} local={} upstream={}",
tx_buffer_length, addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());

Network::UdpSendData data{addresses_.local_->ip(), *addresses_.peer_, *buffer};
Network::UdpSendData data{addresses_.local_->ip(), *addresses_.peer_, *recv_data.buffer_};
const Api::IoCallUint64Result rc = cluster_.filter_.read_callbacks_->udpListener().send(data);
if (!rc.ok()) {
cluster_.filter_.config_->stats().downstream_sess_tx_errors_.inc();
++session_stats_.downstream_sess_tx_errors_;
} else {
cluster_.filter_.config_->stats().downstream_sess_tx_bytes_.add(buffer_length);
session_stats_.downstream_sess_tx_bytes_ += buffer_length;
cluster_.filter_.config_->stats().downstream_sess_tx_bytes_.add(tx_buffer_length);
session_stats_.downstream_sess_tx_bytes_ += tx_buffer_length;
cluster_.filter_.config_->stats().downstream_sess_tx_datagrams_.inc();
++session_stats_.downstream_sess_tx_datagrams_;
}
Expand Down
Loading

0 comments on commit 83a7d93

Please sign in to comment.