diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index c203e3d5..f9deddaa 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -24,7 +25,6 @@ #include #include -#include #include #include #include @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -110,68 +111,41 @@ using exec_notifier_type = asio::experimental::channel< template struct exec_op { - using req_info_type = typename multiplexer::elem; using executor_type = typename Conn::executor_type; Conn* conn_ = nullptr; std::shared_ptr> notifier_ = nullptr; - std::shared_ptr info_ = nullptr; - asio::coroutine coro_{}; + detail::exec_fsm fsm_; template void operator()(Self& self, system::error_code = {}, std::size_t = 0) { - BOOST_ASIO_CORO_REENTER(coro_) - { - // Check whether the user wants to wait for the connection to - // be stablished. - if (info_->get_request().get_config().cancel_if_not_connected && !conn_->is_open()) { - BOOST_ASIO_CORO_YIELD - asio::dispatch( - asio::get_associated_immediate_executor(self, self.get_io_executor()), - std::move(self)); - return self.complete(error::not_connected, 0); - } - - conn_->mpx_.add(info_); - if (conn_->trigger_write()) { - conn_->writer_timer_.cancel(); - } - -EXEC_OP_WAIT: - BOOST_ASIO_CORO_YIELD - notifier_->async_receive(std::move(self)); - - if (info_->get_error()) { - self.complete(info_->get_error(), 0); - return; - } - - if (is_cancelled(self)) { - if (!conn_->mpx_.remove(info_)) { - using c_t = asio::cancellation_type; - auto const c = self.get_cancellation_state().cancelled(); - if ((c & c_t::terminal) != c_t::none) { - // Cancellation requires closing the connection - // otherwise it stays in inconsistent state. - conn_->cancel(operation::run); - return self.complete(asio::error::operation_aborted, 0); - } else { - // Can't implement other cancelation types, ignoring. - self.get_cancellation_state().clear(); - - // TODO: Find out a better way to ignore - // cancelation. - goto EXEC_OP_WAIT; - } - } else { - // Cancelation honored. - self.complete(asio::error::operation_aborted, 0); + while (true) { + // Invoke the state machine + auto act = fsm_.resume(conn_->is_open(), self.get_cancellation_state().cancelled()); + + // Do what the FSM said + switch (act.type()) { + case detail::exec_action_type::setup_cancellation: + self.reset_cancellation_state(asio::enable_total_cancellation()); + continue; // this action does not require yielding + case detail::exec_action_type::immediate: + asio::async_immediate(self.get_io_executor(), std::move(self)); + return; + case detail::exec_action_type::notify_writer: + conn_->writer_timer_.cancel(); + continue; // this action does not require yielding + case detail::exec_action_type::wait_for_response: + notifier_->async_receive(std::move(self)); + return; + case detail::exec_action_type::cancel_run: + conn_->cancel(operation::run); + continue; // this action does not require yielding + case detail::exec_action_type::done: + notifier_.reset(); + self.complete(act.error(), act.bytes_read()); return; - } } - - self.complete(info_->get_error(), info_->get_read_size()); } } }; @@ -612,6 +586,16 @@ class basic_connection { * * Where the second parameter is the size of the response received * in bytes. + * + * @par Per-operation cancellation + * This operation supports per-operation cancellation. The following cancellation types + * are supported: + * + * - `asio::cancellation_type_t::terminal`. Always supported. May cause the current + * `async_run` operation to be cancelled. + * - `asio::cancellation_type_t::partial` and `asio::cancellation_type_t::total`. + * Supported only if the request hasn't been written to the network yet. + * */ template < class Response = ignore_t, @@ -644,7 +628,7 @@ class basic_connection { }); return asio::async_compose( - detail::exec_op{this, notifier, info}, + detail::exec_op{this, notifier, detail::exec_fsm(mpx_, std::move(info))}, token, writer_timer_); } diff --git a/include/boost/redis/detail/coroutine.hpp b/include/boost/redis/detail/coroutine.hpp new file mode 100644 index 00000000..f8fe22d5 --- /dev/null +++ b/include/boost/redis/detail/coroutine.hpp @@ -0,0 +1,36 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_DETAIL_COROUTINE_HPP +#define BOOST_REDIS_DETAIL_COROUTINE_HPP + +// asio::coroutine uses __COUNTER__ internally, which can trigger +// ODR violations if we use them in header-only code. These manifest as +// extremely hard-to-debug bugs only present in release builds. +// Use this instead when doing coroutines in non-template code. +// Adapted from Boost.MySQL. + +// Coroutine state is represented as an integer (resume_point_var). +// Every yield gets assigned a unique value (resume_point_id). +// Yielding sets the next resume point, returns, and sets a case label for re-entering. +// Coroutines need to switch on resume_point_var to re-enter. + +// Enclosing this in a scope allows placing the macro inside a brace-less for/while loop +// The empty scope after the case label is required because labels can't be at the end of a compound statement +#define BOOST_REDIS_YIELD(resume_point_var, resume_point_id, ...) \ + { \ + resume_point_var = resume_point_id; \ + return __VA_ARGS__; \ + case resume_point_id: \ + { \ + } \ + } + +#define BOOST_REDIS_CORO_INITIAL case 0: + +#endif diff --git a/include/boost/redis/detail/exec_fsm.hpp b/include/boost/redis/detail/exec_fsm.hpp new file mode 100644 index 00000000..52ec767b --- /dev/null +++ b/include/boost/redis/detail/exec_fsm.hpp @@ -0,0 +1,72 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_EXEC_FSM_HPP +#define BOOST_REDIS_EXEC_FSM_HPP + +#include + +#include +#include + +#include +#include + +// Sans-io algorithm for async_exec, as a finite state machine + +namespace boost::redis::detail { + +// What should we do next? +enum class exec_action_type +{ + setup_cancellation, // Set up the cancellation types supported by the composed operation + immediate, // Invoke asio::async_immediate to avoid re-entrancy problems + done, // Call the final handler + notify_writer, // Notify the writer task + wait_for_response, // Wait to be notified + cancel_run, // Cancel the connection's run operation +}; + +class exec_action { + exec_action_type type_; + system::error_code ec_; + std::size_t bytes_read_; + +public: + exec_action(exec_action_type type) noexcept + : type_{type} + { } + + exec_action(system::error_code ec, std::size_t bytes_read = 0u) noexcept + : type_{exec_action_type::done} + , ec_{ec} + , bytes_read_{bytes_read} + { } + + exec_action_type type() const { return type_; } + system::error_code error() const { return ec_; } + std::size_t bytes_read() const { return bytes_read_; } +}; + +class exec_fsm { + int resume_point_{0}; + multiplexer* mpx_{nullptr}; + std::shared_ptr elem_; + +public: + exec_fsm(multiplexer& mpx, std::shared_ptr elem) noexcept + : mpx_(&mpx) + , elem_(std::move(elem)) + { } + + exec_action resume(bool connection_is_open, asio::cancellation_type_t cancel_state); +}; + +} // namespace boost::redis::detail + +#endif // BOOST_REDIS_CONNECTOR_HPP diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp index 5818db2d..8b01409e 100644 --- a/include/boost/redis/detail/multiplexer.hpp +++ b/include/boost/redis/detail/multiplexer.hpp @@ -43,7 +43,11 @@ struct multiplexer { void set_done_callback(std::function f) noexcept { done_ = std::move(f); }; - auto notify_done() noexcept { done_(); } + auto notify_done() noexcept -> void + { + status_ = status::done; + done_(); + } auto notify_error(system::error_code ec) noexcept -> void; @@ -65,6 +69,12 @@ struct multiplexer { return status_ == status::staged; } + [[nodiscard]] + bool is_done() const noexcept + { + return status_ == status::done; + } + void mark_written() noexcept { status_ = status::written; } void mark_staged() noexcept { status_ = status::staged; } @@ -86,9 +96,10 @@ struct multiplexer { private: enum class status { - waiting, - staged, - written + waiting, // the request hasn't been written yet + staged, // we've issued the write for this request, but it hasn't finished yet + written, // the request has been written successfully + done, // the request has completed and the done callback has been invoked }; request const* req_; diff --git a/include/boost/redis/impl/exec_fsm.ipp b/include/boost/redis/impl/exec_fsm.ipp new file mode 100644 index 00000000..658d330c --- /dev/null +++ b/include/boost/redis/impl/exec_fsm.ipp @@ -0,0 +1,94 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_EXEC_FSM_IPP +#define BOOST_REDIS_EXEC_FSM_IPP + +#include +#include +#include + +#include +#include + +namespace boost::redis::detail { + +inline bool is_cancellation(asio::cancellation_type_t type) +{ + return !!( + type & (asio::cancellation_type_t::total | asio::cancellation_type_t::partial | + asio::cancellation_type_t::terminal)); +} + +} // namespace boost::redis::detail + +boost::redis::detail::exec_action boost::redis::detail::exec_fsm::resume( + bool connection_is_open, + asio::cancellation_type_t cancel_state) +{ + switch (resume_point_) { + BOOST_REDIS_CORO_INITIAL + + // Check whether the user wants to wait for the connection to + // be established. + if (elem_->get_request().get_config().cancel_if_not_connected && !connection_is_open) { + BOOST_REDIS_YIELD(resume_point_, 1, exec_action_type::immediate) + elem_.reset(); // Deallocate memory before finalizing + return system::error_code(error::not_connected); + } + + // No more immediate errors. Set up the supported cancellation types. + // This is required to get partial and total cancellations. + // This is a potentially allocating operation, so do it as late as we can. + BOOST_REDIS_YIELD(resume_point_, 2, exec_action_type::setup_cancellation) + + // Add the request to the multiplexer + mpx_->add(elem_); + + // Notify the writer task that there is work to do. If the task is not + // listening (e.g. it's already writing or the connection is not healthy), + // this is a no-op. Since this is sync, no cancellation can happen here. + BOOST_REDIS_YIELD(resume_point_, 3, exec_action_type::notify_writer) + + while (true) { + // Wait until we get notified. This will return once the request completes, + // or upon any kind of cancellation + BOOST_REDIS_YIELD(resume_point_, 4, exec_action_type::wait_for_response) + + // If the request has completed (with error or not), we're done + if (elem_->is_done()) { + exec_action act{elem_->get_error(), elem_->get_read_size()}; + elem_.reset(); // Deallocate memory before finalizing + return act; + } + + // If we're cancelled, try to remove the request from the queue. This will only + // succeed if the request is waiting (wasn't written yet) + if (is_cancellation(cancel_state) && mpx_->remove(elem_)) { + elem_.reset(); // Deallocate memory before finalizing + return exec_action{asio::error::operation_aborted}; + } + + // If we hit a terminal cancellation, tear down the connection. + // Otherwise, go back to waiting. + // TODO: we could likely do better here and mark the request as cancelled, removing + // the done callback and the adapter. But this requires further exploration + if (!!(cancel_state & asio::cancellation_type_t::terminal)) { + BOOST_REDIS_YIELD(resume_point_, 5, exec_action_type::cancel_run) + elem_.reset(); // Deallocate memory before finalizing + return exec_action{asio::error::operation_aborted}; + } + } + } + + // We should never get here + BOOST_ASSERT(false); + return exec_action{system::error_code()}; +} + +#endif \ No newline at end of file diff --git a/include/boost/redis/impl/multiplexer.ipp b/include/boost/redis/impl/multiplexer.ipp index 4c118836..021a4e9f 100644 --- a/include/boost/redis/impl/multiplexer.ipp +++ b/include/boost/redis/impl/multiplexer.ipp @@ -31,7 +31,7 @@ auto multiplexer::elem::notify_error(system::error_code ec) noexcept -> void ec_ = ec; } - done_(); + notify_done(); } auto multiplexer::elem::commit_response(std::size_t read_size) -> void @@ -119,6 +119,7 @@ std::pair multiplexer::consume_next(system::error_code& ec if (ec) { reqs_.front()->notify_error(ec); + reqs_.pop_front(); return std::make_pair(std::make_optional(false), 0); } diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index a83af6bc..86d92285 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3c58ed29..a5268e37 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -37,6 +37,7 @@ make_test(test_low_level) make_test(test_request) make_test(test_low_level_sync_sans_io) make_test(test_any_adapter) +make_test(test_exec_fsm) # Tests that require a real Redis server make_test(test_conn_quit) diff --git a/test/Jamfile b/test/Jamfile index e0f06c3a..5f75e38e 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -52,6 +52,7 @@ local tests = test_request test_low_level_sync_sans_io test_any_adapter + test_exec_fsm ; # Build and run the tests diff --git a/test/test_conn_exec_cancel.cpp b/test/test_conn_exec_cancel.cpp index 5618c2e8..9c32a8cb 100644 --- a/test/test_conn_exec_cancel.cpp +++ b/test/test_conn_exec_cancel.cpp @@ -5,7 +5,14 @@ */ #include - +#include +#include + +#include +#include +#include +#include +#include #include #include @@ -18,6 +25,8 @@ #ifdef BOOST_ASIO_HAS_CO_AWAIT #include +using namespace std::chrono_literals; + // NOTE1: I have observed that if hello and // blpop are sent together, Redis will send the response of hello // right away, not waiting for blpop. @@ -122,6 +131,50 @@ BOOST_AUTO_TEST_CASE(test_cancel_of_req_written_on_run_canceled) BOOST_TEST(finished); } +// We can cancel requests that haven't been written yet. +// All cancellation types are supported here. +BOOST_AUTO_TEST_CASE(test_cancel_pending) +{ + struct { + const char* name; + net::cancellation_type_t cancel_type; + } test_cases[] = { + {"terminal", net::cancellation_type_t::terminal}, + {"partial", net::cancellation_type_t::partial }, + {"total", net::cancellation_type_t::total }, + }; + + for (const auto& tc : test_cases) { + BOOST_TEST_CONTEXT(tc.name) + { + // Setup + net::io_context ctx; + connection conn(ctx); + request req; + req.push("get", "mykey"); + + // Issue a request without calling async_run(), so the request stays waiting forever + net::cancellation_signal sig; + bool called = false; + conn.async_exec( + req, + ignore, + net::bind_cancellation_slot(sig.slot(), [&](error_code ec, std::size_t sz) { + BOOST_TEST(ec == net::error::operation_aborted); + BOOST_TEST(sz == 0u); + called = true; + })); + + // Issue a cancellation + sig.emit(tc.cancel_type); + + // Prevent the test for deadlocking in case of failure + ctx.run_for(3s); + BOOST_TEST(called); + } + } +} + } // namespace #else diff --git a/test/test_exec_fsm.cpp b/test/test_exec_fsm.cpp new file mode 100644 index 00000000..f0b034ed --- /dev/null +++ b/test/test_exec_fsm.cpp @@ -0,0 +1,378 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace boost::redis; +namespace asio = boost::asio; +using detail::exec_fsm; +using detail::multiplexer; +using detail::exec_action_type; +using detail::exec_action; +using boost::system::error_code; +using boost::asio::cancellation_type_t; + +// Operators +namespace boost::redis::detail { + +std::ostream& operator<<(std::ostream& os, exec_action_type type) +{ + switch (type) { + case exec_action_type::immediate: return os << "exec_action_type::immediate"; + case exec_action_type::done: return os << "exec_action_type::done"; + case exec_action_type::notify_writer: return os << "exec_action_type::notify_writer"; + case exec_action_type::wait_for_response: return os << "exec_action_type::wait_for_response"; + case exec_action_type::cancel_run: return os << "exec_action_type::cancel_run"; + default: return os << ""; + } +} + +bool operator==(exec_action lhs, exec_action rhs) noexcept +{ + if (lhs.type() != rhs.type()) + return false; + else if (lhs.type() == exec_action_type::done) + return lhs.bytes_read() == rhs.bytes_read() && lhs.error() == rhs.error(); + else + return true; +} + +std::ostream& operator<<(std::ostream& os, exec_action act) +{ + os << "exec_action{ .type=" << act.type(); + if (act.type() == exec_action_type::done) + os << ", .bytes_read=" << act.bytes_read() << ", .error=" << act.error(); + return os << " }"; +} + +} // namespace boost::redis::detail + +// Prints a message on failure. Useful for parameterized tests +#define BOOST_TEST_EQ_MSG(lhs, rhs, msg) \ + if (!BOOST_TEST_EQ(lhs, rhs)) { \ + BOOST_LIGHTWEIGHT_TEST_OSTREAM << "Failure happened in context: " << msg << std::endl; \ + } + +namespace { + +// A helper to create a request and its associated elem +struct elem_and_request { + request req; + std::size_t done_calls{0u}; // number of times the done callback has been invoked + std::shared_ptr elm; + std::weak_ptr weak_elm; // check that we free memory + + elem_and_request(request::config cfg = {}) + : req(cfg) + { + // Empty requests are not valid. The request needs to be populated before creating the element + req.push("get", "mykey"); + + elm = std::make_shared( + req, + [](std::size_t, resp3::node_view const&, error_code&) { }); + elm->set_done_callback([this] { + ++done_calls; + }); + + weak_elm = elm; + } +}; + +// The happy path +void test_success() +{ + // Setup + multiplexer mpx; + elem_and_request input; + exec_fsm fsm(mpx, std::move(input.elm)); + error_code ec; + + // Initiate + auto act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::setup_cancellation); + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::notify_writer); + + // We should now wait for a response + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::wait_for_response); + + // Simulate a successful write + BOOST_TEST_EQ(mpx.prepare_write(), 1u); // one request was placed in the packet to write + BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response + + // Simulate a successful read + mpx.get_read_buffer() = "$5\r\nhello\r\n"; + auto req_status = mpx.consume_next(ec); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push + BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed + BOOST_TEST_EQ(input.done_calls, 1u); + + // This will awaken the exec operation, and should complete the operation + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action(error_code(), 11u)); + + // All memory should have been freed by now + BOOST_TEST(input.weak_elm.expired()); +} + +// The request encountered an error while parsing +void test_parse_error() +{ + // Setup + multiplexer mpx; + elem_and_request input; + exec_fsm fsm(mpx, std::move(input.elm)); + error_code ec; + + // Initiate + auto act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::setup_cancellation); + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::notify_writer); + + // We should now wait for a response + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::wait_for_response); + + // Simulate a successful write + BOOST_TEST_EQ(mpx.prepare_write(), 1u); // one request was placed in the packet to write + BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response + + // Simulate a read that will trigger an error. + // The second field should be a number (rather than the empty string). + // Note that although part of the buffer was consumed, the multiplexer + // currently throws this information away. + mpx.get_read_buffer() = "*2\r\n$5\r\nhello\r\n:\r\n"; + auto req_status = mpx.consume_next(ec); + BOOST_TEST_EQ(ec, error::empty_field); + BOOST_TEST_EQ(req_status.second, 0u); + BOOST_TEST_EQ(input.done_calls, 1u); + + // This will awaken the exec operation, and should complete the operation + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action(error::empty_field, 0u)); + + // All memory should have been freed by now + BOOST_TEST(input.weak_elm.expired()); +} + +// The request was configured to be cancelled on connection error, and the connection is closed +void test_cancel_if_not_connected() +{ + // Setup + multiplexer mpx; + request::config cfg; + cfg.cancel_if_not_connected = true; + elem_and_request input(cfg); + exec_fsm fsm(mpx, std::move(input.elm)); + + // Initiate. We're not connected, so the request gets cancelled + auto act = fsm.resume(false, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::immediate); + + act = fsm.resume(false, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action(error::not_connected)); + + // We didn't leave memory behind + BOOST_TEST(input.weak_elm.expired()); +} + +// The connection is closed when we start the request, but the request was configured to wait +void test_not_connected() +{ + // Setup + multiplexer mpx; + elem_and_request input; + exec_fsm fsm(mpx, std::move(input.elm)); + error_code ec; + + // Initiate + auto act = fsm.resume(false, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::setup_cancellation); + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::notify_writer); + + // We should now wait for a response + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::wait_for_response); + + // Simulate a successful write + BOOST_TEST_EQ(mpx.prepare_write(), 1u); // one request was placed in the packet to write + BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response + + // Simulate a successful read + mpx.get_read_buffer() = "$5\r\nhello\r\n"; + auto req_status = mpx.consume_next(ec); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push + BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed + BOOST_TEST_EQ(input.done_calls, 1u); + + // This will awaken the exec operation, and should complete the operation + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action(error_code(), 11u)); + + // All memory should have been freed by now + BOOST_TEST(input.weak_elm.expired()); +} + +// +// Cancellations +// + +// If the request is waiting, all cancellation types are supported +void test_cancel_waiting() +{ + constexpr struct { + const char* name; + asio::cancellation_type_t type; + } test_cases[] = { + {"terminal", asio::cancellation_type_t::terminal }, + {"partial", asio::cancellation_type_t::partial }, + {"total", asio::cancellation_type_t::total }, + {"mixed", asio::cancellation_type_t::partial | asio::cancellation_type_t::terminal}, + {"all", asio::cancellation_type_t::all }, + }; + + for (const auto& tc : test_cases) { + // Setup + multiplexer mpx; + elem_and_request input, input2; + exec_fsm fsm(mpx, std::move(input.elm)); + + // Another request enters the multiplexer, so it's busy when we start + mpx.add(input2.elm); + BOOST_TEST_EQ_MSG(mpx.prepare_write(), 1u, tc.name); + + // Initiate and wait + auto act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ_MSG(act, exec_action_type::setup_cancellation, tc.name); + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ_MSG(act, exec_action_type::notify_writer, tc.name); + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name); + + // We get notified because the request got cancelled + act = fsm.resume(true, tc.type); + BOOST_TEST_EQ_MSG(act, exec_action(asio::error::operation_aborted), tc.name); + BOOST_TEST_EQ_MSG(input.weak_elm.expired(), true, tc.name); // we didn't leave memory behind + } +} + +// If the request is being processed and terminal cancellation got requested, we cancel the connection +void test_cancel_notwaiting_terminal() +{ + // Setup + multiplexer mpx; + elem_and_request input; + exec_fsm fsm(mpx, std::move(input.elm)); + + // Initiate + auto act = fsm.resume(false, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::setup_cancellation); + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::notify_writer); + + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::wait_for_response); + + // The multiplexer starts writing the request + BOOST_TEST_EQ(mpx.prepare_write(), 1u); // one request was placed in the packet to write + + // A cancellation arrives + act = fsm.resume(true, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, exec_action_type::cancel_run); + act = fsm.resume(true, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, exec_action(asio::error::operation_aborted)); + + // The object needs to survive here, otherwise an inconsistent connection state is created +} + +// If the request is being processed and other types of cancellation got requested, we ignore the cancellation +void test_cancel_notwaiting_notterminal() +{ + constexpr struct { + const char* name; + asio::cancellation_type_t type; + } test_cases[] = { + {"partial", asio::cancellation_type_t::partial }, + {"total", asio::cancellation_type_t::total }, + {"mixed", asio::cancellation_type_t::partial | asio::cancellation_type_t::total}, + }; + + for (const auto& tc : test_cases) { + // Setup + multiplexer mpx; + elem_and_request input; + exec_fsm fsm(mpx, std::move(input.elm)); + error_code ec; + + // Initiate + auto act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ_MSG(act, exec_action_type::setup_cancellation, tc.name); + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ_MSG(act, exec_action_type::notify_writer, tc.name); + + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name); + + // Simulate a successful write + BOOST_TEST_EQ_MSG(mpx.prepare_write(), 1u, tc.name); + BOOST_TEST_EQ_MSG(mpx.commit_write(), 0u, tc.name); // all requests expect a response + + // We got requested a cancellation here, but we can't honor it + act = fsm.resume(true, tc.type); + BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name); + + // Simulate a successful read + mpx.get_read_buffer() = "$5\r\nhello\r\n"; + auto req_status = mpx.consume_next(ec); + BOOST_TEST_EQ_MSG(ec, error_code(), tc.name); + BOOST_TEST_EQ_MSG(req_status.first.value(), false, tc.name); // it wasn't a push + BOOST_TEST_EQ_MSG(req_status.second, 11u, tc.name); // the entire buffer was consumed + BOOST_TEST_EQ_MSG(input.done_calls, 1u, tc.name); + + // This will awaken the exec operation, and should complete the operation + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ_MSG(act, exec_action(error_code(), 11u), tc.name); + + // All memory should have been freed by now + BOOST_TEST_EQ_MSG(input.weak_elm.expired(), true, tc.name); + } +} + +} // namespace + +int main() +{ + test_success(); + test_parse_error(); + test_cancel_if_not_connected(); + test_not_connected(); + test_cancel_waiting(); + test_cancel_notwaiting_terminal(); + test_cancel_notwaiting_notterminal(); + + return boost::report_errors(); +} diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 768fbfd2..24bf7c67 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -369,7 +369,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline) // The staged status should now have changed to written. BOOST_TEST(item1.elem_ptr->is_written()); - BOOST_TEST(item2.elem_ptr->is_written()); + BOOST_TEST(item2.elem_ptr->is_done()); BOOST_TEST(item3.elem_ptr->is_written()); // The done status should still be unchanged on requests that