Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream Stop Read/Write #127

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/oxen/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ namespace oxen::quic

bool is_paused() const;

bool is_reading() const;

bool is_writing() const;

void stop_reading();

void stop_writing();

// These public methods are synchronized so that they can be safely called from outside the
// libquic main loop thread.
bool available() const;
Expand Down Expand Up @@ -160,6 +168,9 @@ namespace oxen::quic
opt::watermark _high_water;
opt::watermark _low_water;

bool _is_reading{true};
bool _is_writing{true};

void wrote(size_t bytes) override;

void append_buffer(bstring_view buffer, std::shared_ptr<void> keep_alive);
Expand Down
88 changes: 70 additions & 18 deletions src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,50 @@ namespace oxen::quic
});
}

void Stream::stop_reading()
{
endpoint.call([this]() {
if (not _is_reading)
{
log::warning(log_cat, "Stream has already halted read operations!");
return;
}

_is_reading = false;

log::warning(log_cat, "Halting all read operations on stream ID:{}!", _stream_id);
ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, 0);
});
}

void Stream::stop_writing()
{
endpoint.call([this]() {
if (not _is_writing)
{
log::warning(log_cat, "Stream has already halted write operations!");
return;
}

_is_writing = false;
dr7ana marked this conversation as resolved.
Show resolved Hide resolved
});
}

bool Stream::is_paused() const
{
return endpoint.call_get([this]() { return _paused; });
}

bool Stream::is_reading() const
{
return endpoint.call_get([this]() { return _is_reading; });
}

bool Stream::is_writing() const
{
return endpoint.call_get([this]() { return _is_writing; });
}

bool Stream::available() const
{
return endpoint.call_get([this] { return !(_is_closing || _is_shutdown || _sent_fin); });
Expand Down Expand Up @@ -216,6 +255,28 @@ namespace oxen::quic
_conn->packet_io_ready();
else
log::info(log_cat, "Stream not ready for broadcast yet, data appended to buffer and on deck");

if (_is_watermarked)
{
// We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low
// watermark. If the high water hook exists and is primed, execute it
if (auto unsent = size() - _unacked_size; unsent >= _high_mark)
{
_low_primed = true;
log::info(log_cat, "Low water hook primed!");

if (_high_water and _high_primed)
{
log::info(log_cat, "Executing high watermark hook!");
_high_primed = false;
_high_water(*this);
}
}

// Low/high watermarks were executed and self-cleared, so clean up
if (not _high_water and not _low_water)
return clear_watermarks();
}
}

void Stream::acknowledge(size_t bytes)
Expand All @@ -240,28 +301,19 @@ namespace oxen::quic

auto sz = size();

if (not _is_writing and _unacked_size == 0)
dr7ana marked this conversation as resolved.
Show resolved Hide resolved
{
log::warning(log_cat, "All transmitted data acked; halting all write operations on stream ID:{}", _stream_id);
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, 0);
return clear_watermarks();
}

// Do not bother with this block of logic if no watermarks are set
if (_is_watermarked)
{
auto unsent = sz - _unacked_size;

// We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low
// watermark. If the high water hook exists and is primed, execute it
if (unsent >= _high_mark)
{
_low_primed = true;
log::info(log_cat, "Low water hook primed!");

if (_high_water and _high_primed)
{
log::info(log_cat, "Executing high watermark hook!");
_high_primed = false;
return _high_water(*this);
}
}
// We are below the low watermark. We prime the high water hook to be fired the next time we rise above the high
// watermark. If the low water hook exists and is primed, execute it
else if (unsent <= _low_mark)
if (auto unsent = sz - _unacked_size; unsent <= _low_mark)
{
_high_primed = true;
log::info(log_cat, "High water hook primed!");
Expand All @@ -270,7 +322,7 @@ namespace oxen::quic
{
log::info(log_cat, "Executing low watermark hook!");
_low_primed = false;
return _low_water(*this);
_low_water(*this);
}
}

Expand Down
64 changes: 63 additions & 1 deletion tests/012-watermarks.cpp → tests/012-stream_signalling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace oxen::quic::test
{
TEST_CASE("012 - Stream Buffer Watermarking", "[012][watermark][streams]")
TEST_CASE("012 - Stream Signalling: Buffer Watermarking", "[012][signalling][watermark][streams]")
{
Network test_net{};
bstring req_msg(100'000, std::byte{'a'});
Expand Down Expand Up @@ -145,4 +145,66 @@ namespace oxen::quic::test
REQUIRE_FALSE(client_stream->has_watermarks());
}
}

TEST_CASE("012 - Stream Signalling: Stop Read/Write", "[012][signalling][readwrite][streams]")
{
Network test_net{};
bstring req_msg(1'000, std::byte{'a'});

auto [client_tls, server_tls] = defaults::tls_creds_from_ed_keys();

Address server_local{};
Address client_local{};

std::shared_ptr<Stream> server_stream;

auto client_established = callback_waiter{[](connection_interface&) {}};
auto server_established = callback_waiter{[&](connection_interface& ci) {
server_stream = ci.queue_incoming_stream();
server_stream->send(bstring_view{req_msg});
}};

auto server_endpoint = test_net.endpoint(server_local, server_established);
REQUIRE_NOTHROW(server_endpoint->listen(server_tls));

RemoteAddress client_remote{defaults::SERVER_PUBKEY, "127.0.0.1"s, server_endpoint->local().port()};

auto client_endpoint = test_net.endpoint(client_local, client_established);
auto conn_interface = client_endpoint->connect(client_remote, client_tls);

CHECK(client_established.wait());
CHECK(server_established.wait());

auto p = std::promise<bool>();
auto f = p.get_future();

auto client_stream = conn_interface->open_stream<Stream>([&](Stream&, bstring_view) { p.set_value(true); });

REQUIRE(client_stream->is_reading());
REQUIRE(client_stream->is_writing());

SECTION("Stop Writing")
{
server_stream->stop_writing();
REQUIRE_FALSE(server_stream->is_writing());

client_stream->send(bstring_view{req_msg});
REQUIRE(f.get());
dr7ana marked this conversation as resolved.
Show resolved Hide resolved

// allow the acks to get back to the client; extra time for slow CI archs
std::this_thread::sleep_for(250ms);

dr7ana marked this conversation as resolved.
Show resolved Hide resolved
REQUIRE(TestHelper::stream_unacked(*server_stream.get()) == 0);
}

SECTION("Stop Reading")
{
client_stream->stop_reading();
REQUIRE_FALSE(client_stream->is_reading());

client_stream->send(bstring_view{req_msg});

REQUIRE(f.wait_for(1s) == std::future_status::timeout);
}
}
} // namespace oxen::quic::test
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ if(LIBQUIC_BUILD_TESTS)
009-alpns.cpp
010-migration.cpp
011-manual_transmission.cpp
012-watermarks.cpp
012-stream_signalling.cpp

main.cpp
case_logger.cpp
Expand Down
5 changes: 5 additions & 0 deletions tests/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ namespace oxen::quic
ep._next_rid += by;
}

size_t TestHelper::stream_unacked(Stream& s)
{
return s._unacked_size;
}

std::pair<std::shared_ptr<GNUTLSCreds>, std::shared_ptr<GNUTLSCreds>> test::defaults::tls_creds_from_ed_keys()
{
auto client = GNUTLSCreds::make_from_ed_keys(CLIENT_SEED, CLIENT_PUBKEY);
Expand Down
2 changes: 2 additions & 0 deletions tests/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ namespace oxen::quic
static int disable_dgram_flip_flop(connection_interface& conn);
static int get_dgram_debug_counter(connection_interface& conn);

static size_t stream_unacked(Stream& s);

// Bumps the connection's next reference id to make it easier to tell which connection is
// which in log output.
static void increment_ref_id(Endpoint& ep, uint64_t by = 1);
Expand Down