From 259df3f877fffaa4fab37d3cf388305c74a898b2 Mon Sep 17 00:00:00 2001 From: ohadvano Date: Thu, 7 Sep 2023 19:54:32 +0300 Subject: [PATCH] add callback to inject read/write datagrams to filter chain Signed-off-by: ohadvano --- .../udp_proxy/session_filters/buffer_filter.h | 10 +- .../udp_proxy/udp_proxy_integration_test.cc | 122 +++++++++++++++++- 2 files changed, 124 insertions(+), 8 deletions(-) diff --git a/test/extensions/filters/udp/udp_proxy/session_filters/buffer_filter.h b/test/extensions/filters/udp/udp_proxy/session_filters/buffer_filter.h index e34f431b4e16..8ace63cf1a96 100644 --- a/test/extensions/filters/udp/udp_proxy/session_filters/buffer_filter.h +++ b/test/extensions/filters/udp/udp_proxy/session_filters/buffer_filter.h @@ -23,7 +23,7 @@ using BufferingFilterConfig = using BufferedDatagramPtr = std::unique_ptr; -class BufferingSessionFilter : public Filter { +class BufferingSessionFilter : public Filter { public: BufferingSessionFilter(int downstream_datagrams_to_buffer, int upstream_datagrams_to_buffer, bool continue_after_inject) @@ -39,7 +39,9 @@ class BufferingSessionFilter : public Filter { write_callbacks_ = &callbacks; } - ReadFilterStatus onNewSession() override { return ReadFilterStatus::Continue; } + ReadFilterStatus onNewSession() override { + return ReadFilterStatus::Continue; + } ReadFilterStatus onData(Network::UdpRecvData& data) override { if (downstream_buffer_.size() < downstream_datagrams_to_buffer_) { @@ -87,7 +89,7 @@ class BufferingSessionFilter : public Filter { void bufferRead(Network::UdpRecvData& data) { auto buffered_datagram = std::make_unique(); buffered_datagram->addresses_ = {std::move(data.addresses_.local_), - std::move(data.addresses_.peer_)}; + std::move(data.addresses_.peer_)}; buffered_datagram->buffer_ = std::move(data.buffer_); buffered_datagram->receive_time_ = data.receive_time_; downstream_buffer_.push(std::move(buffered_datagram)); @@ -96,7 +98,7 @@ class BufferingSessionFilter : public Filter { void bufferWrite(Network::UdpRecvData& data) { auto buffered_datagram = std::make_unique(); buffered_datagram->addresses_ = {std::move(data.addresses_.local_), - std::move(data.addresses_.peer_)}; + std::move(data.addresses_.peer_)}; buffered_datagram->buffer_ = std::move(data.buffer_); buffered_datagram->receive_time_ = data.receive_time_; upstream_buffer_.push(std::move(buffered_datagram)); diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc index 366dd28db1a3..8caf2bd76d73 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc @@ -194,8 +194,8 @@ name: udp_proxy )EOF"; for (auto config : session_filters_configs) { - session_filters += fmt::format( - R"EOF( + session_filters += fmt::format( + R"EOF( - name: foo typed_config: '@type': type.googleapis.com/test.extensions.filters.udp.udp_proxy.session_filters.BufferingFilterConfig @@ -203,8 +203,8 @@ name: udp_proxy upstream_datagrams_to_buffer: {} continue_after_inject: {} )EOF", - config.downstream_datagrams_to_buffer_, config.upstream_datagrams_to_buffer_, - config.continue_after_inject_); + config.downstream_datagrams_to_buffer_, config.upstream_datagrams_to_buffer_, + config.continue_after_inject_); } return session_filters; @@ -772,5 +772,119 @@ TEST_P(UdpProxyIntegrationTest, TwoBufferingFilters) { EXPECT_EQ(2, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value()); } +TEST_P(UdpProxyIntegrationTest, BufferingFilterBasicFlow) { + setup(1, absl::nullopt, getBufferSessionFilterConfig({{2, 2, true}})); + const uint32_t port = lookupPort("listener_0"); + const auto listener_address = Network::Utility::resolveUrl( + fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); + + Network::Test::UdpSyncPeer client(version_, Network::DEFAULT_UDP_MAX_DATAGRAM_SIZE); + client.write("hello1", *listener_address); + client.write("hello2", *listener_address); + + // Two downstream datagrams should be received, but none sent upstream due to filter buffering. + test_server_->waitForCounterEq("udp.foo.downstream_sess_rx_datagrams", 2); + EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value()); + + // Third downstream datagram should flush the previously buffered datagrams, due to + // injectDatagramToFilterChain() call. + client.write("hello3", *listener_address); + + // Wait for the upstream datagram. + Network::UdpRecvData request_datagram; + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram)); + EXPECT_EQ("hello1", request_datagram.buffer_->toString()); + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram)); + EXPECT_EQ("hello2", request_datagram.buffer_->toString()); + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram)); + EXPECT_EQ("hello3", request_datagram.buffer_->toString()); + EXPECT_EQ(3, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value()); + EXPECT_EQ(3, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value()); + + // Two upstream datagrams should be received, but none sent downstream due to filter buffering. + fake_upstreams_[0]->sendUdpDatagram("response1", request_datagram.addresses_.peer_); + fake_upstreams_[0]->sendUdpDatagram("response2", request_datagram.addresses_.peer_); + test_server_->waitForCounterEq("cluster.cluster_0.udp.sess_rx_datagrams", 2); + EXPECT_EQ(0, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value()); + + // Third upstream datagram should flush the previously buffered datagrams, due to + // injectDatagramToFilterChain() call. + fake_upstreams_[0]->sendUdpDatagram("response3", request_datagram.addresses_.peer_); + + Network::UdpRecvData response_datagram; + client.recv(response_datagram); + EXPECT_EQ("response1", response_datagram.buffer_->toString()); + client.recv(response_datagram); + EXPECT_EQ("response2", response_datagram.buffer_->toString()); + client.recv(response_datagram); + EXPECT_EQ("response3", response_datagram.buffer_->toString()); + EXPECT_EQ(3, test_server_->counter("cluster.cluster_0.udp.sess_rx_datagrams")->value()); + EXPECT_EQ(3, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value()); +} + +TEST_P(UdpProxyIntegrationTest, TwoBufferingFilters) { + setup(1, absl::nullopt, getBufferSessionFilterConfig({{1, 1, false}, {1, 1, false}})); + const uint32_t port = lookupPort("listener_0"); + const auto listener_address = Network::Utility::resolveUrl( + fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); + + Network::Test::UdpSyncPeer client(version_, Network::DEFAULT_UDP_MAX_DATAGRAM_SIZE); + client.write("hello1", *listener_address); // Buffered in the first filter. + // 'hello1' will proceed to second filter. 'hello2' will buffer in first filter. + client.write("hello2", *listener_address); + + // Two downstream datagrams should be received, but none sent upstream due to filter buffering. + test_server_->waitForCounterEq("udp.foo.downstream_sess_rx_datagrams", 2); + EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value()); + + // 'hello1' will flush upstream, 'hello2' will proceed to second filter. 'hello3' will + // buffer in the first filter. + client.write("hello3", *listener_address); + + // Wait for the upstream datagram. + Network::UdpRecvData request_datagram; + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram)); + EXPECT_EQ("hello1", request_datagram.buffer_->toString()); + EXPECT_EQ(3, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value()); + EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value()); + + // 'hello2' will flush upstream, 'hello3' will proceed to second filter. 'hello4' will + // buffer in the first filter. + client.write("hello4", *listener_address); + + // Wait for the upstream datagram. + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram)); + EXPECT_EQ("hello2", request_datagram.buffer_->toString()); + EXPECT_EQ(4, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value()); + EXPECT_EQ(2, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value()); + + // Testing the upstream to downstream direction. + // Two upstream datagrams should be received, but none sent downstream due to filter buffering. + fake_upstreams_[0]->sendUdpDatagram("response1", request_datagram.addresses_.peer_); + // 'response1' will proceed to second filter. 'response2' will buffer in first filter. + fake_upstreams_[0]->sendUdpDatagram("response2", request_datagram.addresses_.peer_); + test_server_->waitForCounterEq("cluster.cluster_0.udp.sess_rx_datagrams", 2); + EXPECT_EQ(0, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value()); + + // 'response1' will flush downstream, 'response2' will proceed to second filter. 'response3' will + // buffer in the first filter. + fake_upstreams_[0]->sendUdpDatagram("response3", request_datagram.addresses_.peer_); + + // Wait for the downstream datagram. + Network::UdpRecvData response_datagram; + client.recv(response_datagram); + EXPECT_EQ("response1", response_datagram.buffer_->toString()); + EXPECT_EQ(3, test_server_->counter("cluster.cluster_0.udp.sess_rx_datagrams")->value()); + EXPECT_EQ(1, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value()); + + // 'response2' will flush downstream, 'response3' will proceed to second filter. 'response4' will + // buffer in the first filter. + fake_upstreams_[0]->sendUdpDatagram("response4", request_datagram.addresses_.peer_); + client.recv(response_datagram); + EXPECT_EQ("response2", response_datagram.buffer_->toString()); + EXPECT_EQ(4, test_server_->counter("cluster.cluster_0.udp.sess_rx_datagrams")->value()); + EXPECT_EQ(2, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value()); +} + } // namespace } // namespace Envoy