From 2b080e0828db137c777165935bb96bd9565dac58 Mon Sep 17 00:00:00 2001 From: dr7ana Date: Tue, 7 May 2024 13:40:23 -0700 Subject: [PATCH 1/2] stream stop read/write --- include/oxen/quic/stream.hpp | 11 +++ src/stream.cpp | 88 +++++++++++++++---- ...termarks.cpp => 012-stream_signalling.cpp} | 64 +++++++++++++- tests/CMakeLists.txt | 2 +- tests/utils.cpp | 5 ++ tests/utils.hpp | 2 + 6 files changed, 152 insertions(+), 20 deletions(-) rename tests/{012-watermarks.cpp => 012-stream_signalling.cpp} (69%) diff --git a/include/oxen/quic/stream.hpp b/include/oxen/quic/stream.hpp index 4519ef4f..41318d8e 100644 --- a/include/oxen/quic/stream.hpp +++ b/include/oxen/quic/stream.hpp @@ -91,6 +91,14 @@ namespace oxen::quic bool is_paused() const; + bool is_reading() const; + + bool is_writing() const; + + void stop_reading(); + + void stop_writing(); + // These public methods are synchronized so that they can be safely called from outside the // libquic main loop thread. bool available() const; @@ -160,6 +168,9 @@ namespace oxen::quic opt::watermark _high_water; opt::watermark _low_water; + bool _is_reading{true}; + bool _is_writing{true}; + void wrote(size_t bytes) override; void append_buffer(bstring_view buffer, std::shared_ptr keep_alive); diff --git a/src/stream.cpp b/src/stream.cpp index 6c5e1580..f3d720a6 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -127,11 +127,50 @@ namespace oxen::quic }); } + void Stream::stop_reading() + { + endpoint.call([this]() { + if (not _is_reading) + { + log::warning(log_cat, "Stream has already halted read operations!"); + return; + } + + _is_reading = false; + + log::warning(log_cat, "Halting all read operations on stream ID:{}!", _stream_id); + ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, 0); + }); + } + + void Stream::stop_writing() + { + endpoint.call([this]() { + if (not _is_writing) + { + log::warning(log_cat, "Stream has already halted write operations!"); + return; + } + + _is_writing = false; + }); + } + bool Stream::is_paused() const { return endpoint.call_get([this]() { return _paused; }); } + bool Stream::is_reading() const + { + return endpoint.call_get([this]() { return _is_reading; }); + } + + bool Stream::is_writing() const + { + return endpoint.call_get([this]() { return _is_writing; }); + } + bool Stream::available() const { return endpoint.call_get([this] { return !(_is_closing || _is_shutdown || _sent_fin); }); @@ -216,6 +255,28 @@ namespace oxen::quic _conn->packet_io_ready(); else log::info(log_cat, "Stream not ready for broadcast yet, data appended to buffer and on deck"); + + if (_is_watermarked) + { + // We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low + // watermark. If the high water hook exists and is primed, execute it + if (auto unsent = size() - _unacked_size; unsent >= _high_mark) + { + _low_primed = true; + log::info(log_cat, "Low water hook primed!"); + + if (_high_water and _high_primed) + { + log::info(log_cat, "Executing high watermark hook!"); + _high_primed = false; + _high_water(*this); + } + } + + // Low/high watermarks were executed and self-cleared, so clean up + if (not _high_water and not _low_water) + return clear_watermarks(); + } } void Stream::acknowledge(size_t bytes) @@ -240,28 +301,19 @@ namespace oxen::quic auto sz = size(); + if (not _is_writing and _unacked_size == 0) + { + log::warning(log_cat, "All transmitted data acked; halting all write operations on stream ID:{}", _stream_id); + ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, 0); + return clear_watermarks(); + } + // Do not bother with this block of logic if no watermarks are set if (_is_watermarked) { - auto unsent = sz - _unacked_size; - - // We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low - // watermark. If the high water hook exists and is primed, execute it - if (unsent >= _high_mark) - { - _low_primed = true; - log::info(log_cat, "Low water hook primed!"); - - if (_high_water and _high_primed) - { - log::info(log_cat, "Executing high watermark hook!"); - _high_primed = false; - return _high_water(*this); - } - } // We are below the low watermark. We prime the high water hook to be fired the next time we rise above the high // watermark. If the low water hook exists and is primed, execute it - else if (unsent <= _low_mark) + if (auto unsent = sz - _unacked_size; unsent <= _low_mark) { _high_primed = true; log::info(log_cat, "High water hook primed!"); @@ -270,7 +322,7 @@ namespace oxen::quic { log::info(log_cat, "Executing low watermark hook!"); _low_primed = false; - return _low_water(*this); + _low_water(*this); } } diff --git a/tests/012-watermarks.cpp b/tests/012-stream_signalling.cpp similarity index 69% rename from tests/012-watermarks.cpp rename to tests/012-stream_signalling.cpp index f1672a65..b0192953 100644 --- a/tests/012-watermarks.cpp +++ b/tests/012-stream_signalling.cpp @@ -7,7 +7,7 @@ namespace oxen::quic::test { - TEST_CASE("012 - Stream Buffer Watermarking", "[012][watermark][streams]") + TEST_CASE("012 - Stream Signalling: Buffer Watermarking", "[012][signalling][watermark][streams]") { Network test_net{}; bstring req_msg(100'000, std::byte{'a'}); @@ -145,4 +145,66 @@ namespace oxen::quic::test REQUIRE_FALSE(client_stream->has_watermarks()); } } + + TEST_CASE("012 - Stream Signalling: Stop Read/Write", "[012][signalling][readwrite][streams]") + { + Network test_net{}; + bstring req_msg(1'000, std::byte{'a'}); + + auto [client_tls, server_tls] = defaults::tls_creds_from_ed_keys(); + + Address server_local{}; + Address client_local{}; + + std::shared_ptr server_stream; + + auto client_established = callback_waiter{[](connection_interface&) {}}; + auto server_established = callback_waiter{[&](connection_interface& ci) { + server_stream = ci.queue_incoming_stream(); + server_stream->send(bstring_view{req_msg}); + }}; + + auto server_endpoint = test_net.endpoint(server_local, server_established); + REQUIRE_NOTHROW(server_endpoint->listen(server_tls)); + + RemoteAddress client_remote{defaults::SERVER_PUBKEY, "127.0.0.1"s, server_endpoint->local().port()}; + + auto client_endpoint = test_net.endpoint(client_local, client_established); + auto conn_interface = client_endpoint->connect(client_remote, client_tls); + + CHECK(client_established.wait()); + CHECK(server_established.wait()); + + auto p = std::promise(); + auto f = p.get_future(); + + auto client_stream = conn_interface->open_stream([&](Stream&, bstring_view) { p.set_value(true); }); + + REQUIRE(client_stream->is_reading()); + REQUIRE(client_stream->is_writing()); + + SECTION("Stop Writing") + { + server_stream->stop_writing(); + REQUIRE_FALSE(server_stream->is_writing()); + + client_stream->send(bstring_view{req_msg}); + REQUIRE(f.get()); + + // allow the acks to get back to the client; extra time for slow CI archs + std::this_thread::sleep_for(250ms); + + REQUIRE(TestHelper::stream_unacked(*server_stream.get()) == 0); + } + + SECTION("Stop Reading") + { + client_stream->stop_reading(); + REQUIRE_FALSE(client_stream->is_reading()); + + client_stream->send(bstring_view{req_msg}); + + REQUIRE(f.wait_for(1s) == std::future_status::timeout); + } + } } // namespace oxen::quic::test diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d8664158..5fb85b7c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -38,7 +38,7 @@ if(LIBQUIC_BUILD_TESTS) 009-alpns.cpp 010-migration.cpp 011-manual_transmission.cpp - 012-watermarks.cpp + 012-stream_signalling.cpp main.cpp case_logger.cpp diff --git a/tests/utils.cpp b/tests/utils.cpp index d59cb360..a5f05d75 100644 --- a/tests/utils.cpp +++ b/tests/utils.cpp @@ -112,6 +112,11 @@ namespace oxen::quic ep._next_rid += by; } + size_t TestHelper::stream_unacked(Stream& s) + { + return s._unacked_size; + } + std::pair, std::shared_ptr> test::defaults::tls_creds_from_ed_keys() { auto client = GNUTLSCreds::make_from_ed_keys(CLIENT_SEED, CLIENT_PUBKEY); diff --git a/tests/utils.hpp b/tests/utils.hpp index a2ef647d..2f2be752 100644 --- a/tests/utils.hpp +++ b/tests/utils.hpp @@ -49,6 +49,8 @@ namespace oxen::quic static int disable_dgram_flip_flop(connection_interface& conn); static int get_dgram_debug_counter(connection_interface& conn); + static size_t stream_unacked(Stream& s); + // Bumps the connection's next reference id to make it easier to tell which connection is // which in log output. static void increment_ref_id(Endpoint& ep, uint64_t by = 1); From 1f2428adaa7acc5f654b172e10e8ed65a25fcd21 Mon Sep 17 00:00:00 2001 From: dr7ana Date: Wed, 8 May 2024 06:08:12 -0700 Subject: [PATCH 2/2] Callbacks on remote read/write shutdown - Applications can call `::set_remote_reset_hooks(...)` to emplace logic to be executed when the remote stream shuts down reading and/or writing - This only happens once per lifetime of the stream; as a result, do NOT set more hooks while inside the body of the hooks themselves! --- include/oxen/quic/connection.hpp | 1 + include/oxen/quic/error.hpp | 6 +++ include/oxen/quic/opt.hpp | 37 ++++++++++++++++++ include/oxen/quic/stream.hpp | 20 +++++++++- src/connection.cpp | 47 ++++++++++++++++++++++- src/stream.cpp | 66 +++++++++++++++++++++++++++++--- tests/012-stream_signalling.cpp | 19 ++++++++- 7 files changed, 186 insertions(+), 10 deletions(-) diff --git a/include/oxen/quic/connection.hpp b/include/oxen/quic/connection.hpp index a795da82..2a76adf5 100644 --- a/include/oxen/quic/connection.hpp +++ b/include/oxen/quic/connection.hpp @@ -490,6 +490,7 @@ namespace oxen::quic int stream_ack(int64_t id, size_t size); int stream_receive(int64_t id, bstring_view data, bool fin); void stream_execute_close(Stream& s, uint64_t app_code); + void stream_reset(int64_t id, uint64_t app_code); void stream_closed(int64_t id, uint64_t app_code); void close_all_streams(); void check_pending_streams(uint64_t available); diff --git a/include/oxen/quic/error.hpp b/include/oxen/quic/error.hpp index 90134ddc..8d625241 100644 --- a/include/oxen/quic/error.hpp +++ b/include/oxen/quic/error.hpp @@ -20,6 +20,12 @@ namespace oxen::quic // Application error code we close with if the stream data handle throws inline constexpr uint64_t STREAM_ERROR_EXCEPTION = ERROR_BASE + 100; + // Application error code for signalling a remote shut down stream reading + inline constexpr uint64_t STREAM_REMOTE_READ_SHUTDOWN = ERROR_BASE + 101; + + // Application error code for signalling a remote shut down stream writing + inline constexpr uint64_t STREAM_REMOTE_WRITE_SHUTDOWN = ERROR_BASE + 102; + // Application error if a bt request stream handle throws an exception inline constexpr uint64_t BPARSER_ERROR_EXCEPTION = ERROR_BASE + 105; diff --git a/include/oxen/quic/opt.hpp b/include/oxen/quic/opt.hpp index fd80cc1a..a6232e1e 100644 --- a/include/oxen/quic/opt.hpp +++ b/include/oxen/quic/opt.hpp @@ -202,5 +202,42 @@ namespace oxen::quic _hook = nullptr; } }; + + // Used to provide callbacks for remote stream reset. Application can pass one or both callbacks to indicate what + // logic should be executed when the remote shuts down stream reading or writing. The signature of `on_reset_hook_t` + // matches that of other hooks, so we wrap it in an opt struct to differentiate and to structure access. + struct remote_stream_reset + { + using on_reset_hook_t = std::function; + + private: + on_reset_hook_t _on_read_reset = nullptr; + on_reset_hook_t _on_write_reset = nullptr; + + public: + remote_stream_reset() = default; + + explicit remote_stream_reset(on_reset_hook_t _on_read, on_reset_hook_t _on_write = nullptr) : + _on_read_reset{std::move(_on_read)}, _on_write_reset{std::move(_on_write)} + { + if (not _on_read_reset and not _on_write_reset) + throw std::invalid_argument{"Must set at least one of `on_read_reset` and `on_write_reset`!"}; + } + + explicit operator bool() const { return has_read_hook() and has_write_hook(); } + + void clear() + { + _on_read_reset = nullptr; + _on_write_reset = nullptr; + } + + bool has_read_hook() const { return _on_read_reset != nullptr; } + bool has_write_hook() const { return _on_write_reset != nullptr; } + + void read_reset(Stream& s, uint64_t ec) { return _on_read_reset(s, ec); } + void write_reset(Stream& s, uint64_t ec) { return _on_write_reset(s, ec); } + }; + } // namespace opt } // namespace oxen::quic diff --git a/include/oxen/quic/stream.hpp b/include/oxen/quic/stream.hpp index 41318d8e..8afab2d2 100644 --- a/include/oxen/quic/stream.hpp +++ b/include/oxen/quic/stream.hpp @@ -91,14 +91,26 @@ namespace oxen::quic bool is_paused() const; - bool is_reading() const; + /** Remote Stream Reset: + - Applications can call `::set_remote_reset_hooks(...)` to emplace logic to be executed when the remote stream + shuts down reading and/or writing + - This only happens once per lifetime of the stream; as a result, do NOT set more hooks while inside the body of + the hooks themselves! + */ + void set_remote_reset_hooks(opt::remote_stream_reset hooks); - bool is_writing() const; + void clear_remote_reset_hooks(); + + bool has_remote_reset_hooks() const; void stop_reading(); void stop_writing(); + bool is_reading() const; + + bool is_writing() const; + // These public methods are synchronized so that they can be safely called from outside the // libquic main loop thread. bool available() const; @@ -168,6 +180,10 @@ namespace oxen::quic opt::watermark _high_water; opt::watermark _low_water; + opt::remote_stream_reset _remote_reset; + + bool _in_reset{false}; + bool _is_reading{true}; bool _is_writing{true}; diff --git a/src/connection.cpp b/src/connection.cpp index 9ec57d3b..10c532eb 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -124,7 +124,7 @@ namespace oxen::quic void* /*stream_user_data*/) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - static_cast(user_data)->stream_closed(stream_id, app_error_code); + static_cast(user_data)->stream_reset(stream_id, app_error_code); return 0; } @@ -1171,6 +1171,51 @@ namespace oxen::quic } } + void Connection::stream_reset(int64_t id, uint64_t app_code) + { + log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); + assert(ngtcp2_is_bidi_stream(id)); + auto it = _streams.find(id); + + if (it == _streams.end()) + return; + + auto& stream = it->second; + + switch (app_code) + { + case STREAM_REMOTE_READ_SHUTDOWN: + log::debug(log_cat, "Stream (ID:{}) received remote read shutdown signal!", id); + + if (stream->_remote_reset.has_read_hook()) + { + log::debug(log_cat, "Invoking remote_read_reset hook..."); + stream->_in_reset = true; + stream->_remote_reset.read_reset(*stream.get(), app_code); + stream->_in_reset = false; + } + + break; + + case STREAM_REMOTE_WRITE_SHUTDOWN: + log::debug(log_cat, "Stream (ID:{}) received remote write shutdown signal!", id); + + if (stream->_remote_reset.has_write_hook()) + { + log::debug(log_cat, "Invoking remote_write_reset hook..."); + stream->_in_reset = true; + stream->_remote_reset.write_reset(*stream.get(), app_code); + stream->_in_reset = false; + } + + break; + + default: + log::critical( + log_cat, "Stream (ID:{}) received unrecognized app code (ec:{}) for stream reset!", id, app_code); + } + } + void Connection::stream_closed(int64_t id, uint64_t app_code) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); diff --git a/src/stream.cpp b/src/stream.cpp index f3d720a6..f04c221f 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -127,6 +127,38 @@ namespace oxen::quic }); } + void Stream::set_remote_reset_hooks(opt::remote_stream_reset sr) + { + // we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream + // lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves + endpoint.call([this, hooks = std::move(sr)]() { + if (_in_reset) + throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"}; + + log::debug(log_cat, "Stream (ID:{}) provided `remote_stream_reset` hooks!", _stream_id); + _remote_reset = std::move(hooks); + }); + } + + void Stream::clear_remote_reset_hooks() + { + // we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream + // lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves + endpoint.call([this]() { + if (_in_reset) + throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"}; + + log::debug(log_cat, "Stream (ID:{}) cleared `remote_stream_reset` hooks!", _stream_id); + _remote_reset.clear(); + assert(not _remote_reset); + }); + } + + bool Stream::has_remote_reset_hooks() const + { + return endpoint.call_get([this]() { return _remote_reset.has_read_hook() and _remote_reset.has_write_hook(); }); + } + void Stream::stop_reading() { endpoint.call([this]() { @@ -139,7 +171,7 @@ namespace oxen::quic _is_reading = false; log::warning(log_cat, "Halting all read operations on stream ID:{}!", _stream_id); - ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, 0); + ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, STREAM_REMOTE_READ_SHUTDOWN); }); } @@ -152,6 +184,18 @@ namespace oxen::quic return; } + if (user_buffers.empty()) + { + log::warning( + log_cat, + "All transmitted data dispatched and acked; halting all write operations on stream ID:{}", + _stream_id); + ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN); + return clear_watermarks(); + } + + // if buffers are empty and we call shutdown_stream_write now, we do not need to flip this boolean; it is used to + // signal for the same call in ::acknowledge() _is_writing = false; }); } @@ -248,6 +292,13 @@ namespace oxen::quic void Stream::append_buffer(bstring_view buffer, std::shared_ptr keep_alive) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); + + if (not _is_writing) + { + log::warning(log_cat, "Stream (ID:{}) has halted writing; payload NOT appended to buffer!", _stream_id); + return; + } + user_buffers.emplace_back(buffer, std::move(keep_alive)); assert(endpoint.in_event_loop()); assert(_conn); @@ -299,15 +350,18 @@ namespace oxen::quic if (bytes) user_buffers.front().first.remove_prefix(bytes); - auto sz = size(); - - if (not _is_writing and _unacked_size == 0) + if (not _is_writing and user_buffers.empty()) { - log::warning(log_cat, "All transmitted data acked; halting all write operations on stream ID:{}", _stream_id); - ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, 0); + log::warning( + log_cat, + "All transmitted data dispatched and acked; halting all write operations on stream ID:{}", + _stream_id); + ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN); return clear_watermarks(); } + auto sz = size(); + // Do not bother with this block of logic if no watermarks are set if (_is_watermarked) { diff --git a/tests/012-stream_signalling.cpp b/tests/012-stream_signalling.cpp index b0192953..771381af 100644 --- a/tests/012-stream_signalling.cpp +++ b/tests/012-stream_signalling.cpp @@ -180,6 +180,21 @@ namespace oxen::quic::test auto client_stream = conn_interface->open_stream([&](Stream&, bstring_view) { p.set_value(true); }); + client_stream->set_remote_reset_hooks(opt::remote_stream_reset{ + [](Stream& s, uint64_t ec) { + REQUIRE(ec == STREAM_REMOTE_READ_SHUTDOWN); + + // Cannot set or clear callbacks while executing the callbacks! + REQUIRE_THROWS(s.set_remote_reset_hooks(opt::remote_stream_reset{})); + REQUIRE_THROWS(s.clear_remote_reset_hooks()); + + s.stop_writing(); + }, + [](Stream& s, uint64_t ec) { + REQUIRE(ec == STREAM_REMOTE_WRITE_SHUTDOWN); + s.stop_reading(); + }}); + REQUIRE(client_stream->is_reading()); REQUIRE(client_stream->is_writing()); @@ -189,11 +204,13 @@ namespace oxen::quic::test REQUIRE_FALSE(server_stream->is_writing()); client_stream->send(bstring_view{req_msg}); - REQUIRE(f.get()); + require_future(f); // allow the acks to get back to the client; extra time for slow CI archs std::this_thread::sleep_for(250ms); + REQUIRE_FALSE(client_stream->is_reading()); + REQUIRE(TestHelper::stream_unacked(*server_stream.get()) == 0); }