Skip to content

Commit

Permalink
rpcdaemon: refactor chain and state access layer (#2084)
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat authored Jun 7, 2024
1 parent df21533 commit c95fc40
Show file tree
Hide file tree
Showing 97 changed files with 1,723 additions and 2,590 deletions.
9 changes: 3 additions & 6 deletions examples/get_latest_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,14 @@
#include <absl/strings/match.h>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/use_future.hpp>
#include <grpcpp/grpcpp.h>

#include <silkworm/core/common/util.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/grpc/client/client_context_pool.hpp>
#include <silkworm/rpc/common/constants.hpp>
#include <silkworm/rpc/core/blocks.hpp>
#include <silkworm/rpc/ethdb/kv/remote_database.hpp>
#include <silkworm/rpc/ethdb/transaction_database.hpp>

using namespace silkworm;
using namespace silkworm::rpc;
Expand All @@ -48,8 +45,7 @@ Task<std::optional<uint64_t>> latest_block(ethdb::Database& db) {

const auto db_transaction = co_await db.begin();
try {
ethdb::TransactionDatabase tx_db_reader{*db_transaction};
block_height = co_await core::get_latest_block_number(tx_db_reader);
block_height = co_await core::get_latest_block_number(*db_transaction);
} catch (const std::exception& e) {
SILK_ERROR << "exception: " << e.what();
} catch (...) {
Expand Down Expand Up @@ -95,8 +91,9 @@ int main(int argc, char* argv[]) {
auto& context = context_pool.next_context();
auto io_context = context.io_context();

ethdb::kv::CoherentStateCache state_cache;
auto channel{::grpc::CreateChannel(target, ::grpc::InsecureChannelCredentials())};
auto database = std::make_unique<ethdb::kv::RemoteDatabase>(*context.grpc_context(), channel);
auto database = std::make_unique<ethdb::kv::RemoteDatabase>(&state_cache, *context.grpc_context(), channel);

auto context_pool_thread = std::thread([&]() { context_pool.run(); });

Expand Down
78 changes: 30 additions & 48 deletions silkworm/rpc/commands/debug_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "debug_api.hpp"

#include <algorithm>
#include <chrono>
#include <ostream>
#include <set>
#include <stdexcept>
Expand All @@ -44,8 +43,6 @@
#include <silkworm/rpc/core/rawdb/chain.hpp>
#include <silkworm/rpc/core/state_reader.hpp>
#include <silkworm/rpc/core/storage_walker.hpp>
#include <silkworm/rpc/ethdb/kv/cached_database.hpp>
#include <silkworm/rpc/ethdb/transaction_database.hpp>
#include <silkworm/rpc/json/types.hpp>
#include <silkworm/rpc/types/block.hpp>
#include <silkworm/rpc/types/call.hpp>
Expand Down Expand Up @@ -126,11 +123,10 @@ Task<void> DebugRpcApi::handle_debug_get_modified_accounts_by_number(const nlohm
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto start_block_number = co_await core::get_block_number(start_block_id, tx_database);
const auto end_block_number = co_await core::get_block_number(end_block_id, tx_database);
const auto start_block_number = co_await core::get_block_number(start_block_id, *tx);
const auto end_block_number = co_await core::get_block_number(end_block_id, *tx);

const auto addresses = co_await get_modified_accounts(tx_database, start_block_number, end_block_number);
const auto addresses = co_await get_modified_accounts(*tx, start_block_number, end_block_number);
reply = make_json_content(request, addresses);
} catch (const std::invalid_argument& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -167,11 +163,9 @@ Task<void> DebugRpcApi::handle_debug_get_modified_accounts_by_hash(const nlohman
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};

const auto start_block_number = co_await core::rawdb::read_header_number(tx_database, start_hash);
const auto end_block_number = co_await core::rawdb::read_header_number(tx_database, end_hash);
auto addresses = co_await get_modified_accounts(tx_database, start_block_number, end_block_number);
const auto start_block_number = co_await core::rawdb::read_header_number(*tx, start_hash);
const auto end_block_number = co_await core::rawdb::read_header_number(*tx, end_hash);
auto addresses = co_await get_modified_accounts(*tx, start_block_number, end_block_number);
reply = make_json_content(request, addresses);
} catch (const std::invalid_argument& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -213,8 +207,7 @@ Task<void> DebugRpcApi::handle_debug_storage_range_at(const nlohmann::json& requ
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

const auto block_with_hash = co_await core::read_block_by_hash(*block_cache_, *chain_storage, block_hash);
if (!block_with_hash) {
Expand Down Expand Up @@ -292,8 +285,7 @@ Task<void> DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

const auto block_with_hash = co_await core::read_block_by_hash(*block_cache_, *chain_storage, block_hash);
if (!block_with_hash) {
Expand All @@ -315,7 +307,7 @@ Task<void> DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n

auto this_executor = co_await boost::asio::this_coro::executor;
auto result = co_await async_task(workers_.executor(), [&]() -> nlohmann::json {
auto state = tx->create_state(this_executor, tx_database, *chain_storage, block_number - 1);
auto state = tx->create_state(this_executor, *chain_storage, block_number - 1);
auto account_opt = state->read_account(address);
account_opt.value_or(silkworm::Account{});

Expand Down Expand Up @@ -389,9 +381,8 @@ Task<void> DebugRpcApi::handle_debug_trace_transaction(const nlohmann::json& req
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
debug::DebugExecutor executor{tx_database, *block_cache_, workers_, *tx, config};
const auto chain_storage = tx->create_storage(tx_database, backend_);
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
const auto chain_storage = tx->create_storage(backend_);
co_await executor.trace_transaction(stream, *chain_storage, transaction_hash);
} catch (const std::exception& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -436,15 +427,12 @@ Task<void> DebugRpcApi::handle_debug_trace_call(const nlohmann::json& request, j
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
ethdb::kv::CachedDatabase cached_database{block_number_or_hash, *tx, *state_cache_};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

const bool is_latest_block = co_await core::is_latest_block_number(block_number_or_hash, tx_database);
const core::rawdb::DatabaseReader& db_reader =
is_latest_block ? static_cast<core::rawdb::DatabaseReader&>(cached_database) : static_cast<core::rawdb::DatabaseReader&>(tx_database);
const bool is_latest_block = co_await core::is_latest_block_number(block_number_or_hash, *tx);
tx->set_state_cache_enabled(/*cache_enabled=*/is_latest_block);

debug::DebugExecutor executor{db_reader, *block_cache_, workers_, *tx, config};
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
co_await executor.trace_call(stream, block_number_or_hash, *chain_storage, call);
} catch (const std::exception& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -511,9 +499,8 @@ Task<void> DebugRpcApi::handle_debug_trace_call_many(const nlohmann::json& reque
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
debug::DebugExecutor executor{tx_database, *block_cache_, workers_, *tx, config};
const auto chain_storage = tx->create_storage(tx_database, backend_);
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
const auto chain_storage = tx->create_storage(backend_);
co_await executor.trace_call_many(stream, *chain_storage, bundles, simulation_context);
} catch (...) {
SILK_ERROR << "unexpected exception processing request: " << request.dump();
Expand Down Expand Up @@ -558,10 +545,9 @@ Task<void> DebugRpcApi::handle_debug_trace_block_by_number(const nlohmann::json&
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

debug::DebugExecutor executor{tx_database, *block_cache_, workers_, *tx, config};
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
co_await executor.trace_block(stream, *chain_storage, block_number);
} catch (const std::invalid_argument& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -611,10 +597,9 @@ Task<void> DebugRpcApi::handle_debug_trace_block_by_hash(const nlohmann::json& r
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

debug::DebugExecutor executor{tx_database, *block_cache_, workers_, *tx, config};
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
co_await executor.trace_block(stream, *chain_storage, block_hash);
} catch (const std::invalid_argument& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand All @@ -638,8 +623,8 @@ Task<void> DebugRpcApi::handle_debug_trace_block_by_hash(const nlohmann::json& r
co_return;
}

Task<std::set<evmc::address>> get_modified_accounts(ethdb::TransactionDatabase& tx_database, BlockNum start_block_number, BlockNum end_block_number) {
const auto latest_block_number = co_await core::get_block_number(core::kLatestBlockId, tx_database);
Task<std::set<evmc::address>> get_modified_accounts(ethdb::Transaction& tx, BlockNum start_block_number, BlockNum end_block_number) {
const auto latest_block_number = co_await core::get_block_number(core::kLatestBlockId, tx);

SILK_DEBUG << "latest: " << latest_block_number << " start: " << start_block_number << " end: " << end_block_number;

Expand All @@ -649,7 +634,7 @@ Task<std::set<evmc::address>> get_modified_accounts(ethdb::TransactionDatabase&
msg << "start block (" << start_block_number << ") is later than the latest block (" << latest_block_number << ")";
throw std::invalid_argument(msg.str());
} else if (start_block_number <= end_block_number) {
core::rawdb::Walker walker = [&](const silkworm::Bytes& key, const silkworm::Bytes& value) {
auto walker = [&](const silkworm::Bytes& key, const silkworm::Bytes& value) {
auto block_number = static_cast<BlockNum>(std::stol(silkworm::to_hex(key), nullptr, 16));
if (block_number <= end_block_number) {
auto address = bytes_to_address(value.substr(0, kAddressLength));
Expand All @@ -663,7 +648,7 @@ Task<std::set<evmc::address>> get_modified_accounts(ethdb::TransactionDatabase&
const auto key = silkworm::db::block_key(start_block_number);
SILK_TRACE << "Ready to walk starting from key: " << silkworm::to_hex(key);

co_await tx_database.walk(db::table::kAccountChangeSetName, key, 0, walker);
co_await tx.walk(db::table::kAccountChangeSetName, key, 0, walker);
}

co_return addresses;
Expand All @@ -684,9 +669,8 @@ Task<void> DebugRpcApi::handle_debug_get_raw_block(const nlohmann::json& request
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, nullptr);
const auto block_number = co_await core::get_block_number(block_id, tx_database);
const auto chain_storage = tx->create_storage(backend_);
const auto block_number = co_await core::get_block_number(block_id, *tx);
silkworm::Block block;
if (!(co_await chain_storage->read_canonical_block(block_number, block))) {
throw std::invalid_argument("block not found");
Expand Down Expand Up @@ -723,9 +707,8 @@ Task<void> DebugRpcApi::handle_debug_get_raw_header(const nlohmann::json& reques
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, nullptr);
const auto block_number = co_await core::get_block_number(block_id, tx_database);
const auto chain_storage = tx->create_storage(backend_);
const auto block_number = co_await core::get_block_number(block_id, *tx);
const auto block_hash = co_await chain_storage->read_canonical_hash(block_number);
auto header = co_await chain_storage->read_header(block_number, block_hash->bytes);
if (!header) {
Expand Down Expand Up @@ -763,8 +746,7 @@ Task<void> DebugRpcApi::handle_debug_get_raw_transaction(const nlohmann::json& r
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage{tx->create_storage(tx_database, nullptr)};
const auto chain_storage{tx->create_storage(backend_)};

Bytes rlp{};
auto success = co_await chain_storage->read_rlp_transaction(transaction_hash, rlp);
Expand Down
5 changes: 2 additions & 3 deletions silkworm/rpc/commands/debug_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
#include <silkworm/core/common/block_cache.hpp>
#include <silkworm/infra/concurrency/private_service.hpp>
#include <silkworm/infra/concurrency/shared_service.hpp>
#include <silkworm/rpc/core/rawdb/accessors.hpp>
#include <silkworm/rpc/ethdb/database.hpp>
#include <silkworm/rpc/ethdb/kv/state_cache.hpp>
#include <silkworm/rpc/ethdb/transaction_database.hpp>
#include <silkworm/rpc/ethdb/transaction.hpp>
#include <silkworm/rpc/json/stream.hpp>
#include <silkworm/rpc/json/types.hpp>

Expand Down Expand Up @@ -81,6 +80,6 @@ class DebugRpcApi {
friend class silkworm::rpc::json_rpc::RequestHandler;
};

Task<std::set<evmc::address>> get_modified_accounts(ethdb::TransactionDatabase& tx_database, BlockNum start_block_number, BlockNum end_block_number);
Task<std::set<evmc::address>> get_modified_accounts(ethdb::Transaction& tx, BlockNum start_block_number, BlockNum end_block_number);

} // namespace silkworm::rpc::commands
20 changes: 10 additions & 10 deletions silkworm/rpc/commands/debug_api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <silkworm/infra/test_util/log.hpp>
#include <silkworm/rpc/core/blocks.hpp>
#include <silkworm/rpc/core/filter_storage.hpp>
#include <silkworm/rpc/ethdb/base_transaction.hpp>
#include <silkworm/rpc/ethdb/kv/state_cache.hpp>
#if !defined(__clang__)
#include <silkworm/rpc/stagedsync/stages.hpp>
Expand Down Expand Up @@ -160,10 +161,10 @@ class DummyCursor : public ethdb::CursorDupSort {
nlohmann::json::iterator itr_;
};

class DummyTransaction : public ethdb::Transaction {
class DummyTransaction : public ethdb::BaseTransaction {
public:
explicit DummyTransaction(const nlohmann::json& json)
: json_{json}, tx_id_{next_tx_id++}, view_id_{next_view_id++} {};
: BaseTransaction(nullptr), json_{json}, tx_id_{next_tx_id++}, view_id_{next_view_id++} {};

[[nodiscard]] uint64_t tx_id() const override {
return tx_id_;
Expand Down Expand Up @@ -191,11 +192,11 @@ class DummyTransaction : public ethdb::Transaction {
co_return cursor;
}

std::shared_ptr<silkworm::State> create_state(boost::asio::any_io_executor&, const core::rawdb::DatabaseReader&, const ChainStorage&, BlockNum) override {
std::shared_ptr<silkworm::State> create_state(boost::asio::any_io_executor&, const ChainStorage&, BlockNum) override {
return nullptr;
}

std::shared_ptr<ChainStorage> create_storage(const core::rawdb::DatabaseReader&, ethbackend::BackEnd*) override {
std::shared_ptr<ChainStorage> create_storage(ethbackend::BackEnd*) override {
return nullptr;
}

Expand Down Expand Up @@ -326,10 +327,9 @@ TEST_CASE("get_modified_accounts") {
auto database = DummyDatabase{json};
auto begin_result = boost::asio::co_spawn(pool, database.begin(), boost::asio::use_future);
auto tx = begin_result.get();
ethdb::TransactionDatabase tx_database{*tx};

SECTION("end == start") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a010, 0x52a010), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a010, 0x52a010), boost::asio::use_future);
auto accounts = result.get();

CHECK(accounts.size() == 1);
Expand All @@ -341,7 +341,7 @@ TEST_CASE("get_modified_accounts") {
}

SECTION("end == start + 1") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a010, 0x52a011), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a010, 0x52a011), boost::asio::use_future);
auto accounts = result.get();

CHECK(accounts.size() == 2);
Expand All @@ -354,7 +354,7 @@ TEST_CASE("get_modified_accounts") {
}

SECTION("end >> start") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a010, 0x52a058), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a010, 0x52a058), boost::asio::use_future);
auto accounts = result.get();

CHECK(accounts.size() == 70);
Expand Down Expand Up @@ -435,14 +435,14 @@ TEST_CASE("get_modified_accounts") {
}

SECTION("start > end") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a011, 0x52a010), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a011, 0x52a010), boost::asio::use_future);
auto accounts = result.get();

CHECK(accounts.empty());
}

SECTION("start > last block") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a061, 0x52a061), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a061, 0x52a061), boost::asio::use_future);
CHECK_THROWS_AS(result.get(), std::invalid_argument);
}
}
Expand Down
Loading

0 comments on commit c95fc40

Please sign in to comment.