diff --git a/include/oxen/quic/btstream.hpp b/include/oxen/quic/btstream.hpp index 125cc8b7..2275f118 100644 --- a/include/oxen/quic/btstream.hpp +++ b/include/oxen/quic/btstream.hpp @@ -224,7 +224,10 @@ namespace oxen::quic auto req = std::make_shared(*this, encode_command(ep, rid, body), rid, std::forward(opts)...); if (req->cb) - endpoint.call([this, r = std::move(req)]() { send(sent_reqs.emplace_back(std::move(r))->view()); }); + endpoint.call([this, r = std::move(req)]() mutable { + auto& sr = sent_reqs.emplace_back(std::move(r)); + send(sr->view(), sr); + }); else send(std::move(*req).payload()); } diff --git a/include/oxen/quic/iochannel.hpp b/include/oxen/quic/iochannel.hpp index 87e12dcd..39999ee9 100644 --- a/include/oxen/quic/iochannel.hpp +++ b/include/oxen/quic/iochannel.hpp @@ -45,12 +45,17 @@ namespace oxen::quic Address local() const; Address remote() const; + // Sends a string_view of character/byte data. The keep_alive argument is used to manage + // ownership: it will be held until the data is completely sent and acked by the remote + // side. It can be nullptr to disable, but only if the caller is certain that the provided + // data will stay alive for the duration of the Stream's lifetime. template = 0> - void send(std::basic_string_view data, std::shared_ptr keep_alive = nullptr) + void send(std::basic_string_view data, std::shared_ptr keep_alive) { send_impl(convert_sv(data), std::move(keep_alive)); } + // Takes over ownership of a string of character/byte data and sends it into the stream. template void send(std::basic_string&& data) { @@ -59,6 +64,7 @@ namespace oxen::quic send(view, std::move(keep_alive)); } + // Takes over ownership of a vector of character/byte data and sends it into the stream. template = 0> void send(std::vector&& buf) { diff --git a/tests/002-send-receive.cpp b/tests/002-send-receive.cpp index 8cc8ce02..65e38e10 100644 --- a/tests/002-send-receive.cpp +++ b/tests/002-send-receive.cpp @@ -41,7 +41,7 @@ namespace oxen::quic::test // client make stream and send; message displayed by server_data_cb auto client_stream = conn_interface->open_stream(); - REQUIRE_NOTHROW(client_stream->send(good_msg)); + REQUIRE_NOTHROW(client_stream->send(good_msg, nullptr)); require_future(d_future); }; @@ -83,7 +83,7 @@ namespace oxen::quic::test auto server_ci = server_endpoint_b->connect(server_remote, server_tls); auto server_stream = server_ci->open_stream(); - server_stream->send(good_msg); + server_stream->send(good_msg, nullptr); require_future(d_futures[0]); @@ -93,7 +93,7 @@ namespace oxen::quic::test // client make stream and send; message displayed by server_data_cb auto client_stream = conn_interface->open_stream(); - REQUIRE_NOTHROW(client_stream->send(good_msg)); + REQUIRE_NOTHROW(client_stream->send(good_msg, nullptr)); require_future(d_futures[1]); }; @@ -134,7 +134,7 @@ namespace oxen::quic::test auto server_a_ci = server_endpoint_b->connect(server_remote_a, server_tls); auto server_a_stream = server_a_ci->open_stream(); - server_a_stream->send(good_msg); + server_a_stream->send(good_msg, nullptr); require_future(d_futures[0]); @@ -142,7 +142,7 @@ namespace oxen::quic::test auto server_b_stream = server_b_ci->open_stream(); - server_b_stream->send(good_msg); + server_b_stream->send(good_msg, nullptr); require_future(d_futures[1]); }; @@ -198,7 +198,7 @@ namespace oxen::quic::test { // There is no ownership issue here: we're just viewing into our `good_msg` which we // are keeping alive already for the duration of this test. - stream_to_a->send(bstring_view{good_msg}); + stream_to_a->send(bstring_view{good_msg}, nullptr); } } SECTION("Sending bstring buffer with transferred ownership") diff --git a/tests/003-multiclient.cpp b/tests/003-multiclient.cpp index a9cae606..da1c5204 100644 --- a/tests/003-multiclient.cpp +++ b/tests/003-multiclient.cpp @@ -82,8 +82,8 @@ namespace oxen::quic::test auto stream_b = c_interface_b->open_stream(); // send - stream_a->send(msg); - stream_b->send(msg); + stream_a->send(msg, nullptr); + stream_b->send(msg, nullptr); }}; std::thread async_thread_b{[&]() { @@ -102,8 +102,8 @@ namespace oxen::quic::test auto stream_d = c_interface_d->open_stream(); // send - stream_c->send(msg); - stream_d->send(msg); + stream_c->send(msg, nullptr); + stream_d->send(msg, nullptr); }}; for (auto& f : stream_futures) diff --git a/tests/004-streams.cpp b/tests/004-streams.cpp index ea9e02b8..b5633616 100644 --- a/tests/004-streams.cpp +++ b/tests/004-streams.cpp @@ -64,7 +64,7 @@ namespace oxen::quic::test auto conn_interface = client_endpoint->connect(client_remote, client_tls, max_streams); auto client_stream = conn_interface->open_stream(); - client_stream->send(msg); + client_stream->send(msg, nullptr); require_future(data_future); REQUIRE(conn_interface->get_streams_available() == max_streams.stream_count - 1); @@ -114,7 +114,7 @@ namespace oxen::quic::test REQUIRE(server_ci->get_max_streams() == server_config.stream_count); auto client_stream = client_ci->open_stream(); - client_stream->send(msg); + client_stream->send(msg, nullptr); require_future(data_future); @@ -180,7 +180,7 @@ namespace oxen::quic::test for (size_t i = 0; i < n_streams; ++i) { streams[i] = conn_interface->open_stream(); - streams[i]->send(msg); + streams[i]->send(msg, nullptr); send_promises[i].set_value(); } @@ -200,7 +200,7 @@ namespace oxen::quic::test for (int i = 0; i < 2; ++i) { streams[i] = conn_interface->open_stream(); - streams[i]->send(msg); + streams[i]->send(msg, nullptr); // set send promise send_promises[i + n_streams].set_value(); } @@ -268,14 +268,14 @@ namespace oxen::quic::test log::debug(log_cat, "Calling standard stream data callback... data received..."); REQUIRE(msg == dat); ss_p.set_value(); - s.send(msg); + s.send(msg, nullptr); }; stream_data_callback standard_client_cb = [&](Stream& s, bstring_view dat) { log::debug(log_cat, "Calling standard stream data callback... data received..."); REQUIRE(msg == dat); cs_p.set_value(); - s.send(msg); + s.send(msg, nullptr); }; auto [client_tls, server_tls] = defaults::tls_creds_from_ed_keys(); @@ -293,7 +293,7 @@ namespace oxen::quic::test auto client_stream = conn_interface->open_stream(std::move(cc_p)); - REQUIRE_NOTHROW(client_stream->send(msg)); + REQUIRE_NOTHROW(client_stream->send(msg, nullptr)); require_future(ss_f); require_future(cc_f); @@ -301,7 +301,7 @@ namespace oxen::quic::test auto server_ci = server_endpoint->get_all_conns(Direction::INBOUND).front(); auto server_stream = server_ci->open_stream(std::move(sc_p)); - REQUIRE_NOTHROW(server_stream->send(msg)); + REQUIRE_NOTHROW(server_stream->send(msg, nullptr)); require_future(cs_f); require_future(sc_f); @@ -338,7 +338,7 @@ namespace oxen::quic::test auto client_stream = conn_interface->open_stream(); - REQUIRE_NOTHROW(client_stream->send(msg)); + REQUIRE_NOTHROW(client_stream->send(msg, nullptr)); require_future(server_future); }; @@ -973,7 +973,7 @@ namespace oxen::quic::test auto [client_tls, server_tls] = defaults::tls_creds_from_ed_keys(); auto server_endpoint = test_net.endpoint(server_local); - server_endpoint->listen(server_tls, [&](Stream& s, bstring_view data) { s.send(data); }); + server_endpoint->listen(server_tls, [&](Stream& s, bstring_view data) { s.send(bstring{data}); }); RemoteAddress client_remote{defaults::SERVER_PUBKEY, "127.0.0.1"s, server_endpoint->local().port()}; auto client_endpoint = test_net.endpoint(client_local); diff --git a/tests/006-server-send.cpp b/tests/006-server-send.cpp index f319eae8..dee1c2b8 100644 --- a/tests/006-server-send.cpp +++ b/tests/006-server-send.cpp @@ -53,11 +53,11 @@ namespace oxen::quic::test auto conn_interface = client_endpoint->connect(client_remote, client_tls, client_io_data_cb); auto client_stream = conn_interface->open_stream(); - client_stream->send(msg); + client_stream->send(msg, nullptr); require_future(stream_future); - server_stream->send(msg); + server_stream->send(msg, nullptr); require_future(client_future); require_future(server_future); @@ -157,20 +157,20 @@ namespace oxen::quic::test auto client_ci = client_endpoint->connect(client_remote, client_tls, client_io_data_cb, client_io_open_cb); auto client_stream = client_ci->open_stream(); - client_stream->send(msg); + client_stream->send(msg, nullptr); require_future(server_futures[0]); require_future(server_futures[1]); - server_extracted_stream->send(response); + server_extracted_stream->send(response, nullptr); server_ci = server_endpoint->get_all_conns(Direction::INBOUND).front(); auto server_stream = server_ci->open_stream(); - server_stream->send(msg); + server_stream->send(msg, nullptr); for (auto& c : client_futures) require_future(c); - client_extracted_stream->send(response); + client_extracted_stream->send(response, nullptr); require_future(server_futures[2]); REQUIRE(data_check == 4); diff --git a/tests/010-migration.cpp b/tests/010-migration.cpp index fb69ef67..c027fad9 100644 --- a/tests/010-migration.cpp +++ b/tests/010-migration.cpp @@ -94,7 +94,7 @@ namespace oxen::quic::test auto client_stream = client_ci->open_stream(); - REQUIRE_NOTHROW(client_stream->send(good_msg)); + REQUIRE_NOTHROW(client_stream->send(good_msg, nullptr)); require_future(d_future); server_ci = server_endpoint->get_all_conns(Direction::INBOUND).front(); diff --git a/tests/speedtest-client.cpp b/tests/speedtest-client.cpp index 656c9c71..71e9d503 100644 --- a/tests/speedtest-client.cpp +++ b/tests/speedtest-client.cpp @@ -298,7 +298,7 @@ int main(int argc, char* argv[]) { s.remaining = 0; s.done_sending = true; - s.stream->send(bstring_view{s.bufs[0].data(), s.bufs[0].size()}); + s.stream->send(bstring_view{s.bufs[0].data(), s.bufs[0].size()}, nullptr); } else {