Skip to content

Commit

Permalink
sentry: use any_io_executor instead of io_context (#1423)
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr authored Aug 10, 2023
1 parent 37ac4fa commit 520f59a
Show file tree
Hide file tree
Showing 52 changed files with 238 additions and 298 deletions.
5 changes: 5 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ IncludeCategories:
- Regex: '<silkworm/infra/concurrency/task\.hpp>'
Priority: 30

# coroutine.hpp has to go before the boost headers,
# otherwise boost::asio fails to compile on GCC 12
- Regex: '"task\.hpp"'
Priority: 30

# Silkworm headers
- Regex: '<silkworm.*'
Priority: 50
Expand Down
6 changes: 3 additions & 3 deletions cmd/common/shutdown_signal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@

#include <silkworm/infra/concurrency/coroutine.hpp>

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp>

namespace silkworm::cmd::common {

class ShutdownSignal {
public:
explicit ShutdownSignal(boost::asio::io_context& io_context)
: signals_(io_context, SIGINT, SIGTERM) {}
explicit ShutdownSignal(boost::asio::any_io_executor executor)
: signals_(executor, SIGINT, SIGTERM) {}

using SignalNumber = int;

Expand Down
2 changes: 1 addition & 1 deletion cmd/dev/backend_kv_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ int main(int argc, char* argv[]) {
tasks = server.async_run("bekv-server");
}

ShutdownSignal shutdown_signal{context_pool.next_io_context()};
ShutdownSignal shutdown_signal{context_pool.next_io_context().get_executor()};

// Go!
auto run_future = boost::asio::co_spawn(
Expand Down
2 changes: 1 addition & 1 deletion cmd/dev/snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ void download(const BitTorrentSettings& settings) {
BitTorrentClient client{settings};

boost::asio::io_context scheduler;
ShutdownSignal shutdown_signal{scheduler};
ShutdownSignal shutdown_signal{scheduler.get_executor()};
shutdown_signal.on_signal([&](ShutdownSignal::SignalNumber /*num*/) {
client.stop();
SILK_DEBUG << "Torrent client stopped";
Expand Down
4 changes: 2 additions & 2 deletions cmd/sentry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ void sentry_main(Settings settings) {
[] { return std::make_unique<DummyServerCompletionQueue>(); },
};

ShutdownSignal shutdown_signal{context_pool.next_io_context()};
ShutdownSignal shutdown_signal{context_pool.next_io_context().get_executor()};

Sentry sentry{std::move(settings), context_pool};
Sentry sentry{std::move(settings), context_pool.as_executor_pool()};

auto run_future = boost::asio::co_spawn(
context_pool.next_io_context(),
Expand Down
15 changes: 8 additions & 7 deletions cmd/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include <silkworm/buildinfo.h>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/common/os.hpp>
#include <silkworm/infra/common/stopwatch.hpp>
#include <silkworm/infra/concurrency/awaitable_wait_for_all.hpp>
#include <silkworm/infra/concurrency/awaitable_wait_for_one.hpp>
#include <silkworm/infra/grpc/server/server_context_pool.hpp>
Expand Down Expand Up @@ -66,7 +65,6 @@ using silkworm::lookup_known_chain;
using silkworm::NodeSettings;
using silkworm::parse_size;
using silkworm::PreverifiedHashes;
using silkworm::StopWatch;
using silkworm::cmd::common::add_context_pool_options;
using silkworm::cmd::common::add_logging_options;
using silkworm::cmd::common::add_node_options;
Expand Down Expand Up @@ -219,8 +217,11 @@ using SentryClientPtr = std::shared_ptr<sentry::api::SentryClient>;
using SentryServerPtr = std::shared_ptr<sentry::Sentry>;
using SentryPtrPair = std::tuple<SentryClientPtr, SentryServerPtr>;

static SentryPtrPair make_sentry(sentry::Settings sentry_settings, NodeSettings& node_settings,
rpc::ServerContextPool& context_pool, db::ROAccess db_access) {
static SentryPtrPair make_sentry(
sentry::Settings sentry_settings,
NodeSettings& node_settings,
rpc::ServerContextPool& context_pool,
db::ROAccess db_access) {
SentryServerPtr sentry_server;
SentryClientPtr sentry_client;

Expand All @@ -232,7 +233,7 @@ static SentryPtrPair make_sentry(sentry::Settings sentry_settings, NodeSettings&
sentry_settings.api_address = "";

// Create embedded server
sentry_server = std::make_shared<sentry::Sentry>(std::move(sentry_settings), context_pool);
sentry_server = std::make_shared<sentry::Sentry>(std::move(sentry_settings), context_pool.as_executor_pool());

// Wrap direct client i.e. server in a session client
sentry_client = std::make_shared<sentry::SessionSentryClient>(
Expand Down Expand Up @@ -348,7 +349,7 @@ int main(int argc, char* argv[]) {
.jwt_secret_file = settings.rpcdaemon_settings.jwt_secret_file,
};
chainsync::Sync chain_sync_process{
context_pool.next_io_context(),
context_pool.next_io_context().get_executor(),
chaindata_db,
execution_client,
sentry_client,
Expand All @@ -365,7 +366,7 @@ int main(int argc, char* argv[]) {
chain_sync_process.async_run();

// Trap OS signals
ShutdownSignal shutdown_signal{context_pool.next_io_context()};
ShutdownSignal shutdown_signal{context_pool.next_io_context().get_executor()};

// Go!
auto run_future = boost::asio::co_spawn(
Expand Down
2 changes: 1 addition & 1 deletion cmd/test/backend_kv_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ int main(int argc, char* argv[]) {
}};

boost::asio::io_context scheduler;
silkworm::cmd::common::ShutdownSignal shutdown_signal{scheduler};
silkworm::cmd::common::ShutdownSignal shutdown_signal{scheduler.get_executor()};
shutdown_signal.on_signal([&](silkworm::cmd::common::ShutdownSignal::SignalNumber /*num*/) {
pump_stop = true;
completion_stop = true;
Expand Down
15 changes: 10 additions & 5 deletions silkworm/infra/common/asio_timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
// otherwise <boost/asio/detail/socket_types.hpp> dependency doesn't compile
#define _DARWIN_C_SOURCE
#endif
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>

#include <silkworm/core/common/assert.hpp>
#include <silkworm/infra/concurrency/signal_handler.hpp>
Expand All @@ -38,13 +38,18 @@ using namespace std::chrono_literals;
//! \brief Implementation of an asynchronous timer relying on boost:asio
class Timer {
public:
//! \param asio_context [in] : boost's asio context
//! \param executor [in] : executor
//! \param interval [in] : length of wait interval (in milliseconds)
//! \param call_back [in] : the call back function to be called
//! \param auto_start [in] : whether to start the timer immediately
explicit Timer(boost::asio::io_context& asio_context, uint32_t interval, std::function<bool()> call_back,
bool auto_start = false)
: interval_(interval), timer_(asio_context), call_back_(std::move(call_back)) {
explicit Timer(
boost::asio::any_io_executor executor,
uint32_t interval,
std::function<bool()> call_back,
bool auto_start = false)
: interval_(interval),
timer_(std::move(executor)),
call_back_(std::move(call_back)) {
SILKWORM_ASSERT(interval > 0);
if (auto_start) {
start();
Expand Down
6 changes: 3 additions & 3 deletions silkworm/infra/concurrency/awaitable_condition_variable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ namespace silkworm::concurrency {

class AwaitableConditionVariableImpl {
public:
std::function<boost::asio::awaitable<void>()> waiter() {
std::function<Task<void>()> waiter() {
size_t waiter_version;
{
std::scoped_lock lock(mutex_);
waiter_version = version_;
}

return [this, waiter_version]() -> boost::asio::awaitable<void> {
return [this, waiter_version]() -> Task<void> {
auto executor = co_await boost::asio::this_coro::executor;

decltype(waiters_)::iterator waiter;
Expand Down Expand Up @@ -81,7 +81,7 @@ AwaitableConditionVariable::~AwaitableConditionVariable() {
[[maybe_unused]] int non_trivial_destructor; // silent clang-tidy
}

std::function<boost::asio::awaitable<void>()> AwaitableConditionVariable::waiter() {
std::function<Task<void>()> AwaitableConditionVariable::waiter() {
return p_impl_->waiter();
}

Expand Down
8 changes: 2 additions & 6 deletions silkworm/infra/concurrency/awaitable_condition_variable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
#include <functional>
#include <memory>

#include <silkworm/infra/concurrency/coroutine.hpp>

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/io_context.hpp>
#include "task.hpp"

namespace silkworm::concurrency {

Expand Down Expand Up @@ -57,7 +53,7 @@ class AwaitableConditionVariable {
AwaitableConditionVariable();
virtual ~AwaitableConditionVariable();

using Waiter = std::function<boost::asio::awaitable<void>()>;
using Waiter = std::function<Task<void>()>;

Waiter waiter();
void notify_all();
Expand Down
20 changes: 7 additions & 13 deletions silkworm/infra/concurrency/awaitable_future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,15 @@
#include <optional>
#include <stdexcept>

#include <silkworm/infra/concurrency/coroutine.hpp>
#include "task.hpp"

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/use_future.hpp>

namespace silkworm::concurrency {

namespace asio = boost::asio;

// An awaitable-friendly future/promise
// See also: https://docs.rs/tokio/1.25.0/tokio/sync/oneshot/index.html

Expand All @@ -47,9 +43,9 @@ class AwaitableFuture {
AwaitableFuture(AwaitableFuture&&) noexcept = default;
AwaitableFuture& operator=(AwaitableFuture&&) noexcept = default;

asio::awaitable<T> get_async() {
Task<T> get_async() {
try {
std::optional<T> result = co_await channel_->async_receive(asio::use_awaitable);
std::optional<T> result = co_await channel_->async_receive(boost::asio::use_awaitable);
co_return std::move(result.value());
} catch (const boost::system::system_error& ex) {
close_and_throw_if_cancelled(ex);
Expand All @@ -59,7 +55,7 @@ class AwaitableFuture {

T get() {
try {
std::optional<T> result = channel_->async_receive(asio::use_future).get();
std::optional<T> result = channel_->async_receive(boost::asio::use_future).get();
return std::move(result.value());
} catch (const boost::system::system_error& ex) {
close_and_throw_if_cancelled(ex);
Expand All @@ -70,7 +66,7 @@ class AwaitableFuture {
private:
friend class AwaitablePromise<T>;

using AsyncChannel = asio::experimental::concurrent_channel<void(std::exception_ptr, std::optional<T>)>;
using AsyncChannel = boost::asio::experimental::concurrent_channel<void(std::exception_ptr, std::optional<T>)>;

explicit AwaitableFuture(std::shared_ptr<AsyncChannel> channel) : channel_(channel) {}

Expand All @@ -88,13 +84,11 @@ class AwaitableFuture {

template <typename T>
class AwaitablePromise {
constexpr static size_t one_shot_channel = 1;
using AsyncChannel = typename AwaitableFuture<T>::AsyncChannel;

public:
explicit AwaitablePromise(asio::any_io_executor&& executor) : channel_(std::make_shared<AsyncChannel>(executor, one_shot_channel)) {}
explicit AwaitablePromise(asio::any_io_executor& executor) : channel_(std::make_shared<AsyncChannel>(executor, one_shot_channel)) {}
explicit AwaitablePromise(asio::io_context& io_context) : channel_(std::make_shared<AsyncChannel>(io_context, one_shot_channel)) {}
explicit AwaitablePromise(boost::asio::any_io_executor executor)
: channel_(std::make_shared<AsyncChannel>(std::move(executor), 1)) {}

AwaitablePromise(const AwaitablePromise&) = delete;
AwaitablePromise& operator=(const AwaitablePromise&) = delete;
Expand Down
Loading

0 comments on commit 520f59a

Please sign in to comment.