Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream Stop Read/Write #127

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/oxen/quic/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ namespace oxen::quic
int stream_ack(int64_t id, size_t size);
int stream_receive(int64_t id, bstring_view data, bool fin);
void stream_execute_close(Stream& s, uint64_t app_code);
void stream_reset(int64_t id, uint64_t app_code);
void stream_closed(int64_t id, uint64_t app_code);
void close_all_streams();
void check_pending_streams(uint64_t available);
Expand Down
6 changes: 6 additions & 0 deletions include/oxen/quic/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ namespace oxen::quic
// Application error code we close with if the stream data handle throws
inline constexpr uint64_t STREAM_ERROR_EXCEPTION = ERROR_BASE + 100;

// Application error code for signalling a remote shut down stream reading
inline constexpr uint64_t STREAM_REMOTE_READ_SHUTDOWN = ERROR_BASE + 101;

// Application error code for signalling a remote shut down stream writing
inline constexpr uint64_t STREAM_REMOTE_WRITE_SHUTDOWN = ERROR_BASE + 102;

// Application error if a bt request stream handle throws an exception
inline constexpr uint64_t BPARSER_ERROR_EXCEPTION = ERROR_BASE + 105;

Expand Down
37 changes: 37 additions & 0 deletions include/oxen/quic/opt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,42 @@ namespace oxen::quic
_hook = nullptr;
}
};

// Used to provide callbacks for remote stream reset. Application can pass one or both callbacks to indicate what
// logic should be executed when the remote shuts down stream reading or writing. The signature of `on_reset_hook_t`
// matches that of other hooks, so we wrap it in an opt struct to differentiate and to structure access.
struct remote_stream_reset
{
using on_reset_hook_t = std::function<void(Stream&, uint64_t)>;

private:
on_reset_hook_t _on_read_reset = nullptr;
on_reset_hook_t _on_write_reset = nullptr;

public:
remote_stream_reset() = default;

explicit remote_stream_reset(on_reset_hook_t _on_read, on_reset_hook_t _on_write = nullptr) :
_on_read_reset{std::move(_on_read)}, _on_write_reset{std::move(_on_write)}
{
if (not _on_read_reset and not _on_write_reset)
throw std::invalid_argument{"Must set at least one of `on_read_reset` and `on_write_reset`!"};
}

explicit operator bool() const { return has_read_hook() and has_write_hook(); }

void clear()
{
_on_read_reset = nullptr;
_on_write_reset = nullptr;
}

bool has_read_hook() const { return _on_read_reset != nullptr; }
bool has_write_hook() const { return _on_write_reset != nullptr; }

void read_reset(Stream& s, uint64_t ec) { return _on_read_reset(s, ec); }
void write_reset(Stream& s, uint64_t ec) { return _on_write_reset(s, ec); }
};

} // namespace opt
} // namespace oxen::quic
27 changes: 27 additions & 0 deletions include/oxen/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,26 @@ namespace oxen::quic

bool is_paused() const;

/** Remote Stream Reset:
- Applications can call `::set_remote_reset_hooks(...)` to emplace logic to be executed when the remote stream
shuts down reading and/or writing
- This only happens once per lifetime of the stream; as a result, do NOT set more hooks while inside the body of
the hooks themselves!
*/
void set_remote_reset_hooks(opt::remote_stream_reset hooks);

void clear_remote_reset_hooks();

bool has_remote_reset_hooks() const;

void stop_reading();

void stop_writing();

bool is_reading() const;

bool is_writing() const;

// These public methods are synchronized so that they can be safely called from outside the
// libquic main loop thread.
bool available() const;
Expand Down Expand Up @@ -160,6 +180,13 @@ namespace oxen::quic
opt::watermark _high_water;
opt::watermark _low_water;

opt::remote_stream_reset _remote_reset;

bool _in_reset{false};

bool _is_reading{true};
bool _is_writing{true};

void wrote(size_t bytes) override;

void append_buffer(bstring_view buffer, std::shared_ptr<void> keep_alive);
Expand Down
47 changes: 46 additions & 1 deletion src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ namespace oxen::quic
void* /*stream_user_data*/)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
static_cast<Connection*>(user_data)->stream_closed(stream_id, app_error_code);
static_cast<Connection*>(user_data)->stream_reset(stream_id, app_error_code);
return 0;
}

Expand Down Expand Up @@ -1171,6 +1171,51 @@ namespace oxen::quic
}
}

void Connection::stream_reset(int64_t id, uint64_t app_code)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
assert(ngtcp2_is_bidi_stream(id));
auto it = _streams.find(id);

if (it == _streams.end())
return;

auto& stream = it->second;

switch (app_code)
{
case STREAM_REMOTE_READ_SHUTDOWN:
log::debug(log_cat, "Stream (ID:{}) received remote read shutdown signal!", id);

if (stream->_remote_reset.has_read_hook())
{
log::debug(log_cat, "Invoking remote_read_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.read_reset(*stream.get(), app_code);
stream->_in_reset = false;
}

break;

case STREAM_REMOTE_WRITE_SHUTDOWN:
log::debug(log_cat, "Stream (ID:{}) received remote write shutdown signal!", id);

if (stream->_remote_reset.has_write_hook())
{
log::debug(log_cat, "Invoking remote_write_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.write_reset(*stream.get(), app_code);
stream->_in_reset = false;
}

break;

default:
log::critical(
log_cat, "Stream (ID:{}) received unrecognized app code (ec:{}) for stream reset!", id, app_code);
}
}

void Connection::stream_closed(int64_t id, uint64_t app_code)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
Expand Down
142 changes: 124 additions & 18 deletions src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,94 @@ namespace oxen::quic
});
}

void Stream::set_remote_reset_hooks(opt::remote_stream_reset sr)
{
// we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream
// lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves
endpoint.call([this, hooks = std::move(sr)]() {
if (_in_reset)
throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"};

log::debug(log_cat, "Stream (ID:{}) provided `remote_stream_reset` hooks!", _stream_id);
_remote_reset = std::move(hooks);
});
}

void Stream::clear_remote_reset_hooks()
{
// we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream
// lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves
endpoint.call([this]() {
if (_in_reset)
throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"};

log::debug(log_cat, "Stream (ID:{}) cleared `remote_stream_reset` hooks!", _stream_id);
_remote_reset.clear();
assert(not _remote_reset);
});
}

bool Stream::has_remote_reset_hooks() const
{
return endpoint.call_get([this]() { return _remote_reset.has_read_hook() and _remote_reset.has_write_hook(); });
}

void Stream::stop_reading()
{
endpoint.call([this]() {
if (not _is_reading)
{
log::warning(log_cat, "Stream has already halted read operations!");
return;
}

_is_reading = false;

log::warning(log_cat, "Halting all read operations on stream ID:{}!", _stream_id);
ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, STREAM_REMOTE_READ_SHUTDOWN);
});
}

void Stream::stop_writing()
{
endpoint.call([this]() {
if (not _is_writing)
{
log::warning(log_cat, "Stream has already halted write operations!");
return;
}

if (user_buffers.empty())
{
log::warning(
log_cat,
"All transmitted data dispatched and acked; halting all write operations on stream ID:{}",
_stream_id);
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN);
return clear_watermarks();
}

// if buffers are empty and we call shutdown_stream_write now, we do not need to flip this boolean; it is used to
// signal for the same call in ::acknowledge()
_is_writing = false;
dr7ana marked this conversation as resolved.
Show resolved Hide resolved
});
}

bool Stream::is_paused() const
{
return endpoint.call_get([this]() { return _paused; });
}

bool Stream::is_reading() const
{
return endpoint.call_get([this]() { return _is_reading; });
}

bool Stream::is_writing() const
{
return endpoint.call_get([this]() { return _is_writing; });
}

bool Stream::available() const
{
return endpoint.call_get([this] { return !(_is_closing || _is_shutdown || _sent_fin); });
Expand Down Expand Up @@ -209,13 +292,42 @@ namespace oxen::quic
void Stream::append_buffer(bstring_view buffer, std::shared_ptr<void> keep_alive)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);

if (not _is_writing)
{
log::warning(log_cat, "Stream (ID:{}) has halted writing; payload NOT appended to buffer!", _stream_id);
return;
}

user_buffers.emplace_back(buffer, std::move(keep_alive));
assert(endpoint.in_event_loop());
assert(_conn);
if (_ready)
_conn->packet_io_ready();
else
log::info(log_cat, "Stream not ready for broadcast yet, data appended to buffer and on deck");

if (_is_watermarked)
{
// We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low
// watermark. If the high water hook exists and is primed, execute it
if (auto unsent = size() - _unacked_size; unsent >= _high_mark)
{
_low_primed = true;
log::info(log_cat, "Low water hook primed!");

if (_high_water and _high_primed)
{
log::info(log_cat, "Executing high watermark hook!");
_high_primed = false;
_high_water(*this);
}
}

// Low/high watermarks were executed and self-cleared, so clean up
if (not _high_water and not _low_water)
return clear_watermarks();
}
}

void Stream::acknowledge(size_t bytes)
Expand All @@ -238,30 +350,24 @@ namespace oxen::quic
if (bytes)
user_buffers.front().first.remove_prefix(bytes);

if (not _is_writing and user_buffers.empty())
{
log::warning(
log_cat,
"All transmitted data dispatched and acked; halting all write operations on stream ID:{}",
_stream_id);
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN);
return clear_watermarks();
}

auto sz = size();

// Do not bother with this block of logic if no watermarks are set
if (_is_watermarked)
{
auto unsent = sz - _unacked_size;

// We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low
// watermark. If the high water hook exists and is primed, execute it
if (unsent >= _high_mark)
{
_low_primed = true;
log::info(log_cat, "Low water hook primed!");

if (_high_water and _high_primed)
{
log::info(log_cat, "Executing high watermark hook!");
_high_primed = false;
return _high_water(*this);
}
}
// We are below the low watermark. We prime the high water hook to be fired the next time we rise above the high
// watermark. If the low water hook exists and is primed, execute it
else if (unsent <= _low_mark)
if (auto unsent = sz - _unacked_size; unsent <= _low_mark)
{
_high_primed = true;
log::info(log_cat, "High water hook primed!");
Expand All @@ -270,7 +376,7 @@ namespace oxen::quic
{
log::info(log_cat, "Executing low watermark hook!");
_low_primed = false;
return _low_water(*this);
_low_water(*this);
}
}

Expand Down
Loading