Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add packet batch trigger for better packet handling #60

Draft
wants to merge 2 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/quic/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ namespace oxen::quic
// called when a connection closes or times out before the handshake completes
using connection_closed_callback = std::function<void(connection_interface& conn, uint64_t ec)>;

// Called after we are done reading currently-available UDP packets to allow batch processing of
// incoming data (or any other just-before-potentially-blocking needed handling). First we fire
// off any available callbacks triggered for incoming packets, then just before we go back to
// potentially block waiting for more packets, we fire this to let the application know that
// there might not be more callbacks immediately arriving and so it should process what it has.
using post_receive_callback = std::function<void()>;

class Endpoint : std::enable_shared_from_this<Endpoint>
{
private:
Expand All @@ -50,6 +57,7 @@ namespace oxen::quic
void handle_ep_opt(opt::inbound_alpns alpns);
void handle_ep_opt(opt::handshake_timeout timeout);
void handle_ep_opt(dgram_data_callback dgram_cb);
void handle_ep_opt(post_receive_callback post_recv_cb);
void handle_ep_opt(connection_established_callback conn_established_cb);
void handle_ep_opt(connection_closed_callback conn_closed_cb);

Expand Down Expand Up @@ -238,6 +246,8 @@ namespace oxen::quic
bool _packet_splitting{false};
Splitting _policy{Splitting::NONE};

post_receive_callback _post_receive{nullptr};

std::shared_ptr<IOContext> outbound_ctx;
std::shared_ptr<IOContext> inbound_ctx;

Expand Down
20 changes: 18 additions & 2 deletions include/quic/udp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,31 @@ namespace oxen::quic
;

using receive_callback_t = std::function<void(const Packet& pkt)>;
using post_receive_callback_t = std::function<void()>;

UDPSocket() = delete;

/// Constructs a UDP socket bound to the given address. Throws if binding fails. If
/// binding to an any address (or any port) you can retrieve the realized address via
/// address() after construction.
///
/// When packets are received they will be fed into the given callback.
/// When packets are received they will be fed into the given `on_receive` callback.
///
/// The optional `post_receive` callback will be invoked after processing available incoming
/// packets but before returning to polling the socket for additional incoming packets.
/// This is meant to allow the caller to bundle incoming packets into batches without
/// introducing delays: each time one or more packets are read from the socket there will be
/// a sequence of `on_receive(...)` calls for each packet, followed by a `post_receive()`
/// call immediately before the socket returns to waiting for additional packets. Thus a
/// caller can use the `on_receive` callback to collect packets and the `post_receive`
/// callback to process the collected packets all at once.
///
/// ev_loop must outlive this object.
UDPSocket(event_base* ev_loop, const Address& addr, receive_callback_t cb);
UDPSocket(
event_base* ev_loop,
const Address& addr,
receive_callback_t on_receive,
post_receive_callback_t post_receive = nullptr);

/// Non-copyable and non-moveable
UDPSocket(const UDPSocket& s) = delete;
Expand Down Expand Up @@ -103,6 +117,8 @@ namespace oxen::quic

event_ptr rev_ = nullptr;
receive_callback_t receive_callback_;
post_receive_callback_t post_receive_;
bool have_received_ = false;
event_ptr wev_ = nullptr;
std::vector<std::function<void()>> writeable_callbacks_;
};
Expand Down
1 change: 1 addition & 0 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ namespace oxen::quic
if (is_draining())
{
log::debug(log_cat, "Note: connection is already draining; dropping");
return;
}

if (read_packet(pkt).success())
Expand Down
16 changes: 14 additions & 2 deletions src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ namespace oxen::quic
dgram_recv_cb = std::move(func);
}

void Endpoint::handle_ep_opt(post_receive_callback func)
{
log::trace(log_cat, "Endpoint given post-receive callback");
_post_receive = std::move(func);
}

void Endpoint::handle_ep_opt(connection_established_callback conn_established_cb)
{
log::trace(log_cat, "Endpoint given connection established callback");
Expand All @@ -71,8 +77,14 @@ namespace oxen::quic
void Endpoint::_init_internals()
{
log::debug(log_cat, "Starting new UDP socket on {}", _local);
socket =
std::make_unique<UDPSocket>(get_loop().get(), _local, [this](const auto& packet) { handle_packet(packet); });
socket = std::make_unique<UDPSocket>(
get_loop().get(),
_local,
[this](const auto& packet) { handle_packet(packet); },
[this] {
if (_post_receive)
_post_receive();
});

_local = socket->address();

Expand Down
17 changes: 14 additions & 3 deletions src/udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ namespace oxen::quic
}
#endif

UDPSocket::UDPSocket(event_base* ev_loop, const Address& addr, receive_callback_t on_receive) :
ev_{ev_loop}, receive_callback_{std::move(on_receive)}
UDPSocket::UDPSocket(
event_base* ev_loop, const Address& addr, receive_callback_t on_receive, post_receive_callback_t post_receive) :
ev_{ev_loop}, receive_callback_{std::move(on_receive)}, post_receive_{std::move(post_receive)}
{
assert(ev_);

Expand Down Expand Up @@ -125,7 +126,16 @@ namespace oxen::quic
ev_,
sock_,
EV_READ | EV_PERSIST,
[](evutil_socket_t, short, void* self) { static_cast<UDPSocket*>(self)->receive(); },
[](evutil_socket_t, short, void* self_) {
auto& self = *static_cast<UDPSocket*>(self_);
self.receive();
if (self.have_received_)
{
self.have_received_ = false;
if (self.post_receive_)
self.post_receive_();
}
},
this));
event_add(rev_.get(), nullptr);

Expand Down Expand Up @@ -190,6 +200,7 @@ namespace oxen::quic
return;
}

have_received_ = true;
receive_callback_(Packet{bound_, payload, hdr});
}

Expand Down
81 changes: 80 additions & 1 deletion tests/007-datagrams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ namespace oxen::quic::test
good_msg += v++;

for (int i = 0; i < n; ++i)
conn_interface->send_datagram(std::basic_string_view<uint8_t>{good_msg});
conn_interface->send_datagram(good_msg);

for (auto& f : data_futures)
REQUIRE(f.get());
Expand Down Expand Up @@ -638,4 +638,83 @@ namespace oxen::quic::test
};
#endif
};

TEST_CASE("007 - Datagram support: packet post-receive triggers", "[007][datagrams][packet-post-receive]")
{
auto client_established = callback_waiter{[](connection_interface&) {}};

Network test_net{};

std::basic_string<std::byte> big_msg{};

for (int v = 0; big_msg.size() < 1000; v++)
big_msg += static_cast<std::byte>(v % 256);

std::atomic<int> recv_counter{0};
std::atomic<int> data_counter{0};

std::promise<void> got_first, acked_first;
std::promise<int> got_all_n_recvs;

dgram_data_callback recv_dgram_cb = [&](dgram_interface&, bstring value) {
auto count = ++data_counter;
CHECK(value == big_msg);
if (count == 1)
{
// We get one datagram, then stall the quic thread so that the test can fire
// multiple packets that we should then receive in one go.
got_first.set_value();
REQUIRE(acked_first.get_future().wait_for(1s) == std::future_status::ready);
}
else if (count == 31)
{
got_all_n_recvs.set_value(recv_counter);
}
};

auto recv_notifier = [&] { recv_counter++; };

opt::local_addr server_local{};
opt::local_addr client_local{};

auto server_tls = GNUTLSCreds::make("./serverkey.pem"s, "./servercert.pem"s, "./clientcert.pem"s);
auto client_tls = GNUTLSCreds::make("./clientkey.pem"s, "./clientcert.pem"s, "./servercert.pem"s);

auto server_endpoint = test_net.endpoint(server_local, recv_dgram_cb, recv_notifier, opt::enable_datagrams{});
REQUIRE_NOTHROW(server_endpoint->listen(server_tls));

opt::remote_addr client_remote{"127.0.0.1"s, server_endpoint->local().port()};

auto client = test_net.endpoint(client_local, client_established, opt::enable_datagrams{});
auto conn = client->connect(client_remote, client_tls);

REQUIRE(client_established.wait());

// Start off with *one* datagram; the first one the server receives will stall the
// server until we signal it via the acked_first promise, during which we'll send a
// bunch more that ought to be processed in a single batch.
conn->send_datagram(big_msg);

REQUIRE(got_first.get_future().wait_for(1s) == std::future_status::ready);

int batches_before_flood = recv_counter;

for (int i = 0; i < 30; i++)
conn->send_datagram(big_msg);

acked_first.set_value();

auto f = got_all_n_recvs.get_future();
REQUIRE(f.wait_for(1s) == std::future_status::ready);
auto recv_counter_before_final = f.get();
REQUIRE(data_counter == 31);
REQUIRE(recv_counter_before_final > batches_before_flood);
// There should be a recv callback fired *immediately* after the data callback that
// fulfilled the above proimise, so a miniscule wait here should guarantee that it has been
// set.
std::this_thread::sleep_for(1ms);
auto final_recv_counter = recv_counter.load();
REQUIRE(final_recv_counter > recv_counter_before_final);
};

} // namespace oxen::quic::test