Skip to content

Commit

Permalink
add callback to inject read/write datagrams to filter chain
Browse files Browse the repository at this point in the history
Signed-off-by: ohadvano <[email protected]>
  • Loading branch information
ohadvano committed Sep 10, 2023
1 parent 766e33a commit 259df3f
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ using BufferingFilterConfig =

using BufferedDatagramPtr = std::unique_ptr<Network::UdpRecvData>;

class BufferingSessionFilter : public Filter {
class BufferingSessionFilter : public Filter {
public:
BufferingSessionFilter(int downstream_datagrams_to_buffer, int upstream_datagrams_to_buffer,
bool continue_after_inject)
Expand All @@ -39,7 +39,9 @@ class BufferingSessionFilter : public Filter {
write_callbacks_ = &callbacks;
}

ReadFilterStatus onNewSession() override { return ReadFilterStatus::Continue; }
ReadFilterStatus onNewSession() override {
return ReadFilterStatus::Continue;
}

ReadFilterStatus onData(Network::UdpRecvData& data) override {
if (downstream_buffer_.size() < downstream_datagrams_to_buffer_) {
Expand Down Expand Up @@ -87,7 +89,7 @@ class BufferingSessionFilter : public Filter {
void bufferRead(Network::UdpRecvData& data) {
auto buffered_datagram = std::make_unique<Network::UdpRecvData>();
buffered_datagram->addresses_ = {std::move(data.addresses_.local_),
std::move(data.addresses_.peer_)};
std::move(data.addresses_.peer_)};
buffered_datagram->buffer_ = std::move(data.buffer_);
buffered_datagram->receive_time_ = data.receive_time_;
downstream_buffer_.push(std::move(buffered_datagram));
Expand All @@ -96,7 +98,7 @@ class BufferingSessionFilter : public Filter {
void bufferWrite(Network::UdpRecvData& data) {
auto buffered_datagram = std::make_unique<Network::UdpRecvData>();
buffered_datagram->addresses_ = {std::move(data.addresses_.local_),
std::move(data.addresses_.peer_)};
std::move(data.addresses_.peer_)};
buffered_datagram->buffer_ = std::move(data.buffer_);
buffered_datagram->receive_time_ = data.receive_time_;
upstream_buffer_.push(std::move(buffered_datagram));
Expand Down
122 changes: 118 additions & 4 deletions test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,17 @@ name: udp_proxy
)EOF";

for (auto config : session_filters_configs) {
session_filters += fmt::format(
R"EOF(
session_filters += fmt::format(
R"EOF(
- name: foo
typed_config:
'@type': type.googleapis.com/test.extensions.filters.udp.udp_proxy.session_filters.BufferingFilterConfig
downstream_datagrams_to_buffer: {}
upstream_datagrams_to_buffer: {}
continue_after_inject: {}
)EOF",
config.downstream_datagrams_to_buffer_, config.upstream_datagrams_to_buffer_,
config.continue_after_inject_);
config.downstream_datagrams_to_buffer_, config.upstream_datagrams_to_buffer_,
config.continue_after_inject_);
}

return session_filters;
Expand Down Expand Up @@ -772,5 +772,119 @@ TEST_P(UdpProxyIntegrationTest, TwoBufferingFilters) {
EXPECT_EQ(2, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value());
}

TEST_P(UdpProxyIntegrationTest, BufferingFilterBasicFlow) {
setup(1, absl::nullopt, getBufferSessionFilterConfig({{2, 2, true}}));
const uint32_t port = lookupPort("listener_0");
const auto listener_address = Network::Utility::resolveUrl(
fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port));

Network::Test::UdpSyncPeer client(version_, Network::DEFAULT_UDP_MAX_DATAGRAM_SIZE);
client.write("hello1", *listener_address);
client.write("hello2", *listener_address);

// Two downstream datagrams should be received, but none sent upstream due to filter buffering.
test_server_->waitForCounterEq("udp.foo.downstream_sess_rx_datagrams", 2);
EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value());

// Third downstream datagram should flush the previously buffered datagrams, due to
// injectDatagramToFilterChain() call.
client.write("hello3", *listener_address);

// Wait for the upstream datagram.
Network::UdpRecvData request_datagram;
ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram));
EXPECT_EQ("hello1", request_datagram.buffer_->toString());
ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram));
EXPECT_EQ("hello2", request_datagram.buffer_->toString());
ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram));
EXPECT_EQ("hello3", request_datagram.buffer_->toString());
EXPECT_EQ(3, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value());
EXPECT_EQ(3, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value());

// Two upstream datagrams should be received, but none sent downstream due to filter buffering.
fake_upstreams_[0]->sendUdpDatagram("response1", request_datagram.addresses_.peer_);
fake_upstreams_[0]->sendUdpDatagram("response2", request_datagram.addresses_.peer_);
test_server_->waitForCounterEq("cluster.cluster_0.udp.sess_rx_datagrams", 2);
EXPECT_EQ(0, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value());

// Third upstream datagram should flush the previously buffered datagrams, due to
// injectDatagramToFilterChain() call.
fake_upstreams_[0]->sendUdpDatagram("response3", request_datagram.addresses_.peer_);

Network::UdpRecvData response_datagram;
client.recv(response_datagram);
EXPECT_EQ("response1", response_datagram.buffer_->toString());
client.recv(response_datagram);
EXPECT_EQ("response2", response_datagram.buffer_->toString());
client.recv(response_datagram);
EXPECT_EQ("response3", response_datagram.buffer_->toString());
EXPECT_EQ(3, test_server_->counter("cluster.cluster_0.udp.sess_rx_datagrams")->value());
EXPECT_EQ(3, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value());
}

TEST_P(UdpProxyIntegrationTest, TwoBufferingFilters) {
setup(1, absl::nullopt, getBufferSessionFilterConfig({{1, 1, false}, {1, 1, false}}));
const uint32_t port = lookupPort("listener_0");
const auto listener_address = Network::Utility::resolveUrl(
fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port));

Network::Test::UdpSyncPeer client(version_, Network::DEFAULT_UDP_MAX_DATAGRAM_SIZE);
client.write("hello1", *listener_address); // Buffered in the first filter.
// 'hello1' will proceed to second filter. 'hello2' will buffer in first filter.
client.write("hello2", *listener_address);

// Two downstream datagrams should be received, but none sent upstream due to filter buffering.
test_server_->waitForCounterEq("udp.foo.downstream_sess_rx_datagrams", 2);
EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value());

// 'hello1' will flush upstream, 'hello2' will proceed to second filter. 'hello3' will
// buffer in the first filter.
client.write("hello3", *listener_address);

// Wait for the upstream datagram.
Network::UdpRecvData request_datagram;
ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram));
EXPECT_EQ("hello1", request_datagram.buffer_->toString());
EXPECT_EQ(3, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value());
EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value());

// 'hello2' will flush upstream, 'hello3' will proceed to second filter. 'hello4' will
// buffer in the first filter.
client.write("hello4", *listener_address);

// Wait for the upstream datagram.
ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram));
EXPECT_EQ("hello2", request_datagram.buffer_->toString());
EXPECT_EQ(4, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value());
EXPECT_EQ(2, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value());

// Testing the upstream to downstream direction.
// Two upstream datagrams should be received, but none sent downstream due to filter buffering.
fake_upstreams_[0]->sendUdpDatagram("response1", request_datagram.addresses_.peer_);
// 'response1' will proceed to second filter. 'response2' will buffer in first filter.
fake_upstreams_[0]->sendUdpDatagram("response2", request_datagram.addresses_.peer_);
test_server_->waitForCounterEq("cluster.cluster_0.udp.sess_rx_datagrams", 2);
EXPECT_EQ(0, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value());

// 'response1' will flush downstream, 'response2' will proceed to second filter. 'response3' will
// buffer in the first filter.
fake_upstreams_[0]->sendUdpDatagram("response3", request_datagram.addresses_.peer_);

// Wait for the downstream datagram.
Network::UdpRecvData response_datagram;
client.recv(response_datagram);
EXPECT_EQ("response1", response_datagram.buffer_->toString());
EXPECT_EQ(3, test_server_->counter("cluster.cluster_0.udp.sess_rx_datagrams")->value());
EXPECT_EQ(1, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value());

// 'response2' will flush downstream, 'response3' will proceed to second filter. 'response4' will
// buffer in the first filter.
fake_upstreams_[0]->sendUdpDatagram("response4", request_datagram.addresses_.peer_);
client.recv(response_datagram);
EXPECT_EQ("response2", response_datagram.buffer_->toString());
EXPECT_EQ(4, test_server_->counter("cluster.cluster_0.udp.sess_rx_datagrams")->value());
EXPECT_EQ(2, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value());
}

} // namespace
} // namespace Envoy

0 comments on commit 259df3f

Please sign in to comment.