Skip to content

async_exec now uses a sans-io strategy #250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 41 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4557269
Initial implementation
anarthal May 12, 2025
5e227f0
Bugfixing
anarthal May 12, 2025
0a7a0e1
adjust copyright of new files
anarthal May 14, 2025
84bde33
Made multiplexer use default-constructed error codes for elements
anarthal May 16, 2025
726fd92
Fix test warnings regarding signed to unsigned comparisons
anarthal May 16, 2025
cf473b7
Remove std::move inhibiting optimizations in test
anarthal May 16, 2025
6333363
test_exec_fsm (1)
anarthal May 16, 2025
6b8e773
memory de-allocation check
anarthal May 16, 2025
3a7142b
write in progress test
anarthal May 16, 2025
0e9020c
refactor
anarthal May 16, 2025
5fe3219
cancel if not connected
anarthal May 16, 2025
f518c7b
Remove leftover runner.hpp
anarthal May 16, 2025
936c517
writer notification is now unconditional
anarthal May 18, 2025
b4b5d32
test_not_connected
anarthal May 18, 2025
79b4c62
better handling of cancellations
anarthal May 18, 2025
21fd0c6
cancel waiting
anarthal May 18, 2025
a048647
cmake
anarthal May 18, 2025
98c2450
Merge branch 'develop' into feature/sansio-exec
anarthal May 20, 2025
8006ffb
Merge branch 'develop' into feature/sansio-exec
anarthal May 20, 2025
f494c1a
waiting, all cancel types
anarthal May 21, 2025
889c7e7
cancel terminal not waiting
anarthal May 21, 2025
053e464
Finish cancellation tests
anarthal May 21, 2025
cf06d26
refactor
anarthal May 21, 2025
7977f97
Add test for cancelling pending requests
anarthal May 21, 2025
fe0e662
extend test to all cancel types
anarthal May 21, 2025
5291e38
Enable all cancellation types for async_exec
anarthal May 21, 2025
f5c1a51
doc paragraph on cancellation
anarthal May 21, 2025
77e8cea
use async_immediate
anarthal May 21, 2025
5663515
classify tests in cmake
anarthal May 21, 2025
2723640
unit tests to Jamfile
anarthal May 21, 2025
f5b3fae
Merge branch 'develop' into feature/sansio-exec
anarthal Jun 6, 2025
d4eda3a
commit_read => consume_next
anarthal Jun 6, 2025
7820f28
Remove test_conversions from Jamfile
anarthal Jun 6, 2025
9a6acec
Merge branch 'develop' into feature/sansio-exec
anarthal Jun 7, 2025
8c35f41
apply PR comments
anarthal Jun 7, 2025
772c7cc
explicit return type
anarthal Jun 7, 2025
145dd88
(failing) parse error test
anarthal Jun 7, 2025
74768f4
remove request from the multiplexer on error
anarthal Jun 7, 2025
8b3fc00
Restore runner
anarthal Jun 7, 2025
4af8aa9
Remove C++ std from cmake test
anarthal Jun 7, 2025
102e8b0
new setup_cancellation action
anarthal Jun 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 38 additions & 56 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connector.hpp>
#include <boost/redis/detail/exec_fsm.hpp>
#include <boost/redis/detail/health_checker.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/detail/multiplexer.hpp>
Expand All @@ -25,7 +26,6 @@

#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/associated_immediate_executor.hpp>
#include <boost/asio/basic_stream_socket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/buffer.hpp>
Expand All @@ -34,6 +34,7 @@
#include <boost/asio/deferred.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/immediate.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/prepend.hpp>
Expand All @@ -48,8 +49,6 @@
#include <array>
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>

namespace boost::redis {
Expand Down Expand Up @@ -112,68 +111,41 @@ using exec_notifier_type = asio::experimental::channel<

template <class Conn>
struct exec_op {
using req_info_type = typename multiplexer::elem;
using executor_type = typename Conn::executor_type;

Conn* conn_ = nullptr;
std::shared_ptr<exec_notifier_type<executor_type>> notifier_ = nullptr;
std::shared_ptr<req_info_type> info_ = nullptr;
asio::coroutine coro_{};
detail::exec_fsm fsm_;

template <class Self>
void operator()(Self& self, system::error_code = {}, std::size_t = 0)
{
BOOST_ASIO_CORO_REENTER(coro_)
{
// Check whether the user wants to wait for the connection to
// be established.
if (info_->get_request().get_config().cancel_if_not_connected && !conn_->is_open()) {
BOOST_ASIO_CORO_YIELD
asio::dispatch(
asio::get_associated_immediate_executor(self, self.get_io_executor()),
std::move(self));
return self.complete(error::not_connected, 0);
}

conn_->mpx_.add(info_);
if (conn_->trigger_write()) {
conn_->writer_timer_.cancel();
}

EXEC_OP_WAIT:
BOOST_ASIO_CORO_YIELD
notifier_->async_receive(std::move(self));

if (info_->get_error()) {
self.complete(info_->get_error(), 0);
return;
}

if (is_cancelled(self)) {
if (!conn_->mpx_.remove(info_)) {
using c_t = asio::cancellation_type;
auto const c = self.get_cancellation_state().cancelled();
if ((c & c_t::terminal) != c_t::none) {
// Cancellation requires closing the connection
// otherwise it stays in inconsistent state.
conn_->cancel(operation::run);
return self.complete(asio::error::operation_aborted, 0);
} else {
// Can't implement other cancelation types, ignoring.
self.get_cancellation_state().clear();

// TODO: Find out a better way to ignore
// cancelation.
goto EXEC_OP_WAIT;
}
} else {
// Cancelation honored.
self.complete(asio::error::operation_aborted, 0);
while (true) {
// Invoke the state machine
auto act = fsm_.resume(conn_->is_open(), self.get_cancellation_state().cancelled());

// Do what the FSM said
switch (act.type()) {
case detail::exec_action_type::setup_cancellation:
self.reset_cancellation_state(asio::enable_total_cancellation());
continue; // this action does not require yielding
case detail::exec_action_type::immediate:
asio::async_immediate(self.get_io_executor(), std::move(self));
return;
case detail::exec_action_type::write:
conn_->writer_timer_.cancel();
continue; // this action does not require yielding
case detail::exec_action_type::wait_for_response:
notifier_->async_receive(std::move(self));
return;
case detail::exec_action_type::cancel_run:
conn_->cancel(operation::run);
continue; // this action does not require yielding
case detail::exec_action_type::done:
notifier_.reset();
self.complete(act.error(), act.bytes_read());
return;
}
}

self.complete(info_->get_error(), info_->get_read_size());
}
}
};
Expand Down Expand Up @@ -660,6 +632,16 @@ class basic_connection {
*
* Where the second parameter is the size of the response received
* in bytes.
*
* @par Per-operation cancellation
* This operation supports per-operation cancellation. The following cancellation types
* are supported:
*
* - `asio::cancellation_type_t::terminal`. Always supported. May cause the current
* `async_run` operation to be cancelled.
* - `asio::cancellation_type_t::partial` and `asio::cancellation_type_t::total`.
* Supported only if the request hasn't been written to the network yet.
*
*/
template <
class Response = ignore_t,
Expand Down Expand Up @@ -692,7 +674,7 @@ class basic_connection {
});

return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
detail::exec_op<this_type>{this, notifier, info},
detail::exec_op<this_type>{this, notifier, detail::exec_fsm(mpx_, std::move(info))},
token,
writer_timer_);
}
Expand Down
36 changes: 36 additions & 0 deletions include/boost/redis/detail/coroutine.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_REDIS_DETAIL_COROUTINE_HPP
#define BOOST_REDIS_DETAIL_COROUTINE_HPP

// asio::coroutine uses __COUNTER__ internally, which can trigger
// ODR violations if we use them in header-only code. These manifest as
// extremely hard-to-debug bugs only present in release builds.
// Use this instead when doing coroutines in non-template code.
// Adapted from Boost.MySQL.

// Coroutine state is represented as an integer (resume_point_var).
// Every yield gets assigned a unique value (resume_point_id).
// Yielding sets the next resume point, returns, and sets a case label for re-entering.
// Coroutines need to switch on resume_point_var to re-enter.

// Enclosing this in a scope allows placing the macro inside a brace-less for/while loop
// The empty scope after the case label is required because labels can't be at the end of a compound statement
// Parameters:
//   resume_point_var: lvalue holding the coroutine state. It is assigned
//                     resume_point_id right before returning.
//   resume_point_id:  integer constant identifying this yield. It is used as a
//                     case label, so it must be unique within the enclosing switch.
//   ...:              optional expression to return to the caller when yielding.
// Because it expands to a case label, the macro must be used inside a
// switch on resume_point_var (see BOOST_REDIS_CORO_INITIAL for the entry label).
#define BOOST_REDIS_YIELD(resume_point_var, resume_point_id, ...) \
    { \
        resume_point_var = resume_point_id; \
        return __VA_ARGS__; \
    case resume_point_id: \
    { \
    } \
    }

#define BOOST_REDIS_CORO_INITIAL case 0:

#endif
149 changes: 149 additions & 0 deletions include/boost/redis/detail/exec_fsm.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_REDIS_EXEC_FSM_HPP
#define BOOST_REDIS_EXEC_FSM_HPP

#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/request.hpp>

#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
#include <boost/system/error_code.hpp>

#include <cstddef>
#include <memory>

// Sans-io algorithm for async_exec, as a finite state machine

namespace boost::redis::detail {

// What should we do next?
enum class exec_action_type
{
setup_cancellation, // Set up the cancellation types supported by the composed operation
immediate, // Invoke asio::async_immediate to avoid re-entrancy problems
done, // Call the final handler
write, // Notify the writer task
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the naming here could be more descriptive: `notify_writer`, for example.

wait_for_response, // Wait to be notified
cancel_run, // Cancel the connection's run operation
};

class exec_action {
exec_action_type type_;
system::error_code ec_;
std::size_t bytes_read_;

public:
exec_action(exec_action_type type) noexcept
: type_{type}
{ }

exec_action(system::error_code ec, std::size_t bytes_read = 0u) noexcept
: type_{exec_action_type::done}
, ec_{ec}
, bytes_read_{bytes_read}
{ }

exec_action_type type() const { return type_; }
system::error_code error() const { return ec_; }
std::size_t bytes_read() const { return bytes_read_; }
};

// Sans-io state machine for a single async_exec operation.
// It performs no I/O itself: every call to resume() returns an exec_action
// telling the caller what to do next. The caller performs that action and
// calls resume() again, until an action of type exec_action_type::done is returned.
class exec_fsm {
   // Coroutine state: the id of the BOOST_REDIS_YIELD point to re-enter at.
   // Zero means "not started yet" (matches BOOST_REDIS_CORO_INITIAL).
   int resume_point_{0};

   // The multiplexer the request is added to and removed from.
   // Stored as a pointer to the object passed by reference to the constructor.
   multiplexer* mpx_{nullptr};

   // The request being executed. Released by resume() once a `done` action is produced.
   std::shared_ptr<multiplexer::elem> elem_;

   // Returns true if any of the total, partial or terminal bits is set in type.
   static bool is_cancellation(asio::cancellation_type_t type)
   {
      return !!(
         type & (asio::cancellation_type_t::total | asio::cancellation_type_t::partial |
                 asio::cancellation_type_t::terminal));
   }

   // The actual algorithm, written as a switch-based coroutine.
   // Each BOOST_REDIS_YIELD stores its id in resume_point_ and returns an action;
   // the next call jumps to the matching case label and continues from there.
   // Both arguments are supplied again on every call, so code that runs after a
   // yield observes the values passed to the call that re-entered the function.
   exec_action resume_impl(bool connection_is_open, asio::cancellation_type_t cancel_state)
   {
      switch (resume_point_) {
         BOOST_REDIS_CORO_INITIAL

         // Check whether the user wants to wait for the connection to
         // be established.
         if (elem_->get_request().get_config().cancel_if_not_connected && !connection_is_open) {
            // Yield an `immediate` action first, then fail with not_connected
            // once the caller resumes us.
            BOOST_REDIS_YIELD(resume_point_, 1, exec_action_type::immediate)
            return system::error_code(error::not_connected);
         }

         // No more immediate errors. Set up the supported cancellation types.
         // This is required to get partial and total cancellations.
         // This is a potentially allocating operation, so do it as late as we can.
         BOOST_REDIS_YIELD(resume_point_, 2, exec_action_type::setup_cancellation)

         // Add the request to the multiplexer
         mpx_->add(elem_);

         // Notify the writer task that there is work to do. If the task is not
         // listening (e.g. it's already writing or the connection is not healthy),
         // this is a no-op. Since this is sync, no cancellation can happen here.
         BOOST_REDIS_YIELD(resume_point_, 3, exec_action_type::write)

         while (true) {
            // Wait until we get notified. This will return once the request completes,
            // or upon any kind of cancellation
            BOOST_REDIS_YIELD(resume_point_, 4, exec_action_type::wait_for_response)

            // If the request has completed (with error or not), we're done
            if (elem_->is_done()) {
               return exec_action{elem_->get_error(), elem_->get_read_size()};
            }

            // If we're cancelled, try to remove the request from the queue. This will only
            // succeed if the request is waiting (wasn't written yet)
            if (is_cancellation(cancel_state) && mpx_->remove(elem_)) {
               return exec_action{asio::error::operation_aborted};
            }

            // If we hit a terminal cancellation, tear down the connection.
            // Otherwise, go back to waiting.
            // TODO: we could likely do better here and mark the request as cancelled, removing
            // the done callback and the adapter. But this requires further exploration
            if (!!(cancel_state & asio::cancellation_type_t::terminal)) {
               // Ask the caller to cancel the run operation, then complete
               // with operation_aborted once resumed.
               BOOST_REDIS_YIELD(resume_point_, 5, exec_action_type::cancel_run)
               return exec_action{asio::error::operation_aborted};
            }
         }
      }

      // We should never get here
      BOOST_ASSERT(false);
      return exec_action{system::error_code()};
   }

public:
   // Creates a state machine that will execute elem using mpx.
   // Only the address of mpx is stored, so mpx must remain valid while
   // this object is being resumed.
   exec_fsm(multiplexer& mpx, std::shared_ptr<multiplexer::elem> elem) noexcept
   : mpx_(&mpx)
   , elem_(std::move(elem))
   { }

   // Advances the state machine and returns the next action to perform.
   //   connection_is_open: whether the connection is currently open.
   //   cancel_state: the cancellation types that have been requested so far.
   // NOTE(review): elem_ is null after a `done` action has been returned, so
   // resume() does not appear to be intended to be called again past that point.
   exec_action resume(bool connection_is_open, asio::cancellation_type_t cancel_state)
   {
      // When completing, we should deallocate any temporary storage we acquired
      // for the operation before invoking the final handler.
      // This intercepts the returned action to implement this.
      auto act = resume_impl(connection_is_open, cancel_state);
      if (act.type() == exec_action_type::done)
         elem_.reset();
      return act;
   }
};

} // namespace boost::redis::detail

#endif  // BOOST_REDIS_EXEC_FSM_HPP
19 changes: 15 additions & 4 deletions include/boost/redis/detail/multiplexer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ struct multiplexer {

void set_done_callback(std::function<void()> f) noexcept { done_ = std::move(f); };

auto notify_done() noexcept { done_(); }
// Marks this element as done and then invokes the done callback.
// The status is updated before the callback runs, so is_done() already
// returns true for any code executed from within the callback.
auto notify_done() noexcept -> void
{
   status_ = status::done;
   done_();
}

auto notify_error(system::error_code ec) noexcept -> void;

Expand All @@ -65,6 +69,12 @@ struct multiplexer {
return status_ == status::staged;
}

// Returns true if the element's status is `done`, which is the status
// set by notify_done().
[[nodiscard]]
bool is_done() const noexcept
{
   return status_ == status::done;
}

// Sets the element's status to `written`.
void mark_written() noexcept { status_ = status::written; }

// Sets the element's status to `staged`.
void mark_staged() noexcept { status_ = status::staged; }
Expand All @@ -86,9 +96,10 @@ struct multiplexer {
private:
enum class status
{
waiting,
staged,
written
waiting, // the request hasn't been written yet
staged, // we've issued the write for this request, but it hasn't finished yet
written, // the request has been written successfully
done, // the request has completed and the done callback has been invoked
};

request const* req_;
Expand Down
3 changes: 2 additions & 1 deletion include/boost/redis/impl/multiplexer.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ auto multiplexer::elem::notify_error(system::error_code ec) noexcept -> void
ec_ = ec;
}

done_();
notify_done();
}

auto multiplexer::elem::commit_response(std::size_t read_size) -> void
Expand Down Expand Up @@ -119,6 +119,7 @@ std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec

if (ec) {
reqs_.front()->notify_error(ec);
reqs_.pop_front();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, this is probably fixing a bug.

return std::make_pair(std::make_optional(false), 0);
}

Expand Down
Loading
Loading