diff --git a/CMakeLists.txt b/CMakeLists.txt index 6124dd07..523f3e6c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,7 +43,7 @@ write_basic_package_version_file( COMPATIBILITY AnyNewerVersion ) -find_package(Boost 1.79 REQUIRED) +find_package(Boost 1.80 REQUIRED) include_directories(${Boost_INCLUDE_DIRS}) find_package(OpenSSL REQUIRED) @@ -54,19 +54,25 @@ include_directories(include) # Main function for the examples. #======================================================================= -add_library(common STATIC examples/common.cpp) +add_library(common STATIC + examples/common/common.cpp + examples/common/main.cpp + examples/common/aedis.cpp +) target_compile_features(common PUBLIC cxx_std_20) # Executables #======================================================================= -#add_executable(intro_sync examples/intro_sync.cpp) // Uncomment after update to Boost 1.80 - add_executable(intro examples/intro.cpp) target_link_libraries(intro common) target_compile_features(intro PUBLIC cxx_std_20) add_test(intro intro) +add_executable(intro_sync examples/intro_sync.cpp) +target_compile_features(intro_sync PUBLIC cxx_std_20) +add_test(intro_sync intro_sync) + add_executable(chat_room examples/chat_room.cpp) target_compile_features(chat_room PUBLIC cxx_std_20) target_link_libraries(chat_room common) diff --git a/README.md b/README.md index d22d5558..dfbc839b 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,14 @@ Some of its distinctive features are * Back pressure, cancellation and low latency. In addition to that, Aedis hides most of the low-level Asio code away -from the user. For example, the coroutine below retrieves Redis hashes +from the user, which in the majority of the case will be concerned +with three library entities + +* `aedis::resp3::request`: A container of Redis commands. +* `aedis::adapt()`: A function that adapts data structures to receive Redis responses. +* `aedis::connection`: A connection to the Redis server. + +For example, the coroutine below reads Redis [hashes](https://redis.io/docs/data-types/hashes/) in a `std::map` and quits the connection (see containers.cpp) ```cpp @@ -24,7 +31,6 @@ auto hgetall(std::shared_ptr conn) -> net::awaitable { // A request contains multiple Redis commands. request req; - req.get_config().cancel_on_connection_lost = true; req.push("HELLO", 3); req.push("HGETALL", "hset-key"); req.push("QUIT"); @@ -39,11 +45,10 @@ auto hgetall(std::shared_ptr conn) -> net::awaitable } ``` -The execution of `connection::async_exec` as shown above is -triggered by the `connection::async_run` member function, which is -required to be running concurrently for as long as the connection -stands. For example, the code below uses a short-lived connection to -execute the coroutine above +The execution of `connection::async_exec` as shown above must +still be triggered by the `connection::async_run` member function. For +example, the code below uses a short-lived connection to execute the +coroutine above ```cpp net::awaitable async_main() @@ -53,7 +58,7 @@ net::awaitable async_main() // Resolves and connects (from examples/common.hpp to avoid vebosity) co_await connect(conn, "127.0.0.1", "6379"); - // Runs and executes the request. + // Runs hgetall (previous example). co_await (conn->async_run() || hgetall(conn)); } ``` @@ -67,32 +72,25 @@ reading from the socket. The reationale behind this design is * Support server pushes and requests in the same connection object, concurrently. -In the following sections we will discuss with more details the main -code entities Aedis users are concerned with, namely - -* `aedis::resp3::request`: A container of Redis commands. -* `aedis::adapt()`: A function that adapts data structures to receive Redis responses. -* `aedis::connection`: A connection to the Redis server. - -before that however, users might find it helpful to skim over the -examples, to gain a better feeling about the library capabilities. +Before we see with more detail how connections, requests and responses +work, users might find it helpful to skim over the examples, to gain a +better feeling about the library capabilities. * intro.cpp: The Aedis hello-world program. Sends one command to Redis and quits the connection. * intro_tls.cpp: Same as intro.cpp but over TLS. +* intro_sync.cpp: Shows how to use the conneciton class synchronously. * containers.cpp: Shows how to send and receive STL containers and how to use transactions. * serialization.cpp: Shows how to serialize types using Boost.Json. * resolve_with_sentinel.cpp: Shows how to resolve a master address using sentinels. * subscriber.cpp: Shows how to implement pubsub with reconnection re-subscription. * echo_server.cpp: A simple TCP echo server. * chat_room.cpp: A command line chat built on Redis pubsub. - -The next two examples uses the Aedis low-level API - -* low_level_sync.cpp: Sends a ping synchronously. -* low_level_async.cpp: Sends a ping asynchronously +* low_level_sync.cpp: Sends a ping synchronously using the low-level API. +* low_level_async.cpp: Sends a ping asynchronously using the low-level API. To avoid repetition code that is common to all examples have been -grouped in common.hpp. +grouped in common.hpp. The main function is defined in main.cpp and +used by all examples. ### Requests @@ -498,7 +496,7 @@ auto async_main() -> net::awaitable It is important to emphasize that Redis servers use the old communication protocol RESP2 by default, therefore it is necessary to send a `HELLO 3` command everytime a connection is established. -Another common scenarios for reconnection is, for example, a failover +Another common scenario for reconnection is, for example, a failover with sentinels, covered in `resolve_with_sentinel.cpp` example. #### Execute @@ -613,7 +611,7 @@ co_await (conn.async_exec(...) || time.async_wait(...)) should last. * The cancellation will be ignored if the request has already been written to the socket. -* It is usually a better idea to have a healthy checker that adding +* It is usually a better idea to have a healthy checker than adding per request timeout, see subscriber.cpp for an example. ```cpp diff --git a/examples/chat_room.cpp b/examples/chat_room.cpp index e8b97578..dad20b93 100644 --- a/examples/chat_room.cpp +++ b/examples/chat_room.cpp @@ -10,7 +10,7 @@ #include #include -#include "common.hpp" +#include "common/common.hpp" namespace net = boost::asio; using namespace net::experimental::awaitable_operators; @@ -55,7 +55,7 @@ auto subscriber(std::shared_ptr conn) -> net::awaitable co_await conn->async_exec(req); } -// Called from the main function (see common.cpp) +// Called from the main function (see main.cpp) auto async_main() -> net::awaitable { auto ex = co_await net::this_coro::executor; diff --git a/examples/common/aedis.cpp b/examples/common/aedis.cpp new file mode 100644 index 00000000..d2cf3585 --- /dev/null +++ b/examples/common/aedis.cpp @@ -0,0 +1,8 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include diff --git a/examples/common.cpp b/examples/common/common.cpp similarity index 80% rename from examples/common.cpp rename to examples/common/common.cpp index 4d539d51..da922a5a 100644 --- a/examples/common.cpp +++ b/examples/common/common.cpp @@ -6,6 +6,7 @@ #include "common.hpp" +#include #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include #include @@ -18,9 +19,6 @@ using aedis::resp3::request; using aedis::adapt; using aedis::operation; -// Include this in no more than one .cpp file. -#include - namespace { auto redir(boost::system::error_code& ec) @@ -74,21 +72,4 @@ connect( throw std::runtime_error("Connect timeout"); } -extern net::awaitable async_main(); - -// Main function used in our examples. -auto main() -> int -{ - try { - net::io_context ioc; - net::co_spawn(ioc, async_main(), net::detached); - ioc.run(); - } catch (std::exception const& e) { - std::cerr << "Error: " << e.what() << std::endl; - return 1; - } -} - -#else // defined(BOOST_ASIO_HAS_CO_AWAIT) -auto main() -> int {std::cout << "Requires coroutine support." << std::endl; return 0;} #endif // defined(BOOST_ASIO_HAS_CO_AWAIT) diff --git a/examples/common.hpp b/examples/common/common.hpp similarity index 100% rename from examples/common.hpp rename to examples/common/common.hpp diff --git a/examples/common/main.cpp b/examples/common/main.cpp new file mode 100644 index 00000000..e4d69e6a --- /dev/null +++ b/examples/common/main.cpp @@ -0,0 +1,29 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include +#if defined(BOOST_ASIO_HAS_CO_AWAIT) + +namespace net = boost::asio; +extern net::awaitable async_main(); + +// The main function used in our examples. +auto main() -> int +{ + try { + net::io_context ioc; + net::co_spawn(ioc, async_main(), net::detached); + ioc.run(); + } catch (std::exception const& e) { + std::cerr << "Error: " << e.what() << std::endl; + return 1; + } +} + +#else // defined(BOOST_ASIO_HAS_CO_AWAIT) +auto main() -> int {std::cout << "Requires coroutine support." << std::endl; return 0;} +#endif // defined(BOOST_ASIO_HAS_CO_AWAIT) diff --git a/examples/containers.cpp b/examples/containers.cpp index d3872128..60975032 100644 --- a/examples/containers.cpp +++ b/examples/containers.cpp @@ -11,7 +11,7 @@ #include #include -#include "common.hpp" +#include "common/common.hpp" namespace net = boost::asio; using namespace net::experimental::awaitable_operators; @@ -92,7 +92,7 @@ auto transaction(std::shared_ptr conn) -> net::awaitable print(std::get<1>(std::get<4>(resp)).value()); } -// Called from the main function (see common.cpp) +// Called from the main function (see main.cpp) net::awaitable async_main() { auto conn = std::make_shared(co_await net::this_coro::executor); diff --git a/examples/echo_server.cpp b/examples/echo_server.cpp index 5a215f94..b59908c2 100644 --- a/examples/echo_server.cpp +++ b/examples/echo_server.cpp @@ -8,7 +8,7 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include #include -#include "common.hpp" +#include "common/common.hpp" namespace net = boost::asio; using namespace net::experimental::awaitable_operators; @@ -44,7 +44,7 @@ auto listener(std::shared_ptr conn) -> net::awaitable net::co_spawn(ex, echo_server_session(co_await acc.async_accept(), conn), net::detached); } -// Called from the main function (see common.cpp) +// Called from the main function (see main.cpp) auto async_main() -> net::awaitable { auto ex = co_await net::this_coro::executor; diff --git a/examples/intro.cpp b/examples/intro.cpp index 8db947c3..74200140 100644 --- a/examples/intro.cpp +++ b/examples/intro.cpp @@ -8,15 +8,15 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include #include -#include "common.hpp" +#include "common/common.hpp" namespace net = boost::asio; using namespace net::experimental::awaitable_operators; using aedis::adapt; using aedis::resp3::request; -// Called from the main function (see common.cpp) -net::awaitable async_main() +// Called from the main function (see main.cpp) +auto async_main() -> net::awaitable { request req; req.get_config().cancel_on_connection_lost = true; diff --git a/examples/intro_sync.cpp b/examples/intro_sync.cpp index 1168441f..4b0a42af 100644 --- a/examples/intro_sync.cpp +++ b/examples/intro_sync.cpp @@ -10,23 +10,21 @@ #include #include -// TODO: Fix this after updating to 1.80. - // Include this in no more than one .cpp file. #include namespace net = boost::asio; using aedis::adapt; using aedis::resp3::request; -using connection = aedis::connection<>; +using connection = aedis::connection; template -auto exec(connection& conn, request const& req, Adapter adapter, boost::system::error_code& ec) +auto exec(std::shared_ptr conn, request const& req, Adapter adapter) { net::dispatch( - conn.get_executor(), - net::deferred([&]() { return conn.async_exec(req, adapter, net::deferred); })) - (net::redirect_error(net::use_future, ec)).get(); + conn->get_executor(), + net::deferred([&]() { return conn->async_exec(req, adapter, net::deferred); })) + (net::use_future).get(); } auto logger = [](auto const& ec) @@ -37,9 +35,12 @@ int main() try { net::io_context ioc{1}; - connection conn{ioc}; - std::thread t{[&]() { - conn.async_run(logger); + auto conn = std::make_shared(ioc); + net::ip::tcp::resolver resv{ioc}; + auto const res = resv.resolve("127.0.0.1", "6379"); + net::connect(conn->next_layer(), res); + std::thread t{[conn, &ioc]() { + conn->async_run(logger); ioc.run(); }}; @@ -49,13 +50,10 @@ int main() req.push("PING"); req.push("QUIT"); - boost::system::error_code ec; - std::tuple resp; - exec(conn, req, adapt(resp), ec); + std::tuple resp; + exec(conn, req, adapt(resp)); - std::cout - << "Exec: " << ec.message() << "\n" - << "Response: " << std::get<0>(resp) << std::endl; + std::cout << "Response: " << std::get<1>(resp) << std::endl; t.join(); } catch (std::exception const& e) { diff --git a/examples/resolve_with_sentinel.cpp b/examples/resolve_with_sentinel.cpp index fe0e476e..25d675b7 100644 --- a/examples/resolve_with_sentinel.cpp +++ b/examples/resolve_with_sentinel.cpp @@ -8,7 +8,8 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include #include -#include "common.hpp" + +#include "common/common.hpp" namespace net = boost::asio; using namespace net::experimental::awaitable_operators; diff --git a/examples/serialization.cpp b/examples/serialization.cpp index c3c1ceda..7b93fcf5 100644 --- a/examples/serialization.cpp +++ b/examples/serialization.cpp @@ -9,13 +9,13 @@ #include #include #include -#include "common.hpp" #include #include #include #include #include #include +#include "common/common.hpp" // Include this in no more than one .cpp file. #include diff --git a/examples/subscriber.cpp b/examples/subscriber.cpp index 6c4d0a60..d3d5da89 100644 --- a/examples/subscriber.cpp +++ b/examples/subscriber.cpp @@ -9,7 +9,7 @@ #include #include -#include "common.hpp" +#include "common/common.hpp" namespace net = boost::asio; using namespace net::experimental::awaitable_operators; diff --git a/include/aedis/adapt.hpp b/include/aedis/adapt.hpp index ef858aa0..3eec484e 100644 --- a/include/aedis/adapt.hpp +++ b/include/aedis/adapt.hpp @@ -210,7 +210,7 @@ inline auto adapt(std::size_t max_read_size = (std::numeric_limits: * 2. std::vector> * * The types T1, T2, etc can be any STL container, any integer type - * and \c std::string + * and `std::string`. * * @param t Tuple containing the responses. * @param max_read_size Specifies the maximum size of the read diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index 6878069d..3564ca81 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -47,7 +47,7 @@ class basic_connection : using base_type = detail::connection_base>; - /// Constructor + /// Contructs from an executor. explicit basic_connection( executor_type ex, @@ -56,6 +56,7 @@ class basic_connection : , stream_{ex} {} + /// Contructs from a context. explicit basic_connection( boost::asio::io_context& ioc, @@ -82,10 +83,11 @@ class basic_connection : /// Returns a const reference to the next layer. auto next_layer() const noexcept -> auto const& { return stream_; } - /** @brief Establishes a connection with the Redis server asynchronously. + /** @brief Starts read and write operations * - * This function will start reading from the socket and executes - * all requests that have been started prior to this function + * This function starts read and write operations with the Redis + * server. More specifically it will trigger the write of all + * requests i.e. calls to `async_exec` that happened prior to this * call. * * @param token Completion token. @@ -109,10 +111,10 @@ class basic_connection : /** @brief Executes a command on the Redis server asynchronously. * * This function will send a request to the Redis server and - * complete when the response arrives. If the request contains - * only commands that don't expect a response, the completion - * occurs after it has been written to the underlying stream. - * Multiple concurrent calls to this function will be + * complete after the response has been processed. If the request + * contains only commands that don't expect a response, the + * completion occurs after it has been written to the underlying + * stream. Multiple concurrent calls to this function will be * automatically queued by the implementation. * * @param req Request object. @@ -177,12 +179,7 @@ class basic_connection : * @li operation::run: Cancels the `async_run` operation. Notice * that the preferred way to close a connection is to send a * [QUIT](https://redis.io/commands/quit/) command to the server. - * An unresponsive Redis server will also cause the idle-checks to - * timeout and lead to `connection::async_run` completing with - * `error::idle_timeout`. Calling `cancel(operation::run)` - * directly should be seen as the last option. - * @li operation::receive: Cancels any ongoing callto - * `async_receive`. + * @li operation::receive: Cancels any ongoing calls to * `async_receive`. * * @param op: The operation to be cancelled. * @returns The number of operations that have been canceled. diff --git a/include/aedis/detail/connection_ops.hpp b/include/aedis/detail/connection_ops.hpp index 09347b24..29de0296 100644 --- a/include/aedis/detail/connection_ops.hpp +++ b/include/aedis/detail/connection_ops.hpp @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include #include @@ -29,11 +31,47 @@ namespace aedis::detail { +template +auto async_guarded(Channel& channel, Op op, CompletionToken&& token) +{ + return boost::asio::deferred.values(&channel) + | boost::asio::deferred( + [](Channel* ch) + { + return ch->async_receive(boost::asio::append(boost::asio::deferred, ch)); + } + ) + | boost::asio::deferred( + [op2 = std::move(op)](std::error_code ec, std::size_t, Channel* ch) + { + return boost::asio::deferred.when(!ec) + .then(op2(boost::asio::append(boost::asio::deferred, ch))) + .otherwise(boost::asio::deferred.values(ec, 0, ch)); + } + ) + | boost::asio::deferred( + [&](std::error_code ec, std::size_t n, Channel* ch) + { + return boost::asio::deferred.when(!ec) + .then(ch->async_send({}, 0, boost::asio::append(boost::asio::deferred, n))) + .otherwise(boost::asio::deferred.values(ec, 0)); + } + ) + | boost::asio::deferred( + [](std::error_code ec, std::size_t n) + { + return boost::asio::deferred.when(!ec) + .then(boost::asio::deferred.values(boost::system::error_code{}, n)) + .otherwise(boost::asio::deferred.values(ec, 0)); + } + ) + | std::forward(token); +} + template struct receive_op { Conn* conn = nullptr; Adapter adapter; - std::size_t read_size = 0; boost::asio::coroutine coro{}; template @@ -44,26 +82,16 @@ struct receive_op { { reenter (coro) { - yield conn->push_channel_.async_receive(std::move(self)); - AEDIS_CHECK_OP1(); - yield - resp3::async_read( - conn->next_layer(), - conn->make_dynamic_buffer(adapter.get_max_read_size(0)), - adapter, std::move(self)); - - // cancel(receive) is needed to cancel the channel, otherwise - // the read operation will be blocked forever see - // test_push_adapter. + async_guarded( + conn->push_channel_, + resp3::async_read( + conn->next_layer(), + conn->make_dynamic_buffer(adapter.get_max_read_size(0)), + adapter, boost::asio::deferred), + std::move(self)); AEDIS_CHECK_OP1(conn->cancel(operation::run); conn->cancel(operation::receive)); - - read_size = n; - - yield conn->push_channel_.async_send({}, 0, std::move(self)); - AEDIS_CHECK_OP1(); - - self.complete({}, read_size); + self.complete({}, n); return; } } diff --git a/include/aedis/resp3/request.hpp b/include/aedis/resp3/request.hpp index 5743c706..84e041b8 100644 --- a/include/aedis/resp3/request.hpp +++ b/include/aedis/resp3/request.hpp @@ -171,43 +171,40 @@ void add_separator(Request& to) * * \li Non-string types will be converted to string by using \c * to_bulk, which must be made available over ADL. - * \li Uses std::string as internal storage. + * \li Uses a std::pmr::string for internal storage. */ class request { public: /// Request configuration options. struct config { - /** \brief If set to true, requests started with - * `aedis::connection::async_exec` will fail if the connection is - * lost while the request is pending. The default + /** \brief If true the request will complete with error if the + * connection is lost while the request is pending. The default * behaviour is not to close requests. */ bool cancel_on_connection_lost = false; - /** \brief If true this request will be coalesced with other requests, - * see https://redis.io/topics/pipelining. If false, this - * request will be sent individually. + /** \brief If true the request will be coalesced with other requests, + * see https://redis.io/topics/pipelining. Otherwise the + * request is sent individually. */ bool coalesce = true; - /** \brief If set to true, requests started with - * `aedis::connection::async_exec` will fail if the call happens - * before the connection with Redis was stablished. + /** \brief If true, the request will complete with error if the + * call happens before the connection with Redis was stablished. */ bool cancel_if_not_connected = false; /** \brief If true, the implementation will resend this - * request if it remained unresponded when - * `aedis::connection::async_run` completed. Has effect only if + * request if it remains unresponded when + * `aedis::connection::async_run` completes. Has effect only if * cancel_on_connection_lost is true. */ bool retry = true; /** \brief If this request has a HELLO command and this flag is - * set to true, the `aedis::connection` will move it to the - * front of the queue of awaiting requests. This makes it - * possible to send HELLO and authenticate before other - * commands are sent. + * true, the `aedis::connection` will move it to the front of + * the queue of awaiting requests. This makes it possible to + * send HELLO and authenticate before other commands are sent. */ bool hello_with_priority = true; }; @@ -239,7 +236,7 @@ class request { commands_ = 0; } - /// Calls std::string::reserve on the internal storage. + /// Calls std::pmr::string::reserve on the internal storage. void reserve(std::size_t new_cap = 0) { payload_.reserve(new_cap); }