Skip to content

Commit

Permalink
infra, rpcdaemon: extend usage of custom co_spawn (#1429)
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat authored Aug 8, 2023
1 parent 05fe351 commit c79f6fe
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 26 deletions.
26 changes: 13 additions & 13 deletions silkworm/infra/concurrency/awaitable_wait_for_all.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/use_awaitable.hpp>

#include "co_spawn_sw.hpp"
#include "parallel_group_utils.hpp"

namespace silkworm::concurrency::awaitable_wait_for_all {
Expand Down Expand Up @@ -71,8 +71,8 @@ awaitable<void, Executor> operator&&(

auto [order, ex0, ex1] =
co_await make_parallel_group(
co_spawn(ex, std::move(t), deferred),
co_spawn(ex, std::move(u), deferred))
co_spawn_sw(ex, std::move(t), deferred),
co_spawn_sw(ex, std::move(u), deferred))
.async_wait(
wait_for_one_error(),
use_awaitable_t<Executor>{});
Expand All @@ -93,8 +93,8 @@ awaitable<U, Executor> operator&&(

auto [order, ex0, ex1, r1] =
co_await make_parallel_group(
co_spawn(ex, std::move(t), deferred),
co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred))
co_spawn_sw(ex, std::move(t), deferred),
co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred))
.async_wait(
wait_for_one_error(),
use_awaitable_t<Executor>{});
Expand All @@ -115,8 +115,8 @@ awaitable<T, Executor> operator&&(

auto [order, ex0, r0, ex1] =
co_await make_parallel_group(
co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn(ex, std::move(u), deferred))
co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn_sw(ex, std::move(u), deferred))
.async_wait(
wait_for_one_error(),
use_awaitable_t<Executor>{});
Expand All @@ -137,8 +137,8 @@ awaitable<std::tuple<T, U>, Executor> operator&&(

auto [order, ex0, r0, ex1, r1] =
co_await make_parallel_group(
co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred))
co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred))
.async_wait(
wait_for_one_error(),
use_awaitable_t<Executor>{});
Expand All @@ -161,8 +161,8 @@ awaitable<std::tuple<T..., std::monostate>, Executor> operator&&(

auto [order, ex0, r0, ex1, r1] =
co_await make_parallel_group(
co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn(ex, std::move(u), deferred))
co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn_sw(ex, std::move(u), deferred))
.async_wait(
wait_for_one_error(),
use_awaitable_t<Executor>{});
Expand All @@ -183,8 +183,8 @@ awaitable<std::tuple<T..., U>, Executor> operator&&(

auto [order, ex0, r0, ex1, r1] =
co_await make_parallel_group(
co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred))
co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred))
.async_wait(
wait_for_one_error(),
use_awaitable_t<Executor>{});
Expand Down
26 changes: 14 additions & 12 deletions silkworm/infra/concurrency/awaitable_wait_for_one.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/use_awaitable.hpp>

#include "co_spawn_sw.hpp"

namespace silkworm::concurrency::awaitable_wait_for_one {

using boost::asio::experimental::wait_for_one;
Expand Down Expand Up @@ -69,7 +70,8 @@ awaitable<std::variant<std::monostate, std::monostate>, Executor> operator||(awa
auto ex = co_await this_coro::executor;

auto [order, ex0, ex1] =
co_await make_parallel_group(co_spawn(ex, std::move(t), deferred), co_spawn(ex, std::move(u), deferred))
co_await make_parallel_group(co_spawn_sw(ex, std::move(t), deferred),
co_spawn_sw(ex, std::move(u), deferred))
.async_wait(wait_for_one(), use_awaitable_t<Executor>{});

if (order[0] == 0) {
Expand All @@ -91,8 +93,8 @@ awaitable<std::variant<std::monostate, U>, Executor> operator||(awaitable<void,
auto ex = co_await this_coro::executor;

auto [order, ex0, ex1, r1] =
co_await make_parallel_group(co_spawn(ex, std::move(t), deferred),
co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred))
co_await make_parallel_group(co_spawn_sw(ex, std::move(t), deferred),
co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred))
.async_wait(wait_for_one(), use_awaitable_t<Executor>{});

if (order[0] == 0) {
Expand All @@ -116,8 +118,8 @@ awaitable<std::variant<T, std::monostate>, Executor> operator||(awaitable<T, Exe
auto ex = co_await this_coro::executor;

auto [order, ex0, r0, ex1] =
co_await make_parallel_group(co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn(ex, std::move(u), deferred))
co_await make_parallel_group(co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn_sw(ex, std::move(u), deferred))
.async_wait(wait_for_one(), use_awaitable_t<Executor>{});

if (order[0] == 0) {
Expand All @@ -141,8 +143,8 @@ awaitable<std::variant<T, U>, Executor> operator||(awaitable<T, Executor> t, awa
auto ex = co_await this_coro::executor;

auto [order, ex0, r0, ex1, r1] =
co_await make_parallel_group(co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred))
co_await make_parallel_group(co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred))
.async_wait(wait_for_one(), use_awaitable_t<Executor>{});

if (order[0] == 0) {
Expand All @@ -165,8 +167,8 @@ awaitable<std::variant<T..., std::monostate>, Executor> operator||(awaitable<std
auto ex = co_await this_coro::executor;

auto [order, ex0, r0, ex1] =
co_await make_parallel_group(co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn(ex, std::move(u), deferred))
co_await make_parallel_group(co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn_sw(ex, std::move(u), deferred))
.async_wait(wait_for_one(), use_awaitable_t<Executor>{});

using widen = detail::widen_variant<T..., std::monostate>;
Expand All @@ -190,8 +192,8 @@ awaitable<std::variant<T..., U>, Executor> operator||(awaitable<std::variant<T..
auto ex = co_await this_coro::executor;

auto [order, ex0, r0, ex1, r1] =
co_await make_parallel_group(co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred))
co_await make_parallel_group(co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred),
co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred))
.async_wait(wait_for_one(), use_awaitable_t<Executor>{});

using widen = detail::widen_variant<T..., U>;
Expand Down
3 changes: 2 additions & 1 deletion silkworm/silkrpc/ethdb/kv/state_changes_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <boost/system/error_code.hpp>

#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/concurrency/co_spawn_sw.hpp>
#include <silkworm/infra/concurrency/shared_service.hpp>
#include <silkworm/silkrpc/grpc/util.hpp>

Expand All @@ -45,7 +46,7 @@ StateChangesStream::StateChangesStream(ClientContext& context, remote::KV::StubI
retry_timer_{scheduler_} {}

std::future<void> StateChangesStream::open() {
return boost::asio::co_spawn(scheduler_, run(), boost::asio::use_future);
return concurrency::co_spawn_sw(scheduler_, run(), boost::asio::use_future);
}

void StateChangesStream::close() {
Expand Down

0 comments on commit c79f6fe

Please sign in to comment.