Skip to content

Commit

Permalink
infra, node, sentry, sync: thread name in log traces for async_thread (
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat authored Aug 8, 2023
1 parent 2f4f081 commit 4ba6d0a
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 28 deletions.
6 changes: 3 additions & 3 deletions cmd/dev/backend_kv_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ int main(int argc, char* argv[]) {
auto stop = [&state_changes_timer]() {
state_changes_timer.cancel();
};
co_await concurrency::async_thread(std::move(run), std::move(stop));
co_await concurrency::async_thread(std::move(run), std::move(stop), "state-c-sim");
};
tasks = state_changes_simulator() && server.async_run();
tasks = state_changes_simulator() && server.async_run("bekv-server");
} else {
tasks = server.async_run();
tasks = server.async_run("bekv-server");
}

ShutdownSignal shutdown_signal{context_pool.next_io_context()};
Expand Down
14 changes: 5 additions & 9 deletions silkworm/infra/concurrency/active_component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,17 @@

namespace silkworm {

/*
* Abstract interface for active components
* i.e. component that have an infinite loop and need a dedicated thread to run the loop (if the application
* has also other things to do).
* Here we prefer not to provide a thread facility and let the user provide one more suitable to the context,
* so perhaps a better name is LongRunningComponent.
*/
//! Abstract interface for active components i.e. components that have an infinite loop and need a dedicated thread
//! to run the loop (if the application has also other things to do).
class ActiveComponent : public Stoppable {
public:
virtual void execution_loop() = 0;

boost::asio::awaitable<void> async_run(std::optional<std::size_t> stack_size = {}) {
//! This adapter method makes ActiveComponent suitable to be used as asynchronous task
boost::asio::awaitable<void> async_run(const char* thread_name, std::optional<std::size_t> stack_size = {}) {
auto run = [this] { this->execution_loop(); };
auto stop = [this] { this->stop(); };
co_await concurrency::async_thread(std::move(run), std::move(stop), stack_size);
co_await concurrency::async_thread(std::move(run), std::move(stop), thread_name, stack_size);
}
};

Expand Down
9 changes: 7 additions & 2 deletions silkworm/infra/concurrency/async_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@

namespace silkworm::concurrency {

boost::asio::awaitable<void> async_thread(std::function<void()> run, std::function<void()> stop, std::optional<std::size_t> stack_size) {
boost::asio::awaitable<void> async_thread(std::function<void()> run, std::function<void()> stop,
const char* name, std::optional<std::size_t> stack_size) {
std::exception_ptr run_exception;

auto executor = co_await boost::asio::this_coro::executor;
Expand All @@ -39,9 +40,13 @@ boost::asio::awaitable<void> async_thread(std::function<void()> run, std::functi
if (stack_size) {
attributes.set_stack_size(*stack_size);
}
boost::thread thread{attributes, [run = std::move(run), &run_exception, &thread_finished_notifier] {

boost::thread thread{attributes, [run = std::move(run), name = name, &run_exception, &thread_finished_notifier] {
log::set_thread_name(name);
try {
log::Info() << "Async thread [" << name << "] run started";
run();
log::Info() << "Async thread [" << name << "] run completed";
} catch (...) {
run_exception = std::current_exception();
}
Expand Down
3 changes: 2 additions & 1 deletion silkworm/infra/concurrency/async_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ namespace silkworm::concurrency {
*
* @param run thread procedure
* @param stop a callback to signal the thread procedure to exit
* @param name the name appearing in log traces for the created thread
* @param stack_size optional custom stack size for the created thread
* @return an awaitable that is pending until the thread finishes
*/
boost::asio::awaitable<void> async_thread(std::function<void()> run, std::function<void()> stop,
std::optional<std::size_t> stack_size = {});
const char* name, std::optional<std::size_t> stack_size = {});

} // namespace silkworm::concurrency
2 changes: 1 addition & 1 deletion silkworm/infra/grpc/client/reconnect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ boost::asio::awaitable<void> reconnect_channel(grpc::Channel& channel) {
is_stopped = true;
};

co_await concurrency::async_thread(std::move(run), std::move(stop));
co_await concurrency::async_thread(std::move(run), std::move(stop), "channel-rec");
}

} // namespace silkworm::rpc
4 changes: 2 additions & 2 deletions silkworm/infra/grpc/server/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ class Server {
SILK_TRACE << "Server::shutdown " << this << " END";
}

boost::asio::awaitable<void> async_run() {
boost::asio::awaitable<void> async_run(const char* thread_name) {
auto run = [this] {
this->build_and_start();
this->join();
};
auto stop = [this] { this->shutdown(); };
co_await concurrency::async_thread(std::move(run), std::move(stop));
co_await concurrency::async_thread(std::move(run), std::move(stop), thread_name);
}

//! Returns the number of server contexts.
Expand Down
10 changes: 5 additions & 5 deletions silkworm/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Task<void> NodeImpl::run_tasks() {

Task<void> NodeImpl::start_execution_server() {
// Thread running block execution requires custom stack size because of deep EVM call stacks
return execution_server_.async_run(/*stack_size=*/kExecutionThreadStackSize);
return execution_server_.async_run("exec-engine", /*stack_size=*/kExecutionThreadStackSize);
}

Task<void> NodeImpl::start_backend_kv_grpc_server() {
Expand All @@ -144,7 +144,7 @@ Task<void> NodeImpl::start_backend_kv_grpc_server() {
auto stop = [this]() {
backend_kv_rpc_server_->shutdown();
};
co_await concurrency::async_thread(std::move(run), std::move(stop));
co_await concurrency::async_thread(std::move(run), std::move(stop), "bekv-server");
}

Task<void> NodeImpl::start_bittorrent_client() {
Expand All @@ -155,7 +155,7 @@ Task<void> NodeImpl::start_bittorrent_client() {
auto stop = [this]() {
bittorrent_client_->stop();
};
co_await concurrency::async_thread(std::move(run), std::move(stop));
co_await concurrency::async_thread(std::move(run), std::move(stop), "bit-torrent");
}
}

Expand All @@ -169,13 +169,13 @@ Task<void> NodeImpl::start_execution_log_timer() {
auto asio_guard = std::make_unique<asio_guard_type>(settings_.asio_context.get_executor());

auto run = [this] {
log::set_thread_name("asio_ctx_timer");
log::set_thread_name("ctx-log-tmr");
log::Trace("Asio Timers", {"state", "started"});
settings_.asio_context.run();
log::Trace("Asio Timers", {"state", "stopped"});
};
auto stop = [&asio_guard] { asio_guard.reset(); };
co_await silkworm::concurrency::async_thread(std::move(run), std::move(stop));
co_await silkworm::concurrency::async_thread(std::move(run), std::move(stop), "ctx-log-tmr");
}

Node::Node(Settings& settings, SentryClientPtr sentry_client, mdbx::env& chaindata_db)
Expand Down
2 changes: 1 addition & 1 deletion silkworm/sentry/grpc/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Server::~Server() {
}

Task<void> Server::async_run() {
return p_impl_->async_run();
return p_impl_->async_run("sentry-gsrv");
}

} // namespace silkworm::sentry::grpc::server
1 change: 0 additions & 1 deletion silkworm/sync/block_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ void BlockExchange::receive_message(std::shared_ptr<InboundMessage> message) {
void BlockExchange::execution_loop() {
using namespace std::chrono;
using namespace std::chrono_literals;
log::set_thread_name("block-exchg");

auto announcement_receiving_callback = [this](std::shared_ptr<InboundMessage> msg) {
statistics_.nonsolic_msgs++;
Expand Down
6 changes: 4 additions & 2 deletions silkworm/sync/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ boost::asio::awaitable<void> Sync::start_sync_sentry_client() {
}

boost::asio::awaitable<void> Sync::start_block_exchange() {
return block_exchange_.async_run();
return block_exchange_.async_run("block-exchg");
}

boost::asio::awaitable<void> Sync::start_chain_sync() {
Expand All @@ -113,7 +113,9 @@ boost::asio::awaitable<void> Sync::start_engine_rpc_server() {
auto engine_rpc_server_stop = [this]() {
engine_rpc_server_->stop();
};
co_await concurrency::async_thread(std::move(engine_rpc_server_run), std::move(engine_rpc_server_stop));
co_await concurrency::async_thread(std::move(engine_rpc_server_run),
std::move(engine_rpc_server_stop),
"eng-api-srv");
}
}

Expand Down
2 changes: 1 addition & 1 deletion silkworm/sync/sync_pow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace silkworm::chainsync {
PoWSync::PoWSync(BlockExchange& be, execution::Client& ee) : ChainSync(be, ee) {}

asio::awaitable<void> PoWSync::async_run() {
return ActiveComponent::async_run();
return ActiveComponent::async_run("pow-sync-ex");
}

auto PoWSync::resume() -> NewHeight { // find the point (head) where we left off
Expand Down

0 comments on commit 4ba6d0a

Please sign in to comment.