diff --git a/.clang-format b/.clang-format index 3e6159d411..386c17628f 100644 --- a/.clang-format +++ b/.clang-format @@ -18,6 +18,11 @@ IncludeCategories: - Regex: '' Priority: 30 + # coroutine.hpp has to go before the boost headers, + # otherwise boost::asio fails to compile on GCC 12 + - Regex: '"task\.hpp"' + Priority: 30 + # Silkworm headers - Regex: ' +#include #include -#include #include namespace silkworm::cmd::common { class ShutdownSignal { public: - explicit ShutdownSignal(boost::asio::io_context& io_context) - : signals_(io_context, SIGINT, SIGTERM) {} + explicit ShutdownSignal(boost::asio::any_io_executor executor) + : signals_(executor, SIGINT, SIGTERM) {} using SignalNumber = int; diff --git a/cmd/dev/backend_kv_server.cpp b/cmd/dev/backend_kv_server.cpp index ebdec9b017..485821b7f8 100644 --- a/cmd/dev/backend_kv_server.cpp +++ b/cmd/dev/backend_kv_server.cpp @@ -250,7 +250,7 @@ int main(int argc, char* argv[]) { tasks = server.async_run("bekv-server"); } - ShutdownSignal shutdown_signal{context_pool.next_io_context()}; + ShutdownSignal shutdown_signal{context_pool.next_io_context().get_executor()}; // Go! auto run_future = boost::asio::co_spawn( diff --git a/cmd/dev/snapshots.cpp b/cmd/dev/snapshots.cpp index fb47e3e0a3..4d01f68b6f 100644 --- a/cmd/dev/snapshots.cpp +++ b/cmd/dev/snapshots.cpp @@ -311,7 +311,7 @@ void download(const BitTorrentSettings& settings) { BitTorrentClient client{settings}; boost::asio::io_context scheduler; - ShutdownSignal shutdown_signal{scheduler}; + ShutdownSignal shutdown_signal{scheduler.get_executor()}; shutdown_signal.on_signal([&](ShutdownSignal::SignalNumber /*num*/) { client.stop(); SILK_DEBUG << "Torrent client stopped"; diff --git a/cmd/sentry.cpp b/cmd/sentry.cpp index 6f9ecfa99f..aabdbf9e2e 100644 --- a/cmd/sentry.cpp +++ b/cmd/sentry.cpp @@ -75,9 +75,9 @@ void sentry_main(Settings settings) { [] { return std::make_unique(); }, }; - ShutdownSignal shutdown_signal{context_pool.next_io_context()}; + ShutdownSignal shutdown_signal{context_pool.next_io_context().get_executor()}; - Sentry sentry{std::move(settings), context_pool}; + Sentry sentry{std::move(settings), context_pool.as_executor_pool()}; auto run_future = boost::asio::co_spawn( context_pool.next_io_context(), diff --git a/cmd/silkworm.cpp b/cmd/silkworm.cpp index 08246b960e..980e8b8bb5 100644 --- a/cmd/silkworm.cpp +++ b/cmd/silkworm.cpp @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -66,7 +65,6 @@ using silkworm::lookup_known_chain; using silkworm::NodeSettings; using silkworm::parse_size; using silkworm::PreverifiedHashes; -using silkworm::StopWatch; using silkworm::cmd::common::add_context_pool_options; using silkworm::cmd::common::add_logging_options; using silkworm::cmd::common::add_node_options; @@ -219,8 +217,11 @@ using SentryClientPtr = std::shared_ptr; using SentryServerPtr = std::shared_ptr; using SentryPtrPair = std::tuple; -static SentryPtrPair make_sentry(sentry::Settings sentry_settings, NodeSettings& node_settings, - rpc::ServerContextPool& context_pool, db::ROAccess db_access) { +static SentryPtrPair make_sentry( + sentry::Settings sentry_settings, + NodeSettings& node_settings, + rpc::ServerContextPool& context_pool, + db::ROAccess db_access) { SentryServerPtr sentry_server; SentryClientPtr sentry_client; @@ -232,7 +233,7 @@ static SentryPtrPair make_sentry(sentry::Settings sentry_settings, NodeSettings& sentry_settings.api_address = ""; // Create embedded server - sentry_server = std::make_shared(std::move(sentry_settings), context_pool); + sentry_server = std::make_shared(std::move(sentry_settings), context_pool.as_executor_pool()); // Wrap direct client i.e. server in a session client sentry_client = std::make_shared( @@ -348,7 +349,7 @@ int main(int argc, char* argv[]) { .jwt_secret_file = settings.rpcdaemon_settings.jwt_secret_file, }; chainsync::Sync chain_sync_process{ - context_pool.next_io_context(), + context_pool.next_io_context().get_executor(), chaindata_db, execution_client, sentry_client, @@ -365,7 +366,7 @@ int main(int argc, char* argv[]) { chain_sync_process.async_run(); // Trap OS signals - ShutdownSignal shutdown_signal{context_pool.next_io_context()}; + ShutdownSignal shutdown_signal{context_pool.next_io_context().get_executor()}; // Go! auto run_future = boost::asio::co_spawn( diff --git a/cmd/test/backend_kv_test.cpp b/cmd/test/backend_kv_test.cpp index 77dbc53e98..7e125d2ea5 100644 --- a/cmd/test/backend_kv_test.cpp +++ b/cmd/test/backend_kv_test.cpp @@ -1025,7 +1025,7 @@ int main(int argc, char* argv[]) { }}; boost::asio::io_context scheduler; - silkworm::cmd::common::ShutdownSignal shutdown_signal{scheduler}; + silkworm::cmd::common::ShutdownSignal shutdown_signal{scheduler.get_executor()}; shutdown_signal.on_signal([&](silkworm::cmd::common::ShutdownSignal::SignalNumber /*num*/) { pump_stop = true; completion_stop = true; diff --git a/silkworm/infra/common/asio_timer.hpp b/silkworm/infra/common/asio_timer.hpp index 0d91fc08a1..0a56258aa4 100644 --- a/silkworm/infra/common/asio_timer.hpp +++ b/silkworm/infra/common/asio_timer.hpp @@ -25,8 +25,8 @@ // otherwise dependency doesn't compile #define _DARWIN_C_SOURCE #endif +#include #include -#include #include #include @@ -38,13 +38,18 @@ using namespace std::chrono_literals; //! \brief Implementation of an asynchronous timer relying on boost:asio class Timer { public: - //! \param asio_context [in] : boost's asio context + //! \param executor [in] : executor //! \param interval [in] : length of wait interval (in milliseconds) //! \param call_back [in] : the call back function to be called //! \param auto_start [in] : whether to start the timer immediately - explicit Timer(boost::asio::io_context& asio_context, uint32_t interval, std::function call_back, - bool auto_start = false) - : interval_(interval), timer_(asio_context), call_back_(std::move(call_back)) { + explicit Timer( + boost::asio::any_io_executor executor, + uint32_t interval, + std::function call_back, + bool auto_start = false) + : interval_(interval), + timer_(std::move(executor)), + call_back_(std::move(call_back)) { SILKWORM_ASSERT(interval > 0); if (auto_start) { start(); diff --git a/silkworm/infra/concurrency/awaitable_condition_variable.cpp b/silkworm/infra/concurrency/awaitable_condition_variable.cpp index 909114695b..07ffb4a606 100644 --- a/silkworm/infra/concurrency/awaitable_condition_variable.cpp +++ b/silkworm/infra/concurrency/awaitable_condition_variable.cpp @@ -28,14 +28,14 @@ namespace silkworm::concurrency { class AwaitableConditionVariableImpl { public: - std::function()> waiter() { + std::function()> waiter() { size_t waiter_version; { std::scoped_lock lock(mutex_); waiter_version = version_; } - return [this, waiter_version]() -> boost::asio::awaitable { + return [this, waiter_version]() -> Task { auto executor = co_await boost::asio::this_coro::executor; decltype(waiters_)::iterator waiter; @@ -81,7 +81,7 @@ AwaitableConditionVariable::~AwaitableConditionVariable() { [[maybe_unused]] int non_trivial_destructor; // silent clang-tidy } -std::function()> AwaitableConditionVariable::waiter() { +std::function()> AwaitableConditionVariable::waiter() { return p_impl_->waiter(); } diff --git a/silkworm/infra/concurrency/awaitable_condition_variable.hpp b/silkworm/infra/concurrency/awaitable_condition_variable.hpp index 004cedd633..31d7160c13 100644 --- a/silkworm/infra/concurrency/awaitable_condition_variable.hpp +++ b/silkworm/infra/concurrency/awaitable_condition_variable.hpp @@ -19,11 +19,7 @@ #include #include -#include - -#include -#include -#include +#include "task.hpp" namespace silkworm::concurrency { @@ -57,7 +53,7 @@ class AwaitableConditionVariable { AwaitableConditionVariable(); virtual ~AwaitableConditionVariable(); - using Waiter = std::function()>; + using Waiter = std::function()>; Waiter waiter(); void notify_all(); diff --git a/silkworm/infra/concurrency/awaitable_future.hpp b/silkworm/infra/concurrency/awaitable_future.hpp index 4a3f632cb8..d8d2b78b5e 100644 --- a/silkworm/infra/concurrency/awaitable_future.hpp +++ b/silkworm/infra/concurrency/awaitable_future.hpp @@ -19,19 +19,15 @@ #include #include -#include +#include "task.hpp" #include -#include #include -#include #include #include namespace silkworm::concurrency { -namespace asio = boost::asio; - // An awaitable-friendly future/promise // See also: https://docs.rs/tokio/1.25.0/tokio/sync/oneshot/index.html @@ -47,9 +43,9 @@ class AwaitableFuture { AwaitableFuture(AwaitableFuture&&) noexcept = default; AwaitableFuture& operator=(AwaitableFuture&&) noexcept = default; - asio::awaitable get_async() { + Task get_async() { try { - std::optional result = co_await channel_->async_receive(asio::use_awaitable); + std::optional result = co_await channel_->async_receive(boost::asio::use_awaitable); co_return std::move(result.value()); } catch (const boost::system::system_error& ex) { close_and_throw_if_cancelled(ex); @@ -59,7 +55,7 @@ class AwaitableFuture { T get() { try { - std::optional result = channel_->async_receive(asio::use_future).get(); + std::optional result = channel_->async_receive(boost::asio::use_future).get(); return std::move(result.value()); } catch (const boost::system::system_error& ex) { close_and_throw_if_cancelled(ex); @@ -70,7 +66,7 @@ class AwaitableFuture { private: friend class AwaitablePromise; - using AsyncChannel = asio::experimental::concurrent_channel)>; + using AsyncChannel = boost::asio::experimental::concurrent_channel)>; explicit AwaitableFuture(std::shared_ptr channel) : channel_(channel) {} @@ -88,13 +84,11 @@ class AwaitableFuture { template class AwaitablePromise { - constexpr static size_t one_shot_channel = 1; using AsyncChannel = typename AwaitableFuture::AsyncChannel; public: - explicit AwaitablePromise(asio::any_io_executor&& executor) : channel_(std::make_shared(executor, one_shot_channel)) {} - explicit AwaitablePromise(asio::any_io_executor& executor) : channel_(std::make_shared(executor, one_shot_channel)) {} - explicit AwaitablePromise(asio::io_context& io_context) : channel_(std::make_shared(io_context, one_shot_channel)) {} + explicit AwaitablePromise(boost::asio::any_io_executor executor) + : channel_(std::make_shared(std::move(executor), 1)) {} AwaitablePromise(const AwaitablePromise&) = delete; AwaitablePromise& operator=(const AwaitablePromise&) = delete; diff --git a/silkworm/infra/concurrency/awaitable_future_test.cpp b/silkworm/infra/concurrency/awaitable_future_test.cpp index 581a8ff232..e81e32758e 100644 --- a/silkworm/infra/concurrency/awaitable_future_test.cpp +++ b/silkworm/infra/concurrency/awaitable_future_test.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "active_component.hpp" @@ -32,8 +33,8 @@ namespace asio = boost::asio; using concurrency::AwaitableFuture; using concurrency::AwaitablePromise; -auto create_promise_and_set_value(asio::io_context& io, int value) { - concurrency::AwaitablePromise promise{io}; +auto create_promise_and_set_value(asio::any_io_executor executor, int value) { + concurrency::AwaitablePromise promise{std::move(executor)}; promise.set_value(value); return promise.get_future(); } @@ -46,10 +47,9 @@ class TestException : public std::runtime_error { TEST_CASE("awaitable future") { asio::io_context io; asio::io_context::work work{io}; + AwaitablePromise promise{io.get_executor()}; SECTION("trivial use") { - AwaitablePromise promise{io}; - auto future = promise.get_future(); promise.set_value(42); @@ -59,7 +59,6 @@ TEST_CASE("awaitable future") { } SECTION("variation of the trivial use") { - AwaitablePromise promise{io}; promise.set_value(42); auto future = promise.get_future(); @@ -69,7 +68,6 @@ TEST_CASE("awaitable future") { } SECTION("setting exception instead of value") { - AwaitablePromise promise{io}; auto future = promise.get_future(); promise.set_exception(std::make_exception_ptr(TestException())); @@ -78,7 +76,6 @@ TEST_CASE("awaitable future") { } SECTION("variation of setting exception instead of value") { - AwaitablePromise promise{io}; auto future = promise.get_future(); try { @@ -91,23 +88,19 @@ TEST_CASE("awaitable future") { } SECTION("setting value two times fails") { - AwaitablePromise promise{io}; - promise.set_value(42); CHECK_THROWS(promise.set_value(43)); } SECTION("setting exception two times fails") { - AwaitablePromise promise{io}; - promise.set_exception(std::make_exception_ptr(TestException())); CHECK_THROWS(promise.set_exception(std::make_exception_ptr(TestException()))); } SECTION("returning the future from a function") { - auto future = create_promise_and_set_value(io, 42); + auto future = create_promise_and_set_value(io.get_executor(), 42); auto value = future.get(); @@ -115,10 +108,10 @@ TEST_CASE("awaitable future") { } SECTION("returning the future from a function (variation)") { - auto returned_future = [&]() { - concurrency::AwaitablePromise promise{io}; - auto future = promise.get_future(); - promise.set_value(42); + auto returned_future = [executor = io.get_executor()]() { + concurrency::AwaitablePromise promise1{executor}; + auto future = promise1.get_future(); + promise1.set_value(42); return future; }(); @@ -128,7 +121,6 @@ TEST_CASE("awaitable future") { } SECTION("writing and reading from different threads") { - AwaitablePromise promise{io}; auto future = promise.get_future(); int value; @@ -148,7 +140,6 @@ TEST_CASE("awaitable future") { } SECTION("writing and reading from different threads") { - AwaitablePromise promise{io}; auto future = promise.get_future(); int value; @@ -174,7 +165,6 @@ TEST_CASE("awaitable future") { } SECTION("using coroutines in read in the same io_context, write before read") { - AwaitablePromise promise{io}; int value; asio::co_spawn( @@ -193,7 +183,6 @@ TEST_CASE("awaitable future") { } SECTION("variation of using coroutines in the same io_context, write before read") { - AwaitablePromise promise{io}; auto future = promise.get_future(); int value; @@ -212,7 +201,6 @@ TEST_CASE("awaitable future") { } SECTION("moving AwaitableFuture") { - AwaitablePromise promise{io}; auto future = promise.get_future(); int value; @@ -230,7 +218,6 @@ TEST_CASE("awaitable future") { } SECTION("using coroutine for both read and write, read before write") { - AwaitablePromise promise{io}; int value; asio::co_spawn( @@ -256,7 +243,6 @@ TEST_CASE("awaitable future") { } SECTION("cancellation after read") { - AwaitablePromise promise{io}; int value; boost::system::error_code code; diff --git a/silkworm/infra/concurrency/channel.hpp b/silkworm/infra/concurrency/channel.hpp index 59ed9c348e..40253182f9 100644 --- a/silkworm/infra/concurrency/channel.hpp +++ b/silkworm/infra/concurrency/channel.hpp @@ -16,13 +16,11 @@ #pragma once -#include +#include "task.hpp" #include -#include #include #include -#include #include #include #include @@ -33,17 +31,11 @@ namespace silkworm::concurrency { template class Channel { public: - explicit Channel(boost::asio::any_io_executor&& executor) : channel_(executor) {} - explicit Channel(boost::asio::any_io_executor& executor) : channel_(executor) {} - explicit Channel(boost::asio::io_context& io_context) : channel_(io_context) {} - Channel(boost::asio::any_io_executor&& executor, std::size_t max_buffer_size) - : channel_(executor, max_buffer_size) {} - Channel(boost::asio::any_io_executor& executor, std::size_t max_buffer_size) - : channel_(executor, max_buffer_size) {} - Channel(boost::asio::io_context& io_context, std::size_t max_buffer_size) - : channel_(io_context, max_buffer_size) {} + explicit Channel(boost::asio::any_io_executor executor) : channel_(std::move(executor)) {} + Channel(boost::asio::any_io_executor executor, std::size_t max_buffer_size) + : channel_(std::move(executor), max_buffer_size) {} - boost::asio::awaitable send(T value) { + Task send(T value) { try { co_await channel_.async_send(boost::system::error_code(), value, boost::asio::use_awaitable); } catch (const boost::system::system_error& ex) { @@ -57,7 +49,7 @@ class Channel { return channel_.try_send(boost::system::error_code(), value); } - boost::asio::awaitable receive() { + Task receive() { try { co_return (co_await channel_.async_receive(boost::asio::use_awaitable)); } catch (const boost::system::system_error& ex) { diff --git a/silkworm/infra/concurrency/channel_test.cpp b/silkworm/infra/concurrency/channel_test.cpp index f422351d55..ec25bf163b 100644 --- a/silkworm/infra/concurrency/channel_test.cpp +++ b/silkworm/infra/concurrency/channel_test.cpp @@ -48,7 +48,7 @@ TResult run(io_context& context, awaitable awaitable1) { TEST_CASE("Channel.close_and_send") { io_context context; - Channel channel{context}; + Channel channel{context.get_executor()}; channel.close(); // boost::asio::experimental::error::channel_errors::channel_closed CHECK_THROWS_AS(run(context, channel.send(1)), boost::system::system_error); @@ -56,7 +56,7 @@ TEST_CASE("Channel.close_and_send") { TEST_CASE("Channel.close_and_receive") { io_context context; - Channel channel{context}; + Channel channel{context.get_executor()}; channel.close(); // boost::asio::experimental::error::channel_errors::channel_closed CHECK_THROWS_AS(run(context, channel.receive()), boost::system::system_error); diff --git a/silkworm/infra/concurrency/context_pool.hpp b/silkworm/infra/concurrency/context_pool.hpp index fad6780471..eb9f785ed2 100644 --- a/silkworm/infra/concurrency/context_pool.hpp +++ b/silkworm/infra/concurrency/context_pool.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include namespace silkworm::concurrency { @@ -75,7 +76,7 @@ std::ostream& operator<<(std::ostream& out, const Context& c); //! Pool of \ref Context instances running as separate reactive schedulers. template -class ContextPool { +class ContextPool : public ExecutorPool { using ExceptionHandler = std::function; public: @@ -90,7 +91,7 @@ class ContextPool { add_context(T{contexts_.size(), settings.wait_mode}); } } - virtual ~ContextPool() { + ~ContextPool() override { SILK_TRACE << "ContextPool::~ContextPool START " << this; stop(); join(); @@ -184,6 +185,15 @@ class ContextPool { return *context.io_context(); } + // ExecutorPool + [[nodiscard]] boost::asio::any_io_executor any_executor() override { + return this->next_io_context().get_executor(); + } + + [[nodiscard]] ExecutorPool& as_executor_pool() { + return *this; + } + void set_exception_handler(ExceptionHandler exception_handler) { exception_handler_ = exception_handler; } diff --git a/silkworm/infra/concurrency/event_notifier.hpp b/silkworm/infra/concurrency/event_notifier.hpp index bf235b8d2d..b77368ac49 100644 --- a/silkworm/infra/concurrency/event_notifier.hpp +++ b/silkworm/infra/concurrency/event_notifier.hpp @@ -18,11 +18,9 @@ #include -#include +#include "task.hpp" #include -#include -#include #include "channel.hpp" @@ -33,11 +31,9 @@ namespace silkworm::concurrency { // Only one waiter is supported. class EventNotifier { public: - explicit EventNotifier(boost::asio::any_io_executor&& executor) : channel_(executor, 1) {} - explicit EventNotifier(boost::asio::any_io_executor& executor) : channel_(executor, 1) {} - explicit EventNotifier(boost::asio::io_context& io_context) : channel_(io_context, 1) {} + explicit EventNotifier(boost::asio::any_io_executor executor) : channel_(std::move(executor), 1) {} - boost::asio::awaitable wait() { + Task wait() { co_await channel_.receive(); } diff --git a/silkworm/infra/concurrency/executor_pool.hpp b/silkworm/infra/concurrency/executor_pool.hpp new file mode 100644 index 0000000000..8a838162a5 --- /dev/null +++ b/silkworm/infra/concurrency/executor_pool.hpp @@ -0,0 +1,28 @@ +/* + Copyright 2023 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include + +namespace silkworm::concurrency { + +struct ExecutorPool { + virtual ~ExecutorPool() = default; + [[nodiscard]] virtual boost::asio::any_io_executor any_executor() = 0; +}; + +} // namespace silkworm::concurrency diff --git a/silkworm/infra/concurrency/task_group.cpp b/silkworm/infra/concurrency/task_group.cpp index 7f63a25989..2dcfe5795d 100644 --- a/silkworm/infra/concurrency/task_group.cpp +++ b/silkworm/infra/concurrency/task_group.cpp @@ -32,7 +32,7 @@ namespace silkworm::concurrency { using namespace boost::asio; -void TaskGroup::spawn(any_io_executor&& executor, awaitable task) { +void TaskGroup::spawn(any_io_executor executor, awaitable task) { std::scoped_lock lock(mutex_); if (is_closed_) { diff --git a/silkworm/infra/concurrency/task_group.hpp b/silkworm/infra/concurrency/task_group.hpp index 2d0f405203..00f8d2fc1f 100644 --- a/silkworm/infra/concurrency/task_group.hpp +++ b/silkworm/infra/concurrency/task_group.hpp @@ -21,12 +21,10 @@ #include #include -#include +#include "task.hpp" #include -#include #include -#include #include @@ -47,17 +45,17 @@ namespace silkworm::concurrency { * * \code * - * TaskGroup task_group{io_context, 10}; + * TaskGroup task_group{executor, 10}; * - * awaitable run_server() { + * Task run_server() { * co_await (accept_connections() && task_group.wait()); * } * - * awaitable accept_connections() { + * Task accept_connections() { * auto connection = accept(); * if (num_clients < 10) { * num_clients++; - * task_group.spawn(io_context, handle_connection(std::move(connection))); + * task_group.spawn(executor, handle_connection(std::move(connection))); * } * } * @@ -67,12 +65,8 @@ namespace silkworm::concurrency { */ class TaskGroup { public: - TaskGroup(boost::asio::any_io_executor&& executor, std::size_t max_tasks) - : completions_(executor, max_tasks) {} - TaskGroup(boost::asio::any_io_executor& executor, std::size_t max_tasks) - : completions_(executor, max_tasks) {} - TaskGroup(boost::asio::io_context& io_context, std::size_t max_tasks) - : completions_(io_context, max_tasks) {} + TaskGroup(boost::asio::any_io_executor executor, std::size_t max_tasks) + : completions_(std::move(executor), max_tasks) {} TaskGroup(const TaskGroup&) = delete; TaskGroup& operator=(const TaskGroup&) = delete; @@ -83,20 +77,10 @@ class TaskGroup { }; //! Similar to co_spawn, but also adds the task to this group until it completes. - void spawn(boost::asio::any_io_executor&& executor, boost::asio::awaitable task); - - //! Similar to co_spawn, but also adds the task to this group until it completes. - void spawn(boost::asio::any_io_executor& executor, boost::asio::awaitable task) { - spawn(boost::asio::any_io_executor{executor}, std::move(task)); - } - - //! Similar to co_spawn, but also adds the task to this group until it completes. - void spawn(boost::asio::io_context& io_context, boost::asio::awaitable task) { - spawn(boost::asio::any_io_executor{io_context.get_executor()}, std::move(task)); - } + void spawn(boost::asio::any_io_executor executor, Task task); //! Waits until a cancellation signal. then cancels all pending tasks, and waits for them to complete. - boost::asio::awaitable wait(); + Task wait(); private: void close(); diff --git a/silkworm/infra/concurrency/task_group_test.cpp b/silkworm/infra/concurrency/task_group_test.cpp index 080cbc6390..e4f346e443 100644 --- a/silkworm/infra/concurrency/task_group_test.cpp +++ b/silkworm/infra/concurrency/task_group_test.cpp @@ -79,33 +79,37 @@ static TResult run(io_context& context, awaitable awaitable1) { TEST_CASE("TaskGroup.0") { io_context context; - TaskGroup group{context, 0}; + auto executor = context.get_executor(); + TaskGroup group{executor, 0}; CHECK_THROWS_AS(run(context, group.wait() && async_throw()), TestException); } TEST_CASE("TaskGroup.1") { io_context context; - TaskGroup group{context, 1}; - group.spawn(context, async_ok()); + auto executor = context.get_executor(); + TaskGroup group{executor, 1}; + group.spawn(executor, async_ok()); CHECK_THROWS_AS(run(context, group.wait() && async_throw()), TestException); } TEST_CASE("TaskGroup.1.wait_until_cancelled") { io_context context; - TaskGroup group{context, 1}; + auto executor = context.get_executor(); + TaskGroup group{executor, 1}; bool is_cancelled = false; - group.spawn(context, wait_until_cancelled(&is_cancelled)); + group.spawn(executor, wait_until_cancelled(&is_cancelled)); CHECK_THROWS_AS(run(context, group.wait() && async_throw()), TestException); CHECK(is_cancelled); } TEST_CASE("TaskGroup.some.wait_until_cancelled") { io_context context; - TaskGroup group{context, 3}; + auto executor = context.get_executor(); + TaskGroup group{executor, 3}; std::array is_cancelled{}; - group.spawn(context, wait_until_cancelled(&is_cancelled[0])); - group.spawn(context, wait_until_cancelled(&is_cancelled[1])); - group.spawn(context, wait_until_cancelled(&is_cancelled[2])); + group.spawn(executor, wait_until_cancelled(&is_cancelled[0])); + group.spawn(executor, wait_until_cancelled(&is_cancelled[1])); + group.spawn(executor, wait_until_cancelled(&is_cancelled[2])); CHECK_THROWS_AS(run(context, group.wait() && async_throw()), TestException); CHECK(is_cancelled[0]); CHECK(is_cancelled[1]); @@ -114,14 +118,15 @@ TEST_CASE("TaskGroup.some.wait_until_cancelled") { TEST_CASE("TaskGroup.some.mix") { io_context context; - TaskGroup group{context, 6}; + auto executor = context.get_executor(); + TaskGroup group{executor, 6}; std::array is_cancelled{}; - group.spawn(context, async_ok()); - group.spawn(context, wait_until_cancelled(&is_cancelled[0])); - group.spawn(context, async_ok()); - group.spawn(context, wait_until_cancelled(&is_cancelled[1])); - group.spawn(context, async_ok()); - group.spawn(context, wait_until_cancelled(&is_cancelled[2])); + group.spawn(executor, async_ok()); + group.spawn(executor, wait_until_cancelled(&is_cancelled[0])); + group.spawn(executor, async_ok()); + group.spawn(executor, wait_until_cancelled(&is_cancelled[1])); + group.spawn(executor, async_ok()); + group.spawn(executor, wait_until_cancelled(&is_cancelled[2])); CHECK_THROWS_AS(run(context, group.wait() && async_throw()), TestException); CHECK(is_cancelled[0]); CHECK(is_cancelled[1]); @@ -130,9 +135,10 @@ TEST_CASE("TaskGroup.some.mix") { TEST_CASE("TaskGroup.spawn_after_close") { io_context context; - TaskGroup group{context, 1}; + auto executor = context.get_executor(); + TaskGroup group{executor, 1}; CHECK_THROWS_AS(run(context, group.wait() && async_throw()), TestException); - CHECK_THROWS_AS(group.spawn(context, async_ok()), TaskGroup::SpawnAfterCloseError); + CHECK_THROWS_AS(group.spawn(executor, async_ok()), TaskGroup::SpawnAfterCloseError); } } // namespace silkworm::concurrency diff --git a/silkworm/infra/grpc/client/client_context_pool.hpp b/silkworm/infra/grpc/client/client_context_pool.hpp index 55005eb719..e64f751d20 100644 --- a/silkworm/infra/grpc/client/client_context_pool.hpp +++ b/silkworm/infra/grpc/client/client_context_pool.hpp @@ -24,7 +24,6 @@ #include #include -#include #include #include diff --git a/silkworm/infra/grpc/server/call.hpp b/silkworm/infra/grpc/server/call.hpp index 7cbfd4ad61..5617ad88c3 100644 --- a/silkworm/infra/grpc/server/call.hpp +++ b/silkworm/infra/grpc/server/call.hpp @@ -23,9 +23,6 @@ #include #include -#include -#include -#include #include #include #include diff --git a/silkworm/infra/grpc/server/server_context_pool.hpp b/silkworm/infra/grpc/server/server_context_pool.hpp index 1e0e821f34..7cebfaaf03 100644 --- a/silkworm/infra/grpc/server/server_context_pool.hpp +++ b/silkworm/infra/grpc/server/server_context_pool.hpp @@ -22,7 +22,6 @@ #include #include -#include #include #include diff --git a/silkworm/node/stagedsync/execution_engine.cpp b/silkworm/node/stagedsync/execution_engine.cpp index 24f56905d0..d1e9118bc5 100644 --- a/silkworm/node/stagedsync/execution_engine.cpp +++ b/silkworm/node/stagedsync/execution_engine.cpp @@ -138,14 +138,14 @@ auto ExecutionEngine::verify_chain(Hash head_block_hash) -> concurrency::Awaitab if (last_fork_choice_.hash == head_block_hash) { SILK_DEBUG << "ExecutionEngine: chain " << head_block_hash.to_hex() << " already verified"; - concurrency::AwaitablePromise promise{io_context_}; + concurrency::AwaitablePromise promise{io_context_.get_executor()}; promise.set_value(ValidChain{last_fork_choice_}); return promise.get_future(); } if (!fork_tracking_active_) { auto verification = main_chain_.verify_chain(head_block_hash); // BLOCKING - concurrency::AwaitablePromise promise{io_context_}; + concurrency::AwaitablePromise promise{io_context_.get_executor()}; promise.set_value(std::move(verification)); return promise.get_future(); } @@ -154,12 +154,12 @@ auto ExecutionEngine::verify_chain(Hash head_block_hash) -> concurrency::Awaitab if (fork == forks_.end()) { if (main_chain_.is_canonical(head_block_hash)) { SILK_DEBUG << "ExecutionEngine: chain " << head_block_hash.to_hex() << " already verified"; - concurrency::AwaitablePromise promise{io_context_}; + concurrency::AwaitablePromise promise{io_context_.get_executor()}; promise.set_value(ValidChain{last_fork_choice_}); return promise.get_future(); } else { SILK_WARN << "ExecutionEngine: chain " << head_block_hash.to_hex() << " not found at verification time"; - concurrency::AwaitablePromise promise{io_context_}; + concurrency::AwaitablePromise promise{io_context_.get_executor()}; promise.set_value(ValidationError{}); return promise.get_future(); } diff --git a/silkworm/node/stagedsync/execution_engine.hpp b/silkworm/node/stagedsync/execution_engine.hpp index 13912461af..f7939ba4c8 100644 --- a/silkworm/node/stagedsync/execution_engine.hpp +++ b/silkworm/node/stagedsync/execution_engine.hpp @@ -30,8 +30,6 @@ #include #include #include -#include -#include #include #include diff --git a/silkworm/node/stagedsync/execution_pipeline.cpp b/silkworm/node/stagedsync/execution_pipeline.cpp index 12239f1e91..32368f0256 100644 --- a/silkworm/node/stagedsync/execution_pipeline.cpp +++ b/silkworm/node/stagedsync/execution_pipeline.cpp @@ -19,7 +19,9 @@ #include #include +#include #include +#include #include #include #include @@ -46,7 +48,7 @@ class ExecutionPipeline::LogTimer : public Timer { public: LogTimer(ExecutionPipeline* pipeline) : Timer{ - pipeline->node_settings_->asio_context, + pipeline->node_settings_->asio_context.get_executor(), pipeline->node_settings_->sync_loop_log_interval_seconds * 1'000, [this] { return execute(); }, true}, diff --git a/silkworm/node/stagedsync/execution_pipeline.hpp b/silkworm/node/stagedsync/execution_pipeline.hpp index a09cdb6001..eff97b2740 100644 --- a/silkworm/node/stagedsync/execution_pipeline.hpp +++ b/silkworm/node/stagedsync/execution_pipeline.hpp @@ -21,8 +21,6 @@ #include #include -#include -#include #include namespace silkworm::stagedsync { @@ -64,4 +62,5 @@ class ExecutionPipeline : public Stoppable { std::string get_log_prefix() const; // Returns the current log lines prefix on behalf of current stage class LogTimer; // Timer for async log scheduling }; + } // namespace silkworm::stagedsync diff --git a/silkworm/node/stagedsync/forks/canonical_chain.hpp b/silkworm/node/stagedsync/forks/canonical_chain.hpp index f458cecd02..44f4650b4f 100644 --- a/silkworm/node/stagedsync/forks/canonical_chain.hpp +++ b/silkworm/node/stagedsync/forks/canonical_chain.hpp @@ -25,8 +25,6 @@ #include #include #include -#include -#include #include #include #include diff --git a/silkworm/node/stagedsync/forks/extending_fork.cpp b/silkworm/node/stagedsync/forks/extending_fork.cpp index 627ebc9166..171c385030 100644 --- a/silkworm/node/stagedsync/forks/extending_fork.cpp +++ b/silkworm/node/stagedsync/forks/extending_fork.cpp @@ -96,7 +96,7 @@ void ExtendingFork::extend_with(Hash head_hash, const Block& block) { auto ExtendingFork::verify_chain() -> concurrency::AwaitableFuture { propagate_exception_if_any(); - concurrency::AwaitablePromise promise{io_context_}; // note: promise uses an external io_context + concurrency::AwaitablePromise promise{io_context_.get_executor()}; // note: promise uses an external io_context auto awaitable_future = promise.get_future(); post(*executor_, [this, promise_ = std::move(promise)]() mutable { @@ -117,7 +117,7 @@ auto ExtendingFork::fork_choice(Hash head_block_hash, std::optional finali -> concurrency::AwaitableFuture { propagate_exception_if_any(); - concurrency::AwaitablePromise promise{io_context_}; // note: promise uses an external io_context + concurrency::AwaitablePromise promise{io_context_.get_executor()}; // note: promise uses an external io_context auto awaitable_future = promise.get_future(); post(*executor_, [this, promise_ = std::move(promise), head_block_hash, finalized_block_hash]() mutable { diff --git a/silkworm/node/stagedsync/forks/main_chain.cpp b/silkworm/node/stagedsync/forks/main_chain.cpp index d240ea0f91..087a76835a 100644 --- a/silkworm/node/stagedsync/forks/main_chain.cpp +++ b/silkworm/node/stagedsync/forks/main_chain.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include "extending_fork.hpp" diff --git a/silkworm/node/stagedsync/forks/main_chain.hpp b/silkworm/node/stagedsync/forks/main_chain.hpp index d0e1951281..bd1cec3122 100644 --- a/silkworm/node/stagedsync/forks/main_chain.hpp +++ b/silkworm/node/stagedsync/forks/main_chain.hpp @@ -27,8 +27,6 @@ #include #include #include -#include -#include #include #include #include diff --git a/silkworm/node/stagedsync/stages/stage_bodies.cpp b/silkworm/node/stagedsync/stages/stage_bodies.cpp index d05337dd82..fa95caefd8 100644 --- a/silkworm/node/stagedsync/stages/stage_bodies.cpp +++ b/silkworm/node/stagedsync/stages/stage_bodies.cpp @@ -22,7 +22,6 @@ #include #include -#include #include #include #include diff --git a/silkworm/sentry/common/socket_stream.hpp b/silkworm/sentry/common/socket_stream.hpp index 96240d4cf7..9b95506cc3 100644 --- a/silkworm/sentry/common/socket_stream.hpp +++ b/silkworm/sentry/common/socket_stream.hpp @@ -19,7 +19,6 @@ #include #include -#include #include #include @@ -28,9 +27,7 @@ namespace silkworm::sentry { class SocketStream { public: - explicit SocketStream(boost::asio::any_io_executor&& executor) : socket_(executor) {} - explicit SocketStream(boost::asio::any_io_executor& executor) : socket_(executor) {} - explicit SocketStream(boost::asio::io_context& io_context) : socket_(io_context) {} + explicit SocketStream(boost::asio::any_io_executor executor) : socket_(std::move(executor)) {} SocketStream(SocketStream&&) = default; SocketStream& operator=(SocketStream&&) = default; diff --git a/silkworm/sentry/discovery/discovery.cpp b/silkworm/sentry/discovery/discovery.cpp index 3e1bed3cb6..c5a026d4e9 100644 --- a/silkworm/sentry/discovery/discovery.cpp +++ b/silkworm/sentry/discovery/discovery.cpp @@ -32,7 +32,7 @@ using namespace boost::asio; class DiscoveryImpl { public: explicit DiscoveryImpl( - std::function executor_pool, + concurrency::ExecutorPool& executor_pool, std::vector peer_urls, bool with_dynamic_discovery, const std::filesystem::path& data_dir_path, @@ -64,7 +64,7 @@ class DiscoveryImpl { }; DiscoveryImpl::DiscoveryImpl( - std::function executor_pool, + concurrency::ExecutorPool& executor_pool, std::vector peer_urls, bool with_dynamic_discovery, const std::filesystem::path& data_dir_path, @@ -74,8 +74,8 @@ DiscoveryImpl::DiscoveryImpl( : peer_urls_(std::move(peer_urls)), with_dynamic_discovery_(with_dynamic_discovery), data_dir_path_(data_dir_path), - node_db_(executor_pool()), - disc_v4_discovery_(executor_pool(), disc_v4_port, node_key, node_url, node_db_.interface()) { + node_db_(executor_pool.any_executor()), + disc_v4_discovery_(executor_pool.any_executor(), disc_v4_port, node_key, node_url, node_db_.interface()) { } Task DiscoveryImpl::run() { @@ -159,7 +159,7 @@ Task DiscoveryImpl::on_peer_disconnected(EccPublicKey peer_public_key, boo } Discovery::Discovery( - std::function executor_pool, + concurrency::ExecutorPool& executor_pool, std::vector peer_urls, bool with_dynamic_discovery, const std::filesystem::path& data_dir_path, @@ -167,7 +167,7 @@ Discovery::Discovery( std::function node_url, uint16_t disc_v4_port) : p_impl_(std::make_unique( - std::move(executor_pool), + executor_pool, std::move(peer_urls), with_dynamic_discovery, data_dir_path, diff --git a/silkworm/sentry/discovery/discovery.hpp b/silkworm/sentry/discovery/discovery.hpp index a4f40952fb..4b4fd06c49 100644 --- a/silkworm/sentry/discovery/discovery.hpp +++ b/silkworm/sentry/discovery/discovery.hpp @@ -23,8 +23,7 @@ #include -#include - +#include #include #include #include @@ -36,7 +35,7 @@ class DiscoveryImpl; class Discovery { public: explicit Discovery( - std::function executor_pool, + concurrency::ExecutorPool& executor_pool, std::vector peer_urls, bool with_dynamic_discovery, const std::filesystem::path& data_dir_path, diff --git a/silkworm/sentry/grpc/server/server_calls.hpp b/silkworm/sentry/grpc/server/server_calls.hpp index 656f69cea7..de060a9f97 100644 --- a/silkworm/sentry/grpc/server/server_calls.hpp +++ b/silkworm/sentry/grpc/server/server_calls.hpp @@ -52,7 +52,6 @@ namespace silkworm::sentry::grpc::server { -using boost::asio::io_context; namespace protobuf = google::protobuf; namespace proto = ::sentry; namespace proto_types = ::types; diff --git a/silkworm/sentry/message_receiver.hpp b/silkworm/sentry/message_receiver.hpp index e39308a3f8..b0515fcd6b 100644 --- a/silkworm/sentry/message_receiver.hpp +++ b/silkworm/sentry/message_receiver.hpp @@ -21,7 +21,7 @@ #include -#include +#include #include #include @@ -38,9 +38,9 @@ namespace silkworm::sentry { class MessageReceiver : public PeerManagerObserver { public: - MessageReceiver(boost::asio::io_context& io_context, size_t max_peers) - : message_calls_channel_(io_context), - strand_(boost::asio::make_strand(io_context)), + MessageReceiver(boost::asio::any_io_executor executor, size_t max_peers) + : message_calls_channel_(executor), + strand_(boost::asio::make_strand(executor)), peer_tasks_(strand_, max_peers), unsubscription_tasks_(strand_, 1000) {} @@ -64,7 +64,7 @@ class MessageReceiver : public PeerManagerObserver { Task on_peer_added_in_strand(std::shared_ptr peer); concurrency::Channel message_calls_channel_; - boost::asio::strand strand_; + boost::asio::strand strand_; concurrency::TaskGroup peer_tasks_; concurrency::TaskGroup unsubscription_tasks_; diff --git a/silkworm/sentry/message_sender.hpp b/silkworm/sentry/message_sender.hpp index f7b151b59f..d824356fa3 100644 --- a/silkworm/sentry/message_sender.hpp +++ b/silkworm/sentry/message_sender.hpp @@ -18,7 +18,7 @@ #include -#include +#include #include #include @@ -29,8 +29,8 @@ namespace silkworm::sentry { class MessageSender { public: - explicit MessageSender(boost::asio::io_context& io_context) - : send_message_channel_(io_context) {} + explicit MessageSender(boost::asio::any_io_executor executor) + : send_message_channel_(std::move(executor)) {} concurrency::Channel& send_message_channel() { return send_message_channel_; diff --git a/silkworm/sentry/peer_manager.cpp b/silkworm/sentry/peer_manager.cpp index 7082802024..5e0d86bca4 100644 --- a/silkworm/sentry/peer_manager.cpp +++ b/silkworm/sentry/peer_manager.cpp @@ -234,8 +234,7 @@ Task PeerManager::connect_peer(EnodeUrl peer_url, bool is_static_peer, std auto _ = gsl::finally([this, peer_url] { this->connecting_peer_urls_.erase(peer_url); }); try { - auto& client_context = context_pool_.next_io_context(); - auto peer1 = co_await concurrency::co_spawn_sw(client_context, client->connect(peer_url, is_static_peer), use_awaitable); + auto peer1 = co_await concurrency::co_spawn_sw(executor_pool_.any_executor(), client->connect(peer_url, is_static_peer), use_awaitable); auto peer = std::shared_ptr(std::move(peer1)); co_await client_peer_channel_.send(peer); } catch (const boost::system::system_error& ex) { diff --git a/silkworm/sentry/peer_manager.hpp b/silkworm/sentry/peer_manager.hpp index def450e57e..19f619995b 100644 --- a/silkworm/sentry/peer_manager.hpp +++ b/silkworm/sentry/peer_manager.hpp @@ -26,13 +26,13 @@ #include -#include +#include #include #include #include +#include #include -#include #include #include #include @@ -47,17 +47,17 @@ struct PeerManagerObserver; class PeerManager { public: PeerManager( - boost::asio::io_context& io_context, + boost::asio::any_io_executor executor, size_t max_peers, - silkworm::rpc::ServerContextPool& context_pool) + concurrency::ExecutorPool& executor_pool) : max_peers_(max_peers), - strand_(boost::asio::make_strand(io_context)), + strand_(boost::asio::make_strand(executor)), peer_tasks_(strand_, max_peers), drop_peer_tasks_(strand_, PeerManager::kMaxSimultaneousDropPeerTasks), - context_pool_(context_pool), - need_peers_notifier_(io_context), + executor_pool_(executor_pool), + need_peers_notifier_(executor), connect_peer_tasks_(strand_, max_peers), - client_peer_channel_(io_context) {} + client_peer_channel_(executor) {} Task run( rlpx::Server& server, @@ -103,13 +103,13 @@ class PeerManager { std::list> peers_; std::list> handshaking_peers_; size_t max_peers_; - boost::asio::strand strand_; + boost::asio::strand strand_; concurrency::TaskGroup peer_tasks_; concurrency::TaskGroup drop_peer_tasks_; size_t drop_peer_tasks_count_{0}; std::set connecting_peer_urls_; - silkworm::rpc::ServerContextPool& context_pool_; + concurrency::ExecutorPool& executor_pool_; concurrency::EventNotifier need_peers_notifier_; concurrency::TaskGroup connect_peer_tasks_; concurrency::Channel> client_peer_channel_; diff --git a/silkworm/sentry/peer_manager_api.hpp b/silkworm/sentry/peer_manager_api.hpp index 58c1a64c69..83441d0a56 100644 --- a/silkworm/sentry/peer_manager_api.hpp +++ b/silkworm/sentry/peer_manager_api.hpp @@ -22,7 +22,7 @@ #include -#include +#include #include #include @@ -44,17 +44,17 @@ namespace silkworm::sentry { class PeerManagerApi : public PeerManagerObserver { public: explicit PeerManagerApi( - boost::asio::io_context& io_context, + boost::asio::any_io_executor executor, PeerManager& peer_manager) : peer_manager_(peer_manager), - peer_count_calls_channel_(io_context), - peers_calls_channel_(io_context), - peer_calls_channel_(io_context), - peer_penalize_calls_channel_(io_context), - peer_events_calls_channel_(io_context), - strand_(boost::asio::make_strand(io_context)), + peer_count_calls_channel_(executor), + peers_calls_channel_(executor), + peer_calls_channel_(executor), + peer_penalize_calls_channel_(executor), + peer_events_calls_channel_(executor), + strand_(boost::asio::make_strand(executor)), events_unsubscription_tasks_(strand_, 1000), - peer_events_channel_(io_context, 1000) {} + peer_events_channel_(executor, 1000) {} static Task run(std::shared_ptr self); @@ -109,7 +109,7 @@ class PeerManagerApi : public PeerManagerObserver { }; std::list events_subscriptions_; - boost::asio::strand strand_; + boost::asio::strand strand_; concurrency::TaskGroup events_unsubscription_tasks_; Channel peer_events_channel_; }; diff --git a/silkworm/sentry/rlpx/peer.cpp b/silkworm/sentry/rlpx/peer.cpp index bcfdd08f8b..28a7fe4313 100644 --- a/silkworm/sentry/rlpx/peer.cpp +++ b/silkworm/sentry/rlpx/peer.cpp @@ -40,7 +40,7 @@ using namespace std::chrono_literals; using namespace boost::asio; Peer::Peer( - any_io_executor&& executor, + any_io_executor executor, SocketStream stream, EccKeyPair node_key, std::string client_id, diff --git a/silkworm/sentry/rlpx/peer.hpp b/silkworm/sentry/rlpx/peer.hpp index 0c36266dcb..0b56d76277 100644 --- a/silkworm/sentry/rlpx/peer.hpp +++ b/silkworm/sentry/rlpx/peer.hpp @@ -23,7 +23,6 @@ #include #include -#include #include #include @@ -47,7 +46,7 @@ namespace silkworm::sentry::rlpx { class Peer { public: Peer( - boost::asio::any_io_executor&& executor, + boost::asio::any_io_executor executor, SocketStream stream, EccKeyPair node_key, std::string client_id, @@ -58,52 +57,6 @@ class Peer { bool is_inbound, bool is_static); - Peer( - boost::asio::any_io_executor& executor, - SocketStream stream, - EccKeyPair node_key, - std::string client_id, - uint16_t node_listen_port, - std::unique_ptr protocol, - std::optional url, - std::optional peer_public_key, - bool is_inbound, - bool is_static) - : Peer( - boost::asio::any_io_executor{executor}, - std::move(stream), - std::move(node_key), - std::move(client_id), - node_listen_port, - std::move(protocol), - std::move(url), - std::move(peer_public_key), - is_inbound, - is_static) {} - - Peer( - boost::asio::io_context& io_context, - SocketStream stream, - EccKeyPair node_key, - std::string client_id, - uint16_t node_listen_port, - std::unique_ptr protocol, - std::optional url, - std::optional peer_public_key, - bool is_inbound, - bool is_static) - : Peer( - boost::asio::any_io_executor{io_context.get_executor()}, - std::move(stream), - std::move(node_key), - std::move(client_id), - node_listen_port, - std::move(protocol), - std::move(url), - std::move(peer_public_key), - is_inbound, - is_static) {} - ~Peer(); static Task run(std::shared_ptr peer); diff --git a/silkworm/sentry/rlpx/server.cpp b/silkworm/sentry/rlpx/server.cpp index f6b34aadb7..d56c939d7d 100644 --- a/silkworm/sentry/rlpx/server.cpp +++ b/silkworm/sentry/rlpx/server.cpp @@ -29,18 +29,18 @@ namespace silkworm::sentry::rlpx { using namespace boost::asio; Server::Server( - io_context& io_context, + any_io_executor executor, uint16_t port) : ip_(ip::address{ip::address_v4::any()}), port_(port), - peer_channel_(io_context) {} + peer_channel_(std::move(executor)) {} ip::tcp::endpoint Server::listen_endpoint() const { return ip::tcp::endpoint{ip_, port_}; } Task Server::run( - silkworm::rpc::ServerContextPool& context_pool, + concurrency::ExecutorPool& executor_pool, EccKeyPair node_key, std::string client_id, std::function()> protocol_factory) { @@ -66,15 +66,15 @@ Task Server::run( log::Info("sentry") << "rlpx::Server is listening at " << node_url.to_string(); while (acceptor.is_open()) { - auto& client_context = context_pool.next_io_context(); - SocketStream stream{client_context}; + auto client_executor = executor_pool.any_executor(); + SocketStream stream{client_executor}; co_await acceptor.async_accept(stream.socket(), use_awaitable); auto remote_endpoint = stream.socket().remote_endpoint(); log::Debug("sentry") << "rlpx::Server client connected from " << remote_endpoint; auto peer = std::make_shared( - client_context, + client_executor, std::move(stream), node_key, client_id, diff --git a/silkworm/sentry/rlpx/server.hpp b/silkworm/sentry/rlpx/server.hpp index ece3f8037c..0b6343b8f7 100644 --- a/silkworm/sentry/rlpx/server.hpp +++ b/silkworm/sentry/rlpx/server.hpp @@ -22,12 +22,11 @@ #include -#include #include #include #include -#include +#include #include #include "peer.hpp" @@ -38,11 +37,11 @@ namespace silkworm::sentry::rlpx { class Server final { public: Server( - boost::asio::io_context& io_context, + boost::asio::any_io_executor executor, uint16_t port); Task run( - silkworm::rpc::ServerContextPool& context_pool, + concurrency::ExecutorPool& executor_pool, EccKeyPair node_key, std::string client_id, std::function()> protocol_factory); diff --git a/silkworm/sentry/sentry.cpp b/silkworm/sentry/sentry.cpp index 2133576003..2c6135a5a5 100644 --- a/silkworm/sentry/sentry.cpp +++ b/silkworm/sentry/sentry.cpp @@ -55,7 +55,7 @@ using namespace boost; class SentryImpl final { public: - explicit SentryImpl(Settings settings, silkworm::rpc::ServerContextPool& context_pool); + explicit SentryImpl(Settings settings, concurrency::ExecutorPool& executor_pool); SentryImpl(const SentryImpl&) = delete; SentryImpl& operator=(const SentryImpl&) = delete; @@ -90,7 +90,7 @@ class SentryImpl final { Settings settings_; std::optional node_key_; std::optional public_ip_; - silkworm::rpc::ServerContextPool& context_pool_; + concurrency::ExecutorPool& executor_pool_; StatusManager status_manager_; @@ -136,24 +136,24 @@ static api::router::ServiceRouter make_service_router( }; } -SentryImpl::SentryImpl(Settings settings, silkworm::rpc::ServerContextPool& context_pool) +SentryImpl::SentryImpl(Settings settings, concurrency::ExecutorPool& executor_pool) : settings_(std::move(settings)), - context_pool_(context_pool), - status_manager_(context_pool_.next_io_context()), - rlpx_server_(context_pool_.next_io_context(), settings_.port), + executor_pool_(executor_pool), + status_manager_(executor_pool.any_executor()), + rlpx_server_(executor_pool.any_executor(), settings_.port), discovery_( - [this] { return boost::asio::any_io_executor(context_pool_.next_io_context().get_executor()); }, + executor_pool, settings_.static_peers, !settings_.no_discover, settings_.data_dir_path, node_key_provider(), node_url_provider(), settings_.port), - peer_manager_(context_pool_.next_io_context(), settings_.max_peers, context_pool_), - message_sender_(context_pool_.next_io_context()), - message_receiver_(std::make_shared(context_pool_.next_io_context(), settings_.max_peers)), - peer_manager_api_(std::make_shared(context_pool_.next_io_context(), peer_manager_)), - peer_discovery_feedback_(std::make_shared(boost::asio::any_io_executor(context_pool_.next_io_context().get_executor()), settings_.max_peers)), + peer_manager_(executor_pool.any_executor(), settings_.max_peers, executor_pool_), + message_sender_(executor_pool.any_executor()), + message_receiver_(std::make_shared(executor_pool.any_executor(), settings_.max_peers)), + peer_manager_api_(std::make_shared(executor_pool.any_executor(), peer_manager_)), + peer_discovery_feedback_(std::make_shared(executor_pool.any_executor(), settings_.max_peers)), service_router_(make_service_router(status_manager_.status_channel(), message_sender_, *message_receiver_, *peer_manager_api_, node_info_provider())), direct_service_(std::make_shared(service_router_)), grpc_server_(make_server_config(settings_), service_router_) { @@ -207,7 +207,7 @@ Task SentryImpl::run_status_manager() { } Task SentryImpl::run_server() { - return rlpx_server_.run(context_pool_, node_key_.value(), client_id(), protocol_factory()); + return rlpx_server_.run(executor_pool_, node_key_.value(), client_id(), protocol_factory()); } std::unique_ptr SentryImpl::make_client() { @@ -301,8 +301,8 @@ std::function SentryImpl::node_url_provider() const { return [this] { return this->make_node_url(); }; } -Sentry::Sentry(Settings settings, silkworm::rpc::ServerContextPool& context_pool) - : p_impl_(std::make_unique(std::move(settings), context_pool)) { +Sentry::Sentry(Settings settings, concurrency::ExecutorPool& executor_pool) + : p_impl_(std::make_unique(std::move(settings), executor_pool)) { } Sentry::~Sentry() { diff --git a/silkworm/sentry/sentry.hpp b/silkworm/sentry/sentry.hpp index 48f479159f..b60f824b07 100644 --- a/silkworm/sentry/sentry.hpp +++ b/silkworm/sentry/sentry.hpp @@ -16,11 +16,12 @@ #pragma once +#include #include #include -#include +#include #include "api/common/sentry_client.hpp" #include "settings.hpp" @@ -31,7 +32,7 @@ class SentryImpl; class Sentry final : public api::SentryClient { public: - explicit Sentry(Settings settings, silkworm::rpc::ServerContextPool& context_pool); + explicit Sentry(Settings settings, concurrency::ExecutorPool& executor_pool); ~Sentry() override; Sentry(const Sentry&) = delete; diff --git a/silkworm/sentry/status_manager.hpp b/silkworm/sentry/status_manager.hpp index bb9d62c07b..bcb3b10484 100644 --- a/silkworm/sentry/status_manager.hpp +++ b/silkworm/sentry/status_manager.hpp @@ -18,7 +18,7 @@ #include -#include +#include #include #include @@ -29,8 +29,8 @@ namespace silkworm::sentry { class StatusManager { public: - StatusManager(boost::asio::io_context& io_context) - : status_channel_(io_context), + StatusManager(boost::asio::any_io_executor executor) + : status_channel_(std::move(executor)), status_(eth::StatusData{}) {} Task wait_for_status(); diff --git a/silkworm/sync/sentry_client.cpp b/silkworm/sync/sentry_client.cpp index c96688cded..91e88c3fe1 100644 --- a/silkworm/sync/sentry_client.cpp +++ b/silkworm/sync/sentry_client.cpp @@ -39,11 +39,11 @@ namespace silkworm { using namespace boost::asio; SentryClient::SentryClient( - boost::asio::io_context& io_context, + boost::asio::any_io_executor executor, std::shared_ptr sentry_client) - : io_context_{io_context}, + : executor_{executor}, sentry_client_{std::move(sentry_client)}, - tasks_{io_context, 1000} { + tasks_{executor, 1000} { } static std::unique_ptr decode_inbound_message(const silkworm::sentry::api::MessageFromPeer& message_from_peer) { @@ -151,9 +151,9 @@ awaitable resolve_promise_with_awaitable_result(std::promise& promis } template -static T sync_spawn(concurrency::TaskGroup& tasks, io_context& io_context, awaitable task) { +static T sync_spawn(concurrency::TaskGroup& tasks, any_io_executor executor, awaitable task) { std::promise promise; - tasks.spawn(io_context, resolve_promise_with_awaitable_result(promise, std::move(task))); + tasks.spawn(std::move(executor), resolve_promise_with_awaitable_result(promise, std::move(task))); return promise.get_future().get(); } @@ -181,7 +181,7 @@ awaitable SentryClient::send_message_by_id_async(const Ou } SentryClient::PeerIds SentryClient::send_message_by_id(const OutboundMessage& outbound_message, const PeerId& peer_id) { - return sync_spawn(tasks_, io_context_, send_message_by_id_async(outbound_message, peer_id)); + return sync_spawn(tasks_, executor_, send_message_by_id_async(outbound_message, peer_id)); } awaitable SentryClient::send_message_to_random_peers_async(const OutboundMessage& outbound_message, size_t max_peers) { @@ -192,7 +192,7 @@ awaitable SentryClient::send_message_to_random_peers_asyn } SentryClient::PeerIds SentryClient::send_message_to_random_peers(const OutboundMessage& outbound_message, size_t max_peers) { - return sync_spawn(tasks_, io_context_, send_message_to_random_peers_async(outbound_message, max_peers)); + return sync_spawn(tasks_, executor_, send_message_to_random_peers_async(outbound_message, max_peers)); } awaitable SentryClient::send_message_to_all_async(const OutboundMessage& outbound_message) { @@ -203,7 +203,7 @@ awaitable SentryClient::send_message_to_all_async(const O } SentryClient::PeerIds SentryClient::send_message_to_all(const OutboundMessage& outbound_message) { - return sync_spawn(tasks_, io_context_, send_message_to_all_async(outbound_message)); + return sync_spawn(tasks_, executor_, send_message_to_all_async(outbound_message)); } awaitable SentryClient::send_message_by_min_block_async(const OutboundMessage& outbound_message, BlockNum /*min_block*/, size_t max_peers) { @@ -214,7 +214,7 @@ awaitable SentryClient::send_message_by_min_block_async(c } SentryClient::PeerIds SentryClient::send_message_by_min_block(const OutboundMessage& outbound_message, BlockNum min_block, size_t max_peers) { - return sync_spawn(tasks_, io_context_, send_message_by_min_block_async(outbound_message, min_block, max_peers)); + return sync_spawn(tasks_, executor_, send_message_by_min_block_async(outbound_message, min_block, max_peers)); } awaitable SentryClient::peer_min_block_async(const PeerId& peer_id, BlockNum /*min_block*/) { @@ -224,7 +224,7 @@ awaitable SentryClient::peer_min_block_async(const PeerId& peer_id, BlockN } void SentryClient::peer_min_block(const PeerId& peer_id, BlockNum min_block) { - sync_spawn(tasks_, io_context_, peer_min_block_async(peer_id, min_block)); + sync_spawn(tasks_, executor_, peer_min_block_async(peer_id, min_block)); } boost::asio::awaitable SentryClient::async_run() { @@ -300,7 +300,7 @@ awaitable SentryClient::count_active_peers_async() { } uint64_t SentryClient::count_active_peers() { - return sync_spawn(tasks_, io_context_, count_active_peers_async()); + return sync_spawn(tasks_, executor_, count_active_peers_async()); } boost::asio::awaitable SentryClient::request_peer_info_async(PeerId peer_id) { @@ -311,7 +311,7 @@ boost::asio::awaitable SentryClient::request_peer_info_async(PeerId } std::string SentryClient::request_peer_info(PeerId peer_id) { - return sync_spawn(tasks_, io_context_, this->request_peer_info_async(std::move(peer_id))); + return sync_spawn(tasks_, executor_, this->request_peer_info_async(std::move(peer_id))); } boost::asio::awaitable SentryClient::penalize_peer_async(PeerId peer_id, Penalty penalty) { @@ -324,7 +324,7 @@ boost::asio::awaitable SentryClient::penalize_peer_async(PeerId peer_id, P } void SentryClient::penalize_peer(PeerId peer_id, Penalty penalty) { - sync_spawn(tasks_, io_context_, this->penalize_peer_async(std::move(peer_id), penalty)); + sync_spawn(tasks_, executor_, this->penalize_peer_async(std::move(peer_id), penalty)); } uint64_t SentryClient::active_peers() { diff --git a/silkworm/sync/sentry_client.hpp b/silkworm/sync/sentry_client.hpp index df2c7242ec..b1d5973157 100644 --- a/silkworm/sync/sentry_client.hpp +++ b/silkworm/sync/sentry_client.hpp @@ -24,8 +24,8 @@ #include +#include #include -#include #include #include @@ -44,7 +44,7 @@ namespace silkworm { class SentryClient { public: explicit SentryClient( - boost::asio::io_context& io_context, + boost::asio::any_io_executor executor, std::shared_ptr sentry_client); SentryClient(const SentryClient&) = delete; @@ -109,7 +109,7 @@ class SentryClient { // notifying registered subscribers boost::asio::awaitable publish(const silkworm::sentry::api::MessageFromPeer& message_from_peer); - boost::asio::io_context& io_context_; + boost::asio::any_io_executor executor_; std::shared_ptr sentry_client_; concurrency::TaskGroup tasks_; diff --git a/silkworm/sync/sync.cpp b/silkworm/sync/sync.cpp index 0ca9d83422..226369f3b7 100644 --- a/silkworm/sync/sync.cpp +++ b/silkworm/sync/sync.cpp @@ -24,13 +24,13 @@ namespace silkworm::chainsync { -Sync::Sync(boost::asio::io_context& io_context, +Sync::Sync(boost::asio::any_io_executor executor, mdbx::env_managed& chaindata_env, execution::Client& execution, const std::shared_ptr& sentry_client, const ChainConfig& config, const EngineRpcSettings& rpc_settings) - : sync_sentry_client_{io_context, sentry_client}, + : sync_sentry_client_{std::move(executor), sentry_client}, block_exchange_{sync_sentry_client_, db::ROAccess{chaindata_env}, config} { // If terminal total difficulty is present in chain config, the network will use Proof-of-Stake sooner or later if (config.terminal_total_difficulty) { diff --git a/silkworm/sync/sync.hpp b/silkworm/sync/sync.hpp index f097d9cc41..0d20b3486d 100644 --- a/silkworm/sync/sync.hpp +++ b/silkworm/sync/sync.hpp @@ -19,8 +19,8 @@ #include #include +#include #include -#include #include #include @@ -46,7 +46,7 @@ struct EngineRpcSettings { class Sync { public: - Sync(boost::asio::io_context& io_context, + Sync(boost::asio::any_io_executor executor, mdbx::env_managed& chaindata_env, execution::Client& execution, const std::shared_ptr& sentry_client,