Skip to content

Commit

Permalink
rpcdaemon: extract awaitable async task run by executor (#1953)
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat authored Apr 9, 2024
1 parent 970bd02 commit 4b14186
Show file tree
Hide file tree
Showing 12 changed files with 707 additions and 564 deletions.
1 change: 1 addition & 0 deletions .github/workflows/rpc-performance-tests.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: QA - RPC Performance Tests

on:
workflow_dispatch:
schedule:
- cron: '0 0 * * *' # Run every day at 00:00 AM UTC

Expand Down
79 changes: 35 additions & 44 deletions silkworm/rpc/commands/debug_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
#include <string>
#include <vector>

#include <boost/asio/compose.hpp>
#include <boost/asio/post.hpp>
#include <evmc/evmc.hpp>

#include <silkworm/core/common/endian.hpp>
Expand All @@ -36,6 +34,7 @@
#include <silkworm/db/util.hpp>
#include <silkworm/infra/common/ensure.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/rpc/common/async_task.hpp>
#include <silkworm/rpc/common/util.hpp>
#include <silkworm/rpc/core/account_dumper.hpp>
#include <silkworm/rpc/core/blocks.hpp>
Expand All @@ -47,7 +46,6 @@
#include <silkworm/rpc/core/storage_walker.hpp>
#include <silkworm/rpc/ethdb/kv/cached_database.hpp>
#include <silkworm/rpc/ethdb/transaction_database.hpp>
#include <silkworm/rpc/json/call.hpp>
#include <silkworm/rpc/json/types.hpp>
#include <silkworm/rpc/types/block.hpp>
#include <silkworm/rpc/types/call.hpp>
Expand Down Expand Up @@ -254,10 +252,10 @@ Task<void> DebugRpcApi::handle_debug_storage_range_at(const nlohmann::json& requ
co_await storage_walker.storage_range_at(block_number, address, start_key, max_result, collector);

nlohmann::json result = {{"storage", storage}};
if (next_key.length() > 0) {
result["nextKey"] = "0x" + silkworm::to_hex(next_key);
} else {
if (next_key.empty()) {
result["nextKey"] = nlohmann::json();
} else {
result["nextKey"] = "0x" + silkworm::to_hex(next_key);
}

reply = make_json_content(request, result);
Expand Down Expand Up @@ -314,46 +312,39 @@ Task<void> DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n

auto chain_config_ptr = co_await chain_storage->read_chain_config();
ensure(chain_config_ptr.has_value(), "cannot read chain config");

auto this_executor = co_await boost::asio::this_coro::executor;
auto result = co_await async_task(workers_.executor(), [&]() -> nlohmann::json {
auto state = tx->create_state(this_executor, tx_database, *chain_storage, block_number - 1);
auto account_opt = state->read_account(address);
account_opt.value_or(silkworm::Account{});

EVMExecutor executor{*chain_config_ptr, workers_, state};

auto result = co_await boost::asio::async_compose<decltype(boost::asio::use_awaitable), void(nlohmann::json)>(
[&](auto& self) {
boost::asio::post(workers_, [&, self = std::move(self)]() mutable {
auto state = tx->create_state(this_executor, tx_database, *chain_storage, block_number - 1);
auto account_opt = state->read_account(address);
account_opt.value_or(silkworm::Account{});

EVMExecutor executor{*chain_config_ptr, workers_, state};

uint64_t index = std::min(static_cast<uint64_t>(transactions.size()), tx_index);
for (uint64_t idx{0}; idx < index; idx++) {
rpc::Transaction txn{transactions[idx]};
executor.call(block, txn);
}

const auto& ibs = executor.get_ibs_state();

nlohmann::json json_result;
if (ibs.exists(address)) {
std::ostringstream oss;
oss << std::hex << ibs.get_nonce(address);
json_result["nonce"] = "0x" + oss.str();
json_result["balance"] = "0x" + intx::to_string(ibs.get_balance(address), 16);
json_result["codeHash"] = ibs.get_code_hash(address);
json_result["code"] = "0x" + silkworm::to_hex(ibs.get_code(address));
} else {
json_result["balance"] = "0x0";
json_result["code"] = "0x";
json_result["codeHash"] = "0x0000000000000000000000000000000000000000000000000000000000000000";
json_result["nonce"] = "0x0";
}

boost::asio::post(this_executor, [json_result, self = std::move(self)]() mutable {
self.complete(json_result);
});
});
},
boost::asio::use_awaitable);
uint64_t index = std::min(static_cast<uint64_t>(transactions.size()), tx_index);
for (uint64_t idx{0}; idx < index; idx++) {
rpc::Transaction txn{transactions[idx]};
executor.call(block, txn);
}

const auto& ibs = executor.get_ibs_state();

nlohmann::json json_result;
if (ibs.exists(address)) {
std::ostringstream oss;
oss << std::hex << ibs.get_nonce(address);
json_result["nonce"] = "0x" + oss.str();
json_result["balance"] = "0x" + intx::to_string(ibs.get_balance(address), 16);
json_result["codeHash"] = ibs.get_code_hash(address);
json_result["code"] = "0x" + silkworm::to_hex(ibs.get_code(address));
} else {
json_result["balance"] = "0x0";
json_result["code"] = "0x";
json_result["codeHash"] = "0x0000000000000000000000000000000000000000000000000000000000000000";
json_result["nonce"] = "0x0";
}
return json_result;
});

reply = make_json_content(request, result);
} catch (const std::invalid_argument& e) {
Expand Down
82 changes: 82 additions & 0 deletions silkworm/rpc/common/async_task.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
Copyright 2024 The Silkworm Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#include <exception>
#include <type_traits>
#include <utility>

#include <silkworm/infra/concurrency/task.hpp>

#include <boost/asio/compose.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/use_awaitable.hpp>

namespace silkworm::rpc {

//! Helper trait for any completion handler signature
template <typename R, typename F, typename... Args>
struct CompletionHandler {
using type = void(std::exception_ptr, R);
};

//! Partial specialization for \code void return type
template <typename F, typename... Args>
struct CompletionHandler<void, F, Args...> {
using type = void(std::exception_ptr);
};

//! Alias helper trait for the completion handler signature of any task
template <typename F, typename... Args>
using TaskCompletionHandler = typename CompletionHandler<std::invoke_result_t<F, Args...>, F, Args...>::type;

//! Asynchronous \code co_await-able task executing function \code fn with arguments \code args in \code runner executor
template <typename Executor, typename F, typename... Args>
// NOLINTNEXTLINE(cppcoreguidelines-missing-std-forward) because of https://github.com/llvm/llvm-project/issues/68105
Task<std::invoke_result_t<F, Args...>> async_task(Executor runner, F&& fn, Args&&... args) {
auto this_executor = co_await ThisTask::executor;
co_return co_await boost::asio::async_compose<decltype(boost::asio::use_awaitable), TaskCompletionHandler<F, Args...>>(
[&this_executor, &runner, fn = std::forward<F>(fn), ... args = std::forward<Args>(args)](auto& self) mutable {
boost::asio::post(runner, [&, fn = std::forward<decltype(fn)>(fn), ... args = std::forward<Args>(args), self = std::move(self)]() mutable {
try {
if constexpr (std::is_void_v<std::invoke_result_t<F, Args...>>) {
std::invoke(fn, args...);
boost::asio::post(this_executor, [self = std::move(self)]() mutable {
self.complete({});
});
} else {
auto result = std::invoke(fn, args...);
boost::asio::post(this_executor, [result = std::move(result), self = std::move(self)]() mutable {
self.complete({}, result);
});
}
} catch (...) {
std::exception_ptr eptr = std::current_exception();
boost::asio::post(this_executor, [eptr, self = std::move(self)]() mutable {
if constexpr (std::is_void_v<std::invoke_result_t<F, Args...>>)
self.complete(eptr);
else
self.complete(eptr, {});
});
}
});
},
boost::asio::use_awaitable);
}

} // namespace silkworm::rpc
92 changes: 92 additions & 0 deletions silkworm/rpc/common/async_task_benchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2024 The Silkworm Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <benchmark/benchmark.h>
#include <boost/asio/thread_pool.hpp>

#include <silkworm/rpc/test/context_test_base.hpp>

#include "async_task.hpp"

namespace silkworm::rpc {

std::size_t recursive_factorial(std::size_t n) {
return n == 0 ? 1 : n * recursive_factorial(n - 1);
}

struct AsyncTaskBenchTest : test::ContextTestBase {
};

template <typename Executor>
Task<std::size_t> async_compose_factorial(const Executor runner, const std::size_t number) {
const auto this_executor = co_await ThisTask::executor;
co_return co_await boost::asio::async_compose<decltype(boost::asio::use_awaitable), void(std::exception_ptr, std::size_t)>(
[&](auto& self) {
boost::asio::post(runner, [&, self = std::move(self)]() mutable {
try {
const auto result = recursive_factorial(number);
boost::asio::post(this_executor, [result, self = std::move(self)]() mutable {
self.complete({}, result);
});
} catch (...) {
std::exception_ptr eptr = std::current_exception();
boost::asio::post(this_executor, [eptr, self = std::move(self)]() mutable {
self.complete(eptr, {});
});
}
});
},
boost::asio::use_awaitable);
}

static void benchmark_async_compose(benchmark::State& state) {
const auto n = static_cast<std::size_t>(state.range(0));

boost::asio::thread_pool workers{};
AsyncTaskBenchTest test;
for ([[maybe_unused]] auto _ : state) {
const auto result = test.spawn_and_wait(async_compose_factorial(workers.get_executor(), n));
benchmark::DoNotOptimize(result);
}
}

BENCHMARK(benchmark_async_compose)->Arg(10);
BENCHMARK(benchmark_async_compose)->Arg(100);
BENCHMARK(benchmark_async_compose)->Arg(1'000);
BENCHMARK(benchmark_async_compose)->Arg(10'000);

template <typename Executor>
Task<std::size_t> async_task_factorial(Executor runner, std::size_t number) {
co_return co_await async_task(runner, recursive_factorial, number);
}

static void benchmark_async_task(benchmark::State& state) {
const auto n = static_cast<std::size_t>(state.range(0));

boost::asio::thread_pool workers{};
AsyncTaskBenchTest test;
for ([[maybe_unused]] auto _ : state) {
const auto result = test.spawn_and_wait(async_task_factorial(workers.get_executor(), n));
benchmark::DoNotOptimize(result);
}
}

BENCHMARK(benchmark_async_task)->Arg(10);
BENCHMARK(benchmark_async_task)->Arg(100);
BENCHMARK(benchmark_async_task)->Arg(1'000);
BENCHMARK(benchmark_async_task)->Arg(10'000);

} // namespace silkworm::rpc
Loading

0 comments on commit 4b14186

Please sign in to comment.