diff --git a/cmd/dev/backend_kv_server.cpp b/cmd/dev/backend_kv_server.cpp index e13a9a2439..de9fdff131 100644 --- a/cmd/dev/backend_kv_server.cpp +++ b/cmd/dev/backend_kv_server.cpp @@ -39,7 +39,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/silkworm/node/backend/ethereum_backend.hpp b/silkworm/node/backend/ethereum_backend.hpp index 45333f3a94..cf6bdb52ff 100644 --- a/silkworm/node/backend/ethereum_backend.hpp +++ b/silkworm/node/backend/ethereum_backend.hpp @@ -23,8 +23,8 @@ #include #include -#include #include +#include #include namespace silkworm { diff --git a/silkworm/node/node.cpp b/silkworm/node/node.cpp index 8e08d79d5a..55ced6f603 100644 --- a/silkworm/node/node.cpp +++ b/silkworm/node/node.cpp @@ -30,7 +30,7 @@ #include #include #include -#include +#include #include #include diff --git a/silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.cpp b/silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.cpp new file mode 100644 index 0000000000..a90176accc --- /dev/null +++ b/silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.cpp @@ -0,0 +1,45 @@ +/* + Copyright 2022 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +#include "backend_kv_server.hpp" + +#include + +#include +#include +#include + +namespace silkworm::rpc { + +BackEndKvServer::BackEndKvServer(const ServerSettings& settings, const EthereumBackEnd& backend) + : Server(settings), + BackEndServer(settings, backend), + KvServer(settings, backend.chaindata_env(), backend.state_change_source()) { +} + +// Register the gRPC services: they must exist for the lifetime of the server built by builder. +void BackEndKvServer::register_async_services(grpc::ServerBuilder& builder) { + BackEndServer::register_async_services(builder); + KvServer::register_async_services(builder); +} + +// Start server-side RPC requests as required by gRPC async model: one RPC per type is requested in advance. +void BackEndKvServer::register_request_calls() { + BackEndServer::register_request_calls(); + KvServer::register_request_calls(); +} + +} // namespace silkworm::rpc diff --git a/silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.hpp b/silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.hpp new file mode 100644 index 0000000000..fac9873f2a --- /dev/null +++ b/silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.hpp @@ -0,0 +1,38 @@ +/* + Copyright 2022 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +#pragma once + +#include +#include +#include +#include + +namespace silkworm::rpc { + +class BackEndKvServer : public BackEndServer, public KvServer { + public: + BackEndKvServer(const ServerSettings& settings, const EthereumBackEnd& backend); + + BackEndKvServer(const BackEndKvServer&) = delete; + BackEndKvServer& operator=(const BackEndKvServer&) = delete; + + protected: + void register_async_services(grpc::ServerBuilder& builder) override; + void register_request_calls() override; +}; + +} // namespace silkworm::rpc diff --git a/silkworm/node/remote/kv/grpc/server/backend_kv_server.cpp b/silkworm/node/remote/ethbackend/grpc/server/backend_server.cpp similarity index 53% rename from silkworm/node/remote/kv/grpc/server/backend_kv_server.cpp rename to silkworm/node/remote/ethbackend/grpc/server/backend_server.cpp index 4c50bf55d1..9a20986ba4 100644 --- a/silkworm/node/remote/kv/grpc/server/backend_kv_server.cpp +++ b/silkworm/node/remote/ethbackend/grpc/server/backend_server.cpp @@ -14,30 +14,27 @@ limitations under the License. */ -#include "backend_kv_server.hpp" +#include "backend_server.hpp" #include #include #include -#include namespace silkworm::rpc { -BackEndKvServer::BackEndKvServer(const ServerSettings& settings, const EthereumBackEnd& backend) +BackEndServer::BackEndServer(const ServerSettings& settings, const EthereumBackEnd& backend) : Server(settings), backend_(backend) { setup_backend_calls(backend); - setup_kv_calls(); - SILK_INFO << "BackEndKvServer created listening on: " << settings.address_uri; + SILK_INFO << "BackEndServer created listening on: " << settings.address_uri; } // Register the gRPC services: they must exist for the lifetime of the server built by builder. 
-void BackEndKvServer::register_async_services(grpc::ServerBuilder& builder) { +void BackEndServer::register_async_services(grpc::ServerBuilder& builder) { builder.RegisterService(&backend_async_service_); - builder.RegisterService(&kv_async_service_); } -void BackEndKvServer::setup_backend_calls(const EthereumBackEnd& backend) { +void BackEndServer::setup_backend_calls(const EthereumBackEnd& backend) { EtherbaseCall::fill_predefined_reply(backend); NetVersionCall::fill_predefined_reply(backend); BackEndVersionCall::fill_predefined_reply(); @@ -45,7 +42,7 @@ void BackEndKvServer::setup_backend_calls(const EthereumBackEnd& backend) { ClientVersionCall::fill_predefined_reply(backend); } -void BackEndKvServer::register_backend_request_calls(agrpc::GrpcContext* grpc_context) { +void BackEndServer::register_backend_request_calls(agrpc::GrpcContext* grpc_context) { SILK_TRACE << "BackEndService::register_backend_request_calls START"; auto service = &backend_async_service_; auto& backend = backend_; @@ -86,57 +83,8 @@ void BackEndKvServer::register_backend_request_calls(agrpc::GrpcContext* grpc_co SILK_TRACE << "BackEndService::register_backend_request_calls END"; } -void BackEndKvServer::setup_kv_calls() { - KvVersionCall::fill_predefined_reply(); -} - -void BackEndKvServer::register_kv_request_calls(agrpc::GrpcContext* grpc_context) { - SILK_TRACE << "BackEndKvServer::register_kv_request_calls START"; - auto service = &kv_async_service_; - auto& backend = backend_; - - // Register one requested call repeatedly for each RPC: asio-grpc will take care of re-registration on any incoming call - request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestVersion, - [&backend](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) - co_await KvVersionCall{std::forward(args)...}(backend); - }); - request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestTx, - [&backend, grpc_context](auto&&... 
args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) - co_await TxCall{*grpc_context, std::forward(args)...}(backend); - }); - request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestStateChanges, - [&backend](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) - co_await StateChangesCall{std::forward(args)...}(backend); - }); - request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestSnapshots, - [&backend](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) - co_await SnapshotsCall{std::forward(args)...}(backend); - }); - request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestDomainGet, - [&backend](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) - co_await DomainGetCall{std::forward(args)...}(backend); - }); - request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestHistoryGet, - [&backend](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) - co_await HistoryGetCall{std::forward(args)...}(backend); - }); - request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestIndexRange, - [&backend](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) - co_await IndexRangeCall{std::forward(args)...}(backend); - }); - request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestHistoryRange, - [&backend](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) - co_await HistoryRangeCall{std::forward(args)...}(backend); - }); - request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestDomainRange, - [&backend](auto&&... 
args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) - co_await DomainRangeCall{std::forward(args)...}(backend); - }); - SILK_TRACE << "BackEndKvServer::register_kv_request_calls END"; -} - //! Start server-side RPC requests as required by gRPC async model: one RPC per type is requested in advance. -void BackEndKvServer::register_request_calls() { +void BackEndServer::register_request_calls() { // Start all server-side RPC requests for each available server context for (std::size_t i = 0; i < num_contexts(); i++) { const auto& context = next_context(); @@ -144,7 +92,6 @@ void BackEndKvServer::register_request_calls() { // Register initial requested calls for ETHBACKEND and KV services register_backend_request_calls(grpc_context); - register_kv_request_calls(grpc_context); } } diff --git a/silkworm/node/remote/kv/grpc/server/backend_kv_server.hpp b/silkworm/node/remote/ethbackend/grpc/server/backend_server.hpp similarity index 72% rename from silkworm/node/remote/kv/grpc/server/backend_kv_server.hpp rename to silkworm/node/remote/ethbackend/grpc/server/backend_server.hpp index f8c3d9f07a..48c55bffef 100644 --- a/silkworm/node/remote/kv/grpc/server/backend_kv_server.hpp +++ b/silkworm/node/remote/ethbackend/grpc/server/backend_server.hpp @@ -22,17 +22,16 @@ #include #include #include -#include #include namespace silkworm::rpc { -class BackEndKvServer : public Server { +class BackEndServer : public virtual Server { public: - BackEndKvServer(const ServerSettings& settings, const EthereumBackEnd& backend); + BackEndServer(const ServerSettings& settings, const EthereumBackEnd& backend); - BackEndKvServer(const BackEndKvServer&) = delete; - BackEndKvServer& operator=(const BackEndKvServer&) = delete; + BackEndServer(const BackEndServer&) = delete; + BackEndServer& operator=(const BackEndServer&) = delete; protected: void register_async_services(grpc::ServerBuilder& builder) override; @@ -42,17 +41,11 @@ class BackEndKvServer : public Server { 
static void setup_backend_calls(const EthereumBackEnd& backend); void register_backend_request_calls(agrpc::GrpcContext* grpc_context); - static void setup_kv_calls(); - void register_kv_request_calls(agrpc::GrpcContext* grpc_context); - //! The Ethereum full node service. const EthereumBackEnd& backend_; //! \warning The gRPC service must exist for the lifetime of the gRPC server it is registered on. remote::ETHBACKEND::AsyncService backend_async_service_; - - //! \warning The gRPC service must exist for the lifetime of the gRPC server it is registered on. - remote::KV::AsyncService kv_async_service_; }; } // namespace silkworm::rpc diff --git a/silkworm/node/remote/ethbackend/grpc/server/backend_server_test.cpp b/silkworm/node/remote/ethbackend/grpc/server/backend_server_test.cpp new file mode 100644 index 0000000000..3d184f5504 --- /dev/null +++ b/silkworm/node/remote/ethbackend/grpc/server/backend_server_test.cpp @@ -0,0 +1,489 @@ +/* + Copyright 2022 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +#include "backend_server.hpp" + +#include +#include // DO NOT remove: used for std::condition_variable, CLion suggestion is buggy +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +namespace { // Trick to avoid name clashes in multiple test modules + +class BackEndClient { + public: + explicit BackEndClient(remote::ETHBACKEND::StubInterface* stub) : stub_(stub) {} + + grpc::Status etherbase(remote::EtherbaseReply* response) { + grpc::ClientContext context; + return stub_->Etherbase(&context, remote::EtherbaseRequest{}, response); + } + + grpc::Status net_version(remote::NetVersionReply* response) { + grpc::ClientContext context; + return stub_->NetVersion(&context, remote::NetVersionRequest{}, response); + } + + grpc::Status net_peer_count(remote::NetPeerCountReply* response) { + grpc::ClientContext context; + return stub_->NetPeerCount(&context, remote::NetPeerCountRequest{}, response); + } + + grpc::Status version(types::VersionReply* response) { + grpc::ClientContext context; + return stub_->Version(&context, google::protobuf::Empty{}, response); + } + + grpc::Status protocol_version(remote::ProtocolVersionReply* response) { + grpc::ClientContext context; + return stub_->ProtocolVersion(&context, remote::ProtocolVersionRequest{}, response); + } + + grpc::Status client_version(remote::ClientVersionReply* response) { + grpc::ClientContext context; + return stub_->ClientVersion(&context, remote::ClientVersionRequest{}, response); + } + + grpc::Status subscribe_and_consume(const remote::SubscribeRequest& request, + std::vector& responses) { + grpc::ClientContext context; + auto subscribe_reply_reader = stub_->Subscribe(&context, request); + bool has_more{true}; + do { + has_more = subscribe_reply_reader->Read(&responses.emplace_back()); + } while 
(has_more); + responses.pop_back(); + return subscribe_reply_reader->Finish(); + } + + grpc::Status node_info(const remote::NodesInfoRequest& request, remote::NodesInfoReply* response) { + grpc::ClientContext context; + return stub_->NodeInfo(&context, request, response); + } + + private: + remote::ETHBACKEND::StubInterface* stub_; +}; + +const uint64_t kTestSentryPeerCount{10}; +constexpr const char* kTestSentryNodeId{"24bfa2cdce7c6a41184fa0809ad8d76969b7280952e9aa46179d90cfbab90f7d2b004928f0364389a1aa8d5166281f2ff7568493c1f719e8f6148ef8cf8af42d"}; +constexpr const char* kTestSentryNodeClientId{"MockSentryClient"}; + +class MockSentryClient + : public std::enable_shared_from_this, + public silkworm::sentry::api::SentryClient, + public silkworm::sentry::api::Service { + template + using Task = silkworm::Task; + + Task> service() override { + co_return shared_from_this(); + } + [[nodiscard]] bool is_ready() override { return true; } + void on_disconnect(std::function()> /*callback*/) override {} + Task reconnect() override { co_return; } + + Task set_status(silkworm::sentry::eth::StatusData /*status_data*/) override { + throw std::runtime_error("not implemented"); + } + Task handshake() override { + throw std::runtime_error("not implemented"); + } + Task node_infos() override { + const std::string ip_str = "1.2.3.4"; + const uint16_t port = 50555; + const std::string node_url_str = std::string("enode://") + kTestSentryNodeId + "@" + ip_str + ":" + std::to_string(port); + + silkworm::sentry::api::NodeInfo info = { + silkworm::sentry::EnodeUrl{node_url_str}, + kTestSentryNodeClientId, + boost::asio::ip::tcp::endpoint{boost::asio::ip::make_address(ip_str), port}, + port, + }; + co_return NodeInfos{info}; + } + + Task send_message_by_id(silkworm::sentry::Message /*message*/, silkworm::sentry::EccPublicKey /*public_key*/) override { + throw std::runtime_error("not implemented"); + } + Task send_message_to_random_peers(silkworm::sentry::Message /*message*/, size_t 
/*max_peers*/) override { + throw std::runtime_error("not implemented"); + } + Task send_message_to_all(silkworm::sentry::Message /*message*/) override { + throw std::runtime_error("not implemented"); + } + Task send_message_by_min_block(silkworm::sentry::Message /*message*/, size_t /*max_peers*/) override { + throw std::runtime_error("not implemented"); + } + Task peer_min_block(silkworm::sentry::EccPublicKey /*public_key*/) override { + throw std::runtime_error("not implemented"); + } + Task messages( + silkworm::sentry::api::MessageIdSet /*message_id_filter*/, + std::function(silkworm::sentry::api::MessageFromPeer)> /*consumer*/) override { + throw std::runtime_error("not implemented"); + } + + Task peers() override { + throw std::runtime_error("not implemented"); + } + Task peer_count() override { + co_return kTestSentryPeerCount; + } + Task> peer_by_id(silkworm::sentry::EccPublicKey /*public_key*/) override { + throw std::runtime_error("not implemented"); + } + Task penalize_peer(silkworm::sentry::EccPublicKey /*public_key*/) override { + throw std::runtime_error("not implemented"); + } + Task peer_events(std::function(silkworm::sentry::api::PeerEvent)> /*consumer*/) override { + throw std::runtime_error("not implemented"); + } +}; + +// TODO(canepat): better copy grpc_pick_unused_port_or_die to generate unused port +const std::string kTestAddressUri{"localhost:12345"}; + +const silkworm::db::MapConfig kTestMap{"TestTable"}; +const silkworm::db::MapConfig kTestMultiMap{"TestMultiTable", mdbx::key_mode::usual, mdbx::value_mode::multi}; + +using namespace silkworm; + +using StateChangeTokenObserver = std::function)>; + +struct TestableStateChangeCollection : public StateChangeCollection { + std::optional subscribe(StateChangeConsumer consumer, StateChangeFilter filter) override { + const auto token = StateChangeCollection::subscribe(consumer, filter); + if (token_observer_) { + token_observer_(token); + } + return token; + } + + void set_token(StateChangeToken 
next_token) { next_token_ = next_token; } + + void register_token_observer(StateChangeTokenObserver token_observer) { token_observer_ = std::move(token_observer); } + + StateChangeTokenObserver token_observer_; +}; + +class TestableEthereumBackEnd : public EthereumBackEnd { + public: + TestableEthereumBackEnd(const NodeSettings& node_settings, mdbx::env* chaindata_env) + : EthereumBackEnd{ + node_settings, + chaindata_env, + std::make_shared(), + std::make_unique(), + } {} + + [[nodiscard]] TestableStateChangeCollection* state_change_source_for_test() const noexcept { + return dynamic_cast(EthereumBackEnd::state_change_source()); + } +}; + +struct BackEndE2ETest { + explicit BackEndE2ETest( + silkworm::log::Level log_verbosity = silkworm::log::Level::kNone, + NodeSettings&& options = {}) + : set_verbosity_log_guard{log_verbosity} { + std::shared_ptr channel = + grpc::CreateChannel(kTestAddressUri, grpc::InsecureChannelCredentials()); + ethbackend_stub = remote::ETHBACKEND::NewStub(channel); + backend_client = std::make_unique(ethbackend_stub.get()); + + srv_config.context_pool_settings.num_contexts = 1; + srv_config.address_uri = kTestAddressUri; + + DataDirectory data_dir{tmp_dir.path()}; + REQUIRE_NOTHROW(data_dir.deploy()); + db_config = std::make_unique(); + db_config->max_readers = options.chaindata_env_config.max_readers; + db_config->path = data_dir.chaindata().path().string(); + db_config->create = true; + db_config->in_memory = true; + database_env = db::open_env(*db_config); + auto rw_txn{database_env.start_write()}; + db::open_map(rw_txn, kTestMap); + db::open_map(rw_txn, kTestMultiMap); + rw_txn.commit(); + + backend = std::make_unique(options, &database_env); + server = std::make_unique(srv_config, *backend); + server->build_and_start(); + } + + void fill_tables() { + auto rw_txn = database_env.start_write(); + db::PooledCursor rw_cursor1{rw_txn, kTestMap}; + rw_cursor1.upsert(mdbx::slice{"AA"}, mdbx::slice{"00"}); + 
rw_cursor1.upsert(mdbx::slice{"BB"}, mdbx::slice{"11"}); + db::PooledCursor rw_cursor2{rw_txn, kTestMultiMap}; + rw_cursor2.upsert(mdbx::slice{"AA"}, mdbx::slice{"00"}); + rw_cursor2.upsert(mdbx::slice{"AA"}, mdbx::slice{"11"}); + rw_cursor2.upsert(mdbx::slice{"AA"}, mdbx::slice{"22"}); + rw_cursor2.upsert(mdbx::slice{"BB"}, mdbx::slice{"22"}); + rw_txn.commit(); + } + + void alter_tables() { + auto rw_txn = database_env.start_write(); + db::PooledCursor rw_cursor1{rw_txn, kTestMap}; + rw_cursor1.upsert(mdbx::slice{"CC"}, mdbx::slice{"22"}); + db::PooledCursor rw_cursor2{rw_txn, kTestMultiMap}; + rw_cursor2.upsert(mdbx::slice{"AA"}, mdbx::slice{"33"}); + rw_cursor2.upsert(mdbx::slice{"BB"}, mdbx::slice{"33"}); + rw_txn.commit(); + } + + ~BackEndE2ETest() { + server->shutdown(); + server->join(); + } + + test_util::SetLogVerbosityGuard set_verbosity_log_guard; + rpc::Grpc2SilkwormLogGuard grpc2silkworm_log_guard; + std::unique_ptr ethbackend_stub; + std::unique_ptr backend_client; + rpc::ServerSettings srv_config; + TemporaryDirectory tmp_dir; + std::unique_ptr db_config; + mdbx::env_managed database_env; + std::unique_ptr backend; + std::unique_ptr server; +}; + +} // namespace + +namespace silkworm::rpc { + +// Exclude gRPC tests from sanitizer builds due to data race warnings inside gRPC library +#ifndef SILKWORM_SANITIZE +TEST_CASE("BackEndServer", "[silkworm][node][rpc]") { + test_util::SetLogVerbosityGuard guard{log::Level::kNone}; + Grpc2SilkwormLogGuard log_guard; + ServerSettings srv_config; + srv_config.address_uri = kTestAddressUri; + TemporaryDirectory tmp_dir; + DataDirectory data_dir{tmp_dir.path()}; + REQUIRE_NOTHROW(data_dir.deploy()); + db::EnvConfig db_config{data_dir.chaindata().path().string()}; + db_config.create = true; + db_config.in_memory = true; + auto database_env = db::open_env(db_config); + NodeSettings node_settings; + TestableEthereumBackEnd backend{node_settings, &database_env}; + + SECTION("BackEndServer::BackEndServer OK: 
create/destroy server") { + BackEndServer server{srv_config, backend}; + } + + SECTION("BackEndServer::BackEndServer OK: create/shutdown/destroy server") { + BackEndServer server{srv_config, backend}; + server.shutdown(); + } + + SECTION("BackEndServer::build_and_start OK: run server in separate thread") { + BackEndServer server{srv_config, backend}; + server.build_and_start(); + std::thread server_thread{[&server]() { server.join(); }}; + server.shutdown(); + server_thread.join(); + } + + SECTION("BackEndServer::build_and_start OK: create/shutdown/run/destroy server") { + BackEndServer server{srv_config, backend}; + server.shutdown(); + server.build_and_start(); + } + + SECTION("BackEndServer::shutdown OK: shutdown server not running") { + BackEndServer server{srv_config, backend}; + server.shutdown(); + } + + SECTION("BackEndServer::shutdown OK: shutdown twice server not running") { + BackEndServer server{srv_config, backend}; + server.shutdown(); + server.shutdown(); + } + + SECTION("BackEndServer::shutdown OK: shutdown running server") { + BackEndServer server{srv_config, backend}; + server.build_and_start(); + server.shutdown(); + server.join(); + } + + SECTION("BackEndServer::shutdown OK: shutdown twice running server") { + BackEndServer server{srv_config, backend}; + server.build_and_start(); + server.shutdown(); + server.shutdown(); + server.join(); + } + + SECTION("BackEndServer::shutdown OK: shutdown running server again after join") { + BackEndServer server{srv_config, backend}; + server.build_and_start(); + server.shutdown(); + server.join(); + server.shutdown(); + } + + SECTION("BackEndServer::join OK: shutdown joined server") { + BackEndServer server{srv_config, backend}; + server.build_and_start(); + std::thread server_thread{[&server]() { server.join(); }}; + server.shutdown(); + server_thread.join(); + } + + SECTION("BackEndServer::join OK: shutdown joined server and join again") { + BackEndServer server{srv_config, backend}; + 
server.build_and_start(); + std::thread server_thread{[&server]() { server.join(); }}; + server.shutdown(); + server_thread.join(); + server.join(); // cannot move before server_thread.join() due to data race in boost::asio::detail::posix_thread + } +} + +TEST_CASE("BackEndServer E2E: empty node settings", "[silkworm][node][rpc]") { + BackEndE2ETest test; + auto backend_client = *test.backend_client; + + SECTION("Etherbase: return missing coinbase error") { + remote::EtherbaseReply response; + const auto status = backend_client.etherbase(&response); + CHECK(!status.ok()); + CHECK(status.error_code() == grpc::StatusCode::INTERNAL); + CHECK(status.error_message() == "etherbase must be explicitly specified"); + CHECK(!response.has_address()); + } + + SECTION("NetVersion: return out-of-range network ID") { + remote::NetVersionReply response; + const auto status = backend_client.net_version(&response); + CHECK(status.ok()); + CHECK(response.id() == 0); + } + + SECTION("Version: return ETHBACKEND version") { + types::VersionReply response; + const auto status = backend_client.version(&response); + CHECK(status.ok()); + CHECK(response.major() == 2); + CHECK(response.minor() == 3); + CHECK(response.patch() == 0); + } + + SECTION("ProtocolVersion: return ETH protocol version") { + remote::ProtocolVersionReply response; + const auto status = backend_client.protocol_version(&response); + CHECK(status.ok()); + CHECK(response.id() == kEthDevp2pProtocolVersion); + } + + SECTION("ClientVersion: return Silkworm client version") { + remote::ClientVersionReply response; + const auto status = backend_client.client_version(&response); + CHECK(status.ok()); + CHECK(absl::StrContains(response.node_name(), "silkworm")); + } + + // TODO(canepat): change using something meaningful when really implemented + SECTION("Subscribe: return streamed subscriptions") { + remote::SubscribeRequest request; + std::vector responses; + const auto status = backend_client.subscribe_and_consume(request, 
responses); + CHECK(status.ok()); + CHECK(responses.size() == 2); + } +} + +TEST_CASE("BackEndServer E2E: mainnet chain with zero etherbase", "[silkworm][node][rpc]") { + NodeSettings node_settings; + node_settings.chain_config = kMainnetConfig; + node_settings.etherbase = evmc::address{}; + BackEndE2ETest test{silkworm::log::Level::kNone, std::move(node_settings)}; + auto backend_client = *test.backend_client; + + SECTION("Etherbase: return coinbase address") { + remote::EtherbaseReply response; + const auto status = backend_client.etherbase(&response); + CHECK(status.ok()); + CHECK(response.has_address()); + CHECK(response.address() == types::H160()); + } + + SECTION("NetVersion: return network ID") { + remote::NetVersionReply response; + const auto status = backend_client.net_version(&response); + CHECK(status.ok()); + CHECK(response.id() == kMainnetConfig.chain_id); + } +} + +TEST_CASE("BackEndServer E2E: one Sentry status OK", "[silkworm][node][rpc]") { + BackEndE2ETest test; + auto backend_client = *test.backend_client; + + SECTION("NetPeerCount: return peer count") { + remote::NetPeerCountReply response; + const auto status = backend_client.net_peer_count(&response); + CHECK(status.ok()); + CHECK(response.count() == kTestSentryPeerCount); + } + + SECTION("NodeInfo: return information about nodes") { + remote::NodesInfoRequest request; + request.set_limit(0); + remote::NodesInfoReply response; + const auto status = backend_client.node_info(request, &response); + CHECK(status.ok()); + CHECK(response.nodes_info_size() == 1); + CHECK(response.nodes_info(0).id() == kTestSentryNodeId); + CHECK(response.nodes_info(0).name() == kTestSentryNodeClientId); + } +} +#endif // SILKWORM_SANITIZE + +} // namespace silkworm::rpc diff --git a/silkworm/node/remote/kv/grpc/server/kv_calls.cpp b/silkworm/node/remote/kv/grpc/server/kv_calls.cpp index 037afbb618..a314e493e5 100644 --- a/silkworm/node/remote/kv/grpc/server/kv_calls.cpp +++ 
b/silkworm/node/remote/kv/grpc/server/kv_calls.cpp @@ -63,7 +63,7 @@ void KvVersionCall::fill_predefined_reply() { KvVersionCall::response_.set_patch(std::get<2>(max_version)); } -Task KvVersionCall::operator()(const EthereumBackEnd& /*backend*/) { +Task KvVersionCall::operator()() { SILK_TRACE << "KvVersionCall START"; co_await agrpc::finish(responder_, response_, grpc::Status::OK); SILK_TRACE << "KvVersionCall END version: " << response_.major() << "." << response_.minor() << "." << response_.patch(); @@ -75,8 +75,7 @@ void TxCall::set_max_ttl_duration(const std::chrono::milliseconds& max_ttl_durat TxCall::max_ttl_duration_ = max_ttl_duration; } -Task TxCall::operator()(const EthereumBackEnd& backend) { - auto chaindata_env = backend.chaindata_env(); +Task TxCall::operator()(mdbx::env* chaindata_env) { SILK_TRACE << "TxCall peer: " << peer() << " MDBX readers: " << chaindata_env->get_info().mi_numreaders; grpc::Status status{grpc::Status::OK}; @@ -162,7 +161,7 @@ Task TxCall::operator()(const EthereumBackEnd& backend) { while (true) { const auto [ec] = co_await max_ttl_alarm.async_wait(as_tuple(use_awaitable)); if (!ec) { - handle_max_ttl_timer_expired(backend); + handle_max_ttl_timer_expired(chaindata_env); max_ttl_deadline += max_ttl_duration_; } } @@ -330,9 +329,7 @@ void TxCall::handle_operation(const remote::Cursor* request, db::ROCursorDupSort SILK_TRACE << "TxCall::handle_operation " << this << " op=" << remote::Op_Name(request->op()) << " END"; } -void TxCall::handle_max_ttl_timer_expired(const EthereumBackEnd& backend) { - auto chaindata_env = backend.chaindata_env(); - +void TxCall::handle_max_ttl_timer_expired(mdbx::env* chaindata_env) { // Save the whole state of the transaction (i.e. 
all cursor positions) std::vector positions; const bool save_success = save_cursors(positions); @@ -661,9 +658,8 @@ void TxCall::throw_with_error(grpc::Status&& status) { throw server::CallException{std::move(status)}; } -Task StateChangesCall::operator()(const EthereumBackEnd& backend) { +Task StateChangesCall::operator()(StateChangeCollection* source) { SILK_TRACE << "StateChangesCall w/ storage: " << request_.with_storage() << " w/ txs: " << request_.with_transactions() << " START"; - auto source = backend.state_change_source(); // Create a never-expiring timer whose cancellation will notify our async waiting is completed auto coroutine_executor = co_await boost::asio::this_coro::executor; @@ -724,7 +720,7 @@ Task StateChangesCall::operator()(const EthereumBackEnd& backend) { co_return; } -Task SnapshotsCall::operator()(const EthereumBackEnd& /*backend*/) { +Task SnapshotsCall::operator()() { SILK_TRACE << "SnapshotsCall START"; remote::SnapshotsReply response; // TODO(canepat) implement properly @@ -732,7 +728,7 @@ Task SnapshotsCall::operator()(const EthereumBackEnd& /*backend*/) { SILK_TRACE << "SnapshotsCall END #blocks_files: " << response.blocks_files_size() << " #history_files: " << response.history_files_size(); } -Task HistoryGetCall::operator()(const EthereumBackEnd& /*backend*/) { +Task HistoryGetCall::operator()() { SILK_TRACE << "HistoryGetCall START"; remote::HistoryGetReply response; // TODO(canepat) implement properly @@ -740,7 +736,7 @@ Task HistoryGetCall::operator()(const EthereumBackEnd& /*backend*/) { SILK_TRACE << "HistoryGetCall END ok: " << response.ok() << " value: " << response.v(); } -Task DomainGetCall::operator()(const EthereumBackEnd& /*backend*/) { +Task DomainGetCall::operator()() { SILK_TRACE << "DomainGetCall START"; remote::DomainGetReply response; // TODO(canepat) implement properly @@ -748,7 +744,7 @@ Task DomainGetCall::operator()(const EthereumBackEnd& /*backend*/) { SILK_TRACE << "DomainGetCall END ok: " << response.ok() 
<< " value: " << response.v(); } -Task IndexRangeCall::operator()(const EthereumBackEnd& /*backend*/) { +Task IndexRangeCall::operator()() { SILK_TRACE << "IndexRangeCall START"; remote::IndexRangeReply response; // TODO(canepat) implement properly @@ -756,7 +752,7 @@ Task IndexRangeCall::operator()(const EthereumBackEnd& /*backend*/) { SILK_TRACE << "IndexRangeCall END #timestamps: " << response.timestamps_size() << " next_page_token: " << response.next_page_token(); } -Task HistoryRangeCall::operator()(const EthereumBackEnd& /*backend*/) { +Task HistoryRangeCall::operator()() { SILK_TRACE << "HistoryRangeCall START"; remote::Pairs response; // TODO(canepat) implement properly @@ -765,7 +761,7 @@ Task HistoryRangeCall::operator()(const EthereumBackEnd& /*backend*/) { << " next_page_token: " << response.next_page_token(); } -Task DomainRangeCall::operator()(const EthereumBackEnd& /*backend*/) { +Task DomainRangeCall::operator()() { SILK_TRACE << "DomainRangeCall START"; remote::Pairs response; // TODO(canepat) implement properly diff --git a/silkworm/node/remote/kv/grpc/server/kv_calls.hpp b/silkworm/node/remote/kv/grpc/server/kv_calls.hpp index 3083d76caa..7d36b578bc 100644 --- a/silkworm/node/remote/kv/grpc/server/kv_calls.hpp +++ b/silkworm/node/remote/kv/grpc/server/kv_calls.hpp @@ -33,8 +33,8 @@ #include #include #include -#include -#include + +#include "state_change_collection.hpp" // KV API protocol versions // 5.1.0 - first issue @@ -65,7 +65,7 @@ class KvVersionCall : public server::UnaryCall operator()(const EthereumBackEnd& backend); + Task operator()(); private: static types::VersionReply response_; @@ -79,7 +79,7 @@ class TxCall : public server::BidiStreamingCall { static void set_max_ttl_duration(const std::chrono::milliseconds& max_ttl_duration); - Task operator()(const EthereumBackEnd& backend); + Task operator()(mdbx::env* chaindata_env); private: struct TxCursor { @@ -102,7 +102,7 @@ class TxCall : public server::BidiStreamingCall { void 
handle_operation(const remote::Cursor* request, db::ROCursorDupSort& cursor, remote::Pair& response); - void handle_max_ttl_timer_expired(const EthereumBackEnd& backend); + void handle_max_ttl_timer_expired(mdbx::env* chaindata_env); bool save_cursors(std::vector& positions); @@ -158,7 +158,7 @@ class StateChangesCall : public server::ServerStreamingCall operator()(const EthereumBackEnd& backend); + Task operator()(StateChangeCollection* source); }; //! Unary RPC for Snapshots method of 'kv' gRPC protocol. @@ -167,7 +167,7 @@ class SnapshotsCall : public server::UnaryCall operator()(const EthereumBackEnd& backend); + Task operator()(); }; //! Unary RPC for HistoryGet method of 'kv' gRPC protocol. @@ -176,7 +176,7 @@ class HistoryGetCall : public server::UnaryCall operator()(const EthereumBackEnd& backend); + Task operator()(); }; //! Unary RPC for DomainGet method of 'kv' gRPC protocol. @@ -185,7 +185,7 @@ class DomainGetCall : public server::UnaryCall operator()(const EthereumBackEnd& backend); + Task operator()(); }; //! Unary RPC for IndexRange method of 'kv' gRPC protocol. @@ -194,7 +194,7 @@ class IndexRangeCall : public server::UnaryCall operator()(const EthereumBackEnd& backend); + Task operator()(); }; //! Unary RPC for IndexRange method of 'kv' gRPC protocol. @@ -203,7 +203,7 @@ class HistoryRangeCall : public server::UnaryCall operator()(const EthereumBackEnd& backend); + Task operator()(); }; //! Unary RPC for IndexRange method of 'kv' gRPC protocol. 
@@ -212,7 +212,7 @@ class DomainRangeCall : public server::UnaryCall operator()(const EthereumBackEnd& backend); + Task operator()(); }; } // namespace silkworm::rpc diff --git a/silkworm/node/remote/kv/grpc/server/kv_calls_test.cpp b/silkworm/node/remote/kv/grpc/server/kv_calls_test.cpp index 411b88d513..a1ae28943d 100644 --- a/silkworm/node/remote/kv/grpc/server/kv_calls_test.cpp +++ b/silkworm/node/remote/kv/grpc/server/kv_calls_test.cpp @@ -20,6 +20,8 @@ #include +#include + namespace silkworm::rpc { TEST_CASE("higher_version_ignoring_patch", "[silkworm][rpc][kv_calls]") { diff --git a/silkworm/node/remote/kv/grpc/server/kv_server.cpp b/silkworm/node/remote/kv/grpc/server/kv_server.cpp new file mode 100644 index 0000000000..890e01cf75 --- /dev/null +++ b/silkworm/node/remote/kv/grpc/server/kv_server.cpp @@ -0,0 +1,98 @@ +/* + Copyright 2022 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "kv_server.hpp" + +#include + +#include + +#include "kv_calls.hpp" + +namespace silkworm::rpc { + +KvServer::KvServer(const ServerSettings& settings, mdbx::env* chaindata_env, StateChangeCollection* state_change_source) + : Server(settings), chaindata_env_{chaindata_env}, state_change_source_{state_change_source} { + setup_kv_calls(); + SILK_INFO << "KvServer created listening on: " << settings.address_uri; +} + +// Register the gRPC services: they must exist for the lifetime of the server built by builder. 
+void KvServer::register_async_services(grpc::ServerBuilder& builder) { + builder.RegisterService(&kv_async_service_); +} + +void KvServer::setup_kv_calls() { + KvVersionCall::fill_predefined_reply(); +} + +void KvServer::register_kv_request_calls(agrpc::GrpcContext* grpc_context) { + SILK_TRACE << "KvServer::register_kv_request_calls START"; + auto service = &kv_async_service_; + + // Register one requested call repeatedly for each RPC: asio-grpc will take care of re-registration on any incoming call + request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestVersion, + [](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) + co_await KvVersionCall{std::forward(args)...}(); + }); + request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestTx, + [this, grpc_context](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) + co_await TxCall{*grpc_context, std::forward(args)...}(chaindata_env_); + }); + request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestStateChanges, + [this](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) + co_await StateChangesCall{std::forward(args)...}(state_change_source_); + }); + request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestSnapshots, + [](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) + co_await SnapshotsCall{std::forward(args)...}(); + }); + request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestDomainGet, + [](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) + co_await DomainGetCall{std::forward(args)...}(); + }); + request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestHistoryGet, + [](auto&&... 
args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) + co_await HistoryGetCall{std::forward(args)...}(); + }); + request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestIndexRange, + [](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) + co_await IndexRangeCall{std::forward(args)...}(); + }); + request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestHistoryRange, + [](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) + co_await HistoryRangeCall{std::forward(args)...}(); + }); + request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestDomainRange, + [](auto&&... args) -> Task { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines) + co_await DomainRangeCall{std::forward(args)...}(); + }); + SILK_TRACE << "KvServer::register_kv_request_calls END"; +} + +//! Start server-side RPC requests as required by gRPC async model: one RPC per type is requested in advance. +void KvServer::register_request_calls() { + // Start all server-side RPC requests for each available server context + for (std::size_t i = 0; i < num_contexts(); i++) { + const auto& context = next_context(); + auto grpc_context = context.server_grpc_context(); + + // Register initial requested calls for the KV service only (ETHBACKEND calls are registered by BackEndServer) + register_kv_request_calls(grpc_context); + } +} + +} // namespace silkworm::rpc diff --git a/silkworm/node/remote/kv/grpc/server/kv_server.hpp b/silkworm/node/remote/kv/grpc/server/kv_server.hpp new file mode 100644 index 0000000000..e0da595992 --- /dev/null +++ b/silkworm/node/remote/kv/grpc/server/kv_server.hpp @@ -0,0 +1,56 @@ +/* + Copyright 2022 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include "state_change_collection.hpp" + +namespace silkworm::rpc { + +class KvServer : public virtual Server { + public: + KvServer(const ServerSettings& settings, mdbx::env* chaindata_env, StateChangeCollection* state_change_source); + + KvServer(const KvServer&) = delete; + KvServer& operator=(const KvServer&) = delete; + + protected: + void register_async_services(grpc::ServerBuilder& builder) override; + void register_request_calls() override; + + private: + static void setup_kv_calls(); + void register_kv_request_calls(agrpc::GrpcContext* grpc_context); + + //! \warning The gRPC service must exist for the lifetime of the gRPC server it is registered on. + remote::KV::AsyncService kv_async_service_; + + //! The chain database environment + mdbx::env* chaindata_env_; + + //! The collector of state changes acting as source of state change notifications + StateChangeCollection* state_change_source_; +}; + +} // namespace silkworm::rpc diff --git a/silkworm/node/remote/kv/grpc/server/backend_kv_server_test.cpp b/silkworm/node/remote/kv/grpc/server/kv_server_test.cpp similarity index 84% rename from silkworm/node/remote/kv/grpc/server/backend_kv_server_test.cpp rename to silkworm/node/remote/kv/grpc/server/kv_server_test.cpp index 77b05f846e..4bcfd43ae4 100644 --- a/silkworm/node/remote/kv/grpc/server/backend_kv_server_test.cpp +++ b/silkworm/node/remote/kv/grpc/server/kv_server_test.cpp @@ -14,99 +14,37 @@ limitations under the License. 
*/ -#include "backend_kv_server.hpp" +#include "kv_server.hpp" #include -#include // DO NOT remove: used for std::condition_variable, CLion suggestion is buggy -#include // DO NOT remove: used for std::function, CLion suggestion is buggy +#include +#include #include #include #include #include #include -#include - #include #include #include -#include #include #include #include #include #include -#include #include #include #include #include -#include -#include -#include #include -#include +#include using namespace std::chrono_literals; namespace { // Trick suggested by gRPC team to avoid name clashes in multiple test modules -class BackEndClient { - public: - explicit BackEndClient(remote::ETHBACKEND::StubInterface* stub) : stub_(stub) {} - - grpc::Status etherbase(remote::EtherbaseReply* response) { - grpc::ClientContext context; - return stub_->Etherbase(&context, remote::EtherbaseRequest{}, response); - } - - grpc::Status net_version(remote::NetVersionReply* response) { - grpc::ClientContext context; - return stub_->NetVersion(&context, remote::NetVersionRequest{}, response); - } - - grpc::Status net_peer_count(remote::NetPeerCountReply* response) { - grpc::ClientContext context; - return stub_->NetPeerCount(&context, remote::NetPeerCountRequest{}, response); - } - - grpc::Status version(types::VersionReply* response) { - grpc::ClientContext context; - return stub_->Version(&context, google::protobuf::Empty{}, response); - } - - grpc::Status protocol_version(remote::ProtocolVersionReply* response) { - grpc::ClientContext context; - return stub_->ProtocolVersion(&context, remote::ProtocolVersionRequest{}, response); - } - - grpc::Status client_version(remote::ClientVersionReply* response) { - grpc::ClientContext context; - return stub_->ClientVersion(&context, remote::ClientVersionRequest{}, response); - } - - grpc::Status subscribe_and_consume(const remote::SubscribeRequest& request, - std::vector& responses) { - grpc::ClientContext context; - auto 
subscribe_reply_reader = stub_->Subscribe(&context, request); - bool has_more{true}; - do { - has_more = subscribe_reply_reader->Read(&responses.emplace_back()); - } while (has_more); - responses.pop_back(); - return subscribe_reply_reader->Finish(); - } - - grpc::Status node_info(const remote::NodesInfoRequest& request, remote::NodesInfoReply* response) { - grpc::ClientContext context; - return stub_->NodeInfo(&context, request, response); - } - - private: - remote::ETHBACKEND::StubInterface* stub_; -}; - using TxStreamPtr = std::unique_ptr>; class KvClient { @@ -237,82 +175,6 @@ class ThreadedKvClient { std::vector responses_; }; -const uint64_t kTestSentryPeerCount{10}; -constexpr const char* kTestSentryNodeId{"24bfa2cdce7c6a41184fa0809ad8d76969b7280952e9aa46179d90cfbab90f7d2b004928f0364389a1aa8d5166281f2ff7568493c1f719e8f6148ef8cf8af42d"}; -constexpr const char* kTestSentryNodeClientId{"MockSentryClient"}; - -class MockSentryClient - : public std::enable_shared_from_this, - public silkworm::sentry::api::SentryClient, - public silkworm::sentry::api::Service { - template - using Task = silkworm::Task; - - Task> service() override { - co_return shared_from_this(); - } - [[nodiscard]] bool is_ready() override { return true; } - void on_disconnect(std::function()> /*callback*/) override {} - Task reconnect() override { co_return; } - - Task set_status(silkworm::sentry::eth::StatusData /*status_data*/) override { - throw std::runtime_error("not implemented"); - } - Task handshake() override { - throw std::runtime_error("not implemented"); - } - Task node_infos() override { - const std::string ip_str = "1.2.3.4"; - const uint16_t port = 50555; - const std::string node_url_str = std::string("enode://") + kTestSentryNodeId + "@" + ip_str + ":" + std::to_string(port); - - silkworm::sentry::api::NodeInfo info = { - silkworm::sentry::EnodeUrl{node_url_str}, - kTestSentryNodeClientId, - boost::asio::ip::tcp::endpoint{boost::asio::ip::make_address(ip_str), port}, - port, - 
}; - co_return NodeInfos{info}; - } - - Task send_message_by_id(silkworm::sentry::Message /*message*/, silkworm::sentry::EccPublicKey /*public_key*/) override { - throw std::runtime_error("not implemented"); - } - Task send_message_to_random_peers(silkworm::sentry::Message /*message*/, size_t /*max_peers*/) override { - throw std::runtime_error("not implemented"); - } - Task send_message_to_all(silkworm::sentry::Message /*message*/) override { - throw std::runtime_error("not implemented"); - } - Task send_message_by_min_block(silkworm::sentry::Message /*message*/, size_t /*max_peers*/) override { - throw std::runtime_error("not implemented"); - } - Task peer_min_block(silkworm::sentry::EccPublicKey /*public_key*/) override { - throw std::runtime_error("not implemented"); - } - Task messages( - silkworm::sentry::api::MessageIdSet /*message_id_filter*/, - std::function(silkworm::sentry::api::MessageFromPeer)> /*consumer*/) override { - throw std::runtime_error("not implemented"); - } - - Task peers() override { - throw std::runtime_error("not implemented"); - } - Task peer_count() override { - co_return kTestSentryPeerCount; - } - Task> peer_by_id(silkworm::sentry::EccPublicKey /*public_key*/) override { - throw std::runtime_error("not implemented"); - } - Task penalize_peer(silkworm::sentry::EccPublicKey /*public_key*/) override { - throw std::runtime_error("not implemented"); - } - Task peer_events(std::function(silkworm::sentry::api::PeerEvent)> /*consumer*/) override { - throw std::runtime_error("not implemented"); - } -}; - // TODO(canepat): better copy grpc_pick_unused_port_or_die to generate unused port const std::string kTestAddressUri{"localhost:12345"}; @@ -339,30 +201,11 @@ struct TestableStateChangeCollection : public StateChangeCollection { StateChangeTokenObserver token_observer_; }; -class TestableEthereumBackEnd : public EthereumBackEnd { - public: - TestableEthereumBackEnd(const NodeSettings& node_settings, mdbx::env* chaindata_env) - : 
EthereumBackEnd{ - node_settings, - chaindata_env, - std::make_shared(), - std::make_unique(), - } {} - - [[nodiscard]] TestableStateChangeCollection* state_change_source_for_test() const noexcept { - return dynamic_cast(EthereumBackEnd::state_change_source()); - } -}; - -struct BackEndKvE2eTest { - explicit BackEndKvE2eTest( - silkworm::log::Level log_verbosity = silkworm::log::Level::kNone, - NodeSettings&& options = {}) +struct KvEnd2EndTest { + explicit KvEnd2EndTest(silkworm::log::Level log_verbosity = silkworm::log::Level::kNone) : set_verbosity_log_guard{log_verbosity} { std::shared_ptr channel = grpc::CreateChannel(kTestAddressUri, grpc::InsecureChannelCredentials()); - ethbackend_stub = remote::ETHBACKEND::NewStub(channel); - backend_client = std::make_unique(ethbackend_stub.get()); kv_stub = remote::KV::NewStub(channel); kv_client = std::make_unique(kv_stub.get()); @@ -372,7 +215,6 @@ struct BackEndKvE2eTest { DataDirectory data_dir{tmp_dir.path()}; REQUIRE_NOTHROW(data_dir.deploy()); db_config = std::make_unique(); - db_config->max_readers = options.chaindata_env_config.max_readers; db_config->path = data_dir.chaindata().path().string(); db_config->create = true; db_config->in_memory = true; @@ -382,8 +224,8 @@ struct BackEndKvE2eTest { db::open_map(rw_txn, kTestMultiMap); rw_txn.commit(); - backend = std::make_unique(options, &database_env); - server = std::make_unique(srv_config, *backend); + state_change_collection = std::make_unique(); + server = std::make_unique(srv_config, &database_env, state_change_collection.get()); server->build_and_start(); } @@ -410,23 +252,21 @@ struct BackEndKvE2eTest { rw_txn.commit(); } - ~BackEndKvE2eTest() { + ~KvEnd2EndTest() { server->shutdown(); server->join(); } test_util::SetLogVerbosityGuard set_verbosity_log_guard; rpc::Grpc2SilkwormLogGuard grpc2silkworm_log_guard; - std::unique_ptr ethbackend_stub; - std::unique_ptr backend_client; std::unique_ptr kv_stub; std::unique_ptr kv_client; rpc::ServerSettings 
srv_config; TemporaryDirectory tmp_dir; std::unique_ptr db_config; mdbx::env_managed database_env; - std::unique_ptr backend; - std::unique_ptr server; + std::unique_ptr state_change_collection; + std::unique_ptr server; }; } // namespace @@ -435,7 +275,7 @@ namespace silkworm::rpc { // Exclude gRPC tests from sanitizer builds due to data race warnings inside gRPC library #ifndef SILKWORM_SANITIZE -TEST_CASE("BackEndKvServer", "[silkworm][node][rpc]") { +TEST_CASE("KvServer", "[silkworm][node][rpc]") { test_util::SetLogVerbosityGuard guard{log::Level::kNone}; Grpc2SilkwormLogGuard log_guard; ServerSettings srv_config; @@ -447,76 +287,75 @@ TEST_CASE("BackEndKvServer", "[silkworm][node][rpc]") { db_config.create = true; db_config.in_memory = true; auto database_env = db::open_env(db_config); - NodeSettings node_settings; - TestableEthereumBackEnd backend{node_settings, &database_env}; + auto state_change_source{std::make_unique()}; - SECTION("BackEndKvServer::BackEndKvServer OK: create/destroy server") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::KvServer OK: create/destroy server") { + KvServer server{srv_config, &database_env, state_change_source.get()}; } - SECTION("BackEndKvServer::BackEndKvServer OK: create/shutdown/destroy server") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::KvServer OK: create/shutdown/destroy server") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.shutdown(); } - SECTION("BackEndKvServer::build_and_start OK: run server in separate thread") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::build_and_start OK: run server in separate thread") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.build_and_start(); std::thread server_thread{[&server]() { server.join(); }}; server.shutdown(); server_thread.join(); } - SECTION("BackEndKvServer::build_and_start OK: create/shutdown/run/destroy server") { - 
BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::build_and_start OK: create/shutdown/run/destroy server") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.shutdown(); server.build_and_start(); } - SECTION("BackEndKvServer::shutdown OK: shutdown server not running") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::shutdown OK: shutdown server not running") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.shutdown(); } - SECTION("BackEndKvServer::shutdown OK: shutdown twice server not running") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::shutdown OK: shutdown twice server not running") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.shutdown(); server.shutdown(); } - SECTION("BackEndKvServer::shutdown OK: shutdown running server") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::shutdown OK: shutdown running server") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.build_and_start(); server.shutdown(); server.join(); } - SECTION("BackEndKvServer::shutdown OK: shutdown twice running server") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::shutdown OK: shutdown twice running server") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.build_and_start(); server.shutdown(); server.shutdown(); server.join(); } - SECTION("BackEndKvServer::shutdown OK: shutdown running server again after join") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::shutdown OK: shutdown running server again after join") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.build_and_start(); server.shutdown(); server.join(); server.shutdown(); } - SECTION("BackEndKvServer::join OK: shutdown joined server") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::join OK: 
shutdown joined server") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.build_and_start(); std::thread server_thread{[&server]() { server.join(); }}; server.shutdown(); server_thread.join(); } - SECTION("BackEndKvServer::join OK: shutdown joined server and join again") { - BackEndKvServer server{srv_config, backend}; + SECTION("KvServer::join OK: shutdown joined server and join again") { + KvServer server{srv_config, &database_env, state_change_source.get()}; server.build_and_start(); std::thread server_thread{[&server]() { server.join(); }}; server.shutdown(); @@ -525,66 +364,10 @@ TEST_CASE("BackEndKvServer", "[silkworm][node][rpc]") { } } -TEST_CASE("BackEndKvServer E2E: empty node settings", "[silkworm][node][rpc]") { - BackEndKvE2eTest test; - auto backend_client = *test.backend_client; - - SECTION("Etherbase: return missing coinbase error") { - remote::EtherbaseReply response; - const auto status = backend_client.etherbase(&response); - CHECK(!status.ok()); - CHECK(status.error_code() == grpc::StatusCode::INTERNAL); - CHECK(status.error_message() == "etherbase must be explicitly specified"); - CHECK(!response.has_address()); - } - - SECTION("NetVersion: return out-of-range network ID") { - remote::NetVersionReply response; - const auto status = backend_client.net_version(&response); - CHECK(status.ok()); - CHECK(response.id() == 0); - } - - SECTION("Version: return ETHBACKEND version") { - types::VersionReply response; - const auto status = backend_client.version(&response); - CHECK(status.ok()); - CHECK(response.major() == 2); - CHECK(response.minor() == 3); - CHECK(response.patch() == 0); - } - - SECTION("ProtocolVersion: return ETH protocol version") { - remote::ProtocolVersionReply response; - const auto status = backend_client.protocol_version(&response); - CHECK(status.ok()); - CHECK(response.id() == kEthDevp2pProtocolVersion); - } - - SECTION("ClientVersion: return Silkworm client version") { - 
remote::ClientVersionReply response; - const auto status = backend_client.client_version(&response); - CHECK(status.ok()); - CHECK(absl::StrContains(response.node_name(), "silkworm")); - } - - // TODO(canepat): change using something meaningful when really implemented - SECTION("Subscribe: return streamed subscriptions") { - remote::SubscribeRequest request; - std::vector responses; - const auto status = backend_client.subscribe_and_consume(request, responses); - CHECK(status.ok()); - CHECK(responses.size() == 2); - } -} - -TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { - BackEndKvE2eTest test; - auto kv_client = *test.kv_client; - +TEST_CASE_METHOD(KvEnd2EndTest, "KvServer E2E: KV", "[silkworm][node][rpc]") { SECTION("Version: return KV version") { types::VersionReply response; - const auto status = kv_client.version(&response); + const auto status = kv_client->version(&response); CHECK(status.ok()); CHECK(response.major() == 5); CHECK(response.minor() == 1); @@ -596,7 +379,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { open.set_op(remote::Op::OPEN); std::vector requests{open}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(!status.ok()); CHECK(status.error_code() == grpc::StatusCode::INVALID_ARGUMENT); CHECK(absl::StrContains(status.error_message(), "unknown bucket")); @@ -610,7 +393,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { open.set_bucket_name("NonexistentTable"); std::vector requests{open}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(!status.ok()); CHECK(status.error_code() == grpc::StatusCode::INVALID_ARGUMENT); CHECK(absl::StrContains(status.error_message(), "unknown bucket")); @@ -623,7 +406,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { open.set_bucket_name(kTestMap.name); 
std::vector requests{open}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(!status.ok()); CHECK(status.error_code() == grpc::StatusCode::INVALID_ARGUMENT); CHECK(absl::StrContains(status.error_message(), "unknown cursor")); @@ -634,7 +417,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { SECTION("Tx OK: just start then finish") { std::vector requests{}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(status.ok()); CHECK(responses.size() == 1); CHECK(responses[0].tx_id() != 0); @@ -646,7 +429,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { open.set_bucket_name(kTestMap.name); std::vector requests{open}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(status.ok()); CHECK(responses.size() == 2); CHECK(responses[0].tx_id() != 0); @@ -659,7 +442,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { open_dup_sort.set_bucket_name(kTestMultiMap.name); std::vector requests{open_dup_sort}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(status.ok()); CHECK(responses.size() == 2); CHECK(responses[0].tx_id() != 0); @@ -675,7 +458,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { close.set_cursor(0); std::vector requests{open, close}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(status.ok()); CHECK(responses.size() == 3); CHECK(responses[0].tx_id() != 0); @@ -692,7 +475,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { close.set_cursor(0); std::vector requests{open, close}; std::vector responses; - const auto 
status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(status.ok()); CHECK(responses.size() == 3); CHECK(responses[0].tx_id() != 0); @@ -709,7 +492,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { close.set_cursor(12345); std::vector requests{open, close}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(!status.ok()); CHECK(status.error_code() == grpc::StatusCode::INVALID_ARGUMENT); CHECK(absl::StrContains(status.error_message(), "unknown cursor")); @@ -727,7 +510,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { first.set_cursor(0); std::vector requests{open, first}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(status.ok()); CHECK(responses.size() == 3); CHECK(responses[0].tx_id() != 0); @@ -745,7 +528,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { next.set_cursor(0); std::vector requests{open, next}; std::vector responses; - const auto status = kv_client.tx(requests, responses); + const auto status = kv_client->tx(requests, responses); CHECK(status.ok()); CHECK(responses.size() == 3); CHECK(responses[0].tx_id() != 0); @@ -757,12 +540,12 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { SECTION("StateChanges OK: receive streamed state changes") { static constexpr uint64_t kTestPendingBaseFee{10'000}; static constexpr uint64_t kTestGasLimit{10'000'000}; - auto* state_change_source = test.backend->state_change_source(); + auto* state_change_source = state_change_collection.get(); ThreadedKvClient threaded_kv_client; // Start StateChanges server-streaming call and consume incoming messages on dedicated thread - threaded_kv_client.start_and_consume_statechanges(kv_client); + threaded_kv_client.start_and_consume_statechanges(*kv_client); // Keep 
publishing state changes using the Catch2 thread until at least one has been received BlockNum block_number{0}; @@ -786,13 +569,13 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { SECTION("StateChanges OK: multiple concurrent subscriptions") { static constexpr uint64_t kTestPendingBaseFee{10'000}; static constexpr uint64_t kTestGasLimit{10'000'000}; - auto* state_change_source = test.backend->state_change_source(); + auto* state_change_source = state_change_collection.get(); ThreadedKvClient threaded_kv_client1, threaded_kv_client2; // Start StateChanges server-streaming call and consume incoming messages on dedicated thread - threaded_kv_client1.start_and_consume_statechanges(kv_client); - threaded_kv_client2.start_and_consume_statechanges(kv_client); + threaded_kv_client1.start_and_consume_statechanges(*kv_client); + threaded_kv_client2.start_and_consume_statechanges(*kv_client); // Keep publishing state changes using the Catch2 thread until at least one has been received BlockNum block_number{0}; @@ -818,7 +601,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { } SECTION("StateChanges KO: token already in use") { - auto* state_change_source = test.backend->state_change_source_for_test(); + auto* state_change_source = state_change_collection.get(); std::mutex token_reset_mutex; std::condition_variable token_reset_condition; @@ -838,7 +621,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { // Start a StateChanges server-streaming call grpc::ClientContext context1; remote::StateChangeRequest request1; - auto subscribe_reply_reader1 = kv_client.statechanges_start(&context1, request1); + auto subscribe_reply_reader1 = kv_client->statechanges_start(&context1, request1); // Wait for token reset condition to happen std::unique_lock token_reset_lock{token_reset_mutex}; @@ -847,7 +630,7 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { // Start another StateChanges server-streaming call and check it 
fails grpc::ClientContext context2; remote::StateChangeRequest request2; - auto subscribe_reply_reader2 = kv_client.statechanges_start(&context2, request2); + auto subscribe_reply_reader2 = kv_client->statechanges_start(&context2, request2); const auto status2 = subscribe_reply_reader2->Finish(); CHECK(!status2.ok()); @@ -864,97 +647,51 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") { SECTION("Snapshots: return snapshot files") { remote::SnapshotsRequest request; remote::SnapshotsReply response; - const auto status = kv_client.snapshots(request, &response); + const auto status = kv_client->snapshots(request, &response); CHECK(status.ok()); } SECTION("HistoryGet: return value in target history") { remote::HistoryGetReq request; remote::HistoryGetReply response; - const auto status = kv_client.history_get(request, &response); + const auto status = kv_client->history_get(request, &response); CHECK(status.ok()); } SECTION("DomainGet: return value in target domain") { remote::DomainGetReq request; remote::DomainGetReply response; - const auto status = kv_client.domain_get(request, &response); + const auto status = kv_client->domain_get(request, &response); CHECK(status.ok()); } SECTION("IndexRange: return value in target index range") { remote::IndexRangeReq request; remote::IndexRangeReply response; - const auto status = kv_client.index_range(request, &response); + const auto status = kv_client->index_range(request, &response); CHECK(status.ok()); } SECTION("HistoryRange: return value in target history range") { remote::HistoryRangeReq request; remote::Pairs response; - const auto status = kv_client.history_range(request, &response); + const auto status = kv_client->history_range(request, &response); CHECK(status.ok()); } SECTION("DomainRange: return value in target domain range") { remote::DomainRangeReq request; remote::Pairs response; - const auto status = kv_client.domain_range(request, &response); - CHECK(status.ok()); - } -} - 
-TEST_CASE("BackEndKvServer E2E: mainnet chain with zero etherbase", "[silkworm][node][rpc]") { - NodeSettings node_settings; - node_settings.chain_config = kMainnetConfig; - node_settings.etherbase = evmc::address{}; - BackEndKvE2eTest test{silkworm::log::Level::kNone, std::move(node_settings)}; - auto backend_client = *test.backend_client; - - SECTION("Etherbase: return coinbase address") { - remote::EtherbaseReply response; - const auto status = backend_client.etherbase(&response); - CHECK(status.ok()); - CHECK(response.has_address()); - CHECK(response.address() == types::H160()); - } - - SECTION("NetVersion: return network ID") { - remote::NetVersionReply response; - const auto status = backend_client.net_version(&response); - CHECK(status.ok()); - CHECK(response.id() == kMainnetConfig.chain_id); - } -} - -TEST_CASE("BackEndKvServer E2E: one Sentry status OK", "[silkworm][node][rpc]") { - BackEndKvE2eTest test; - auto backend_client = *test.backend_client; - - SECTION("NetPeerCount: return peer count") { - remote::NetPeerCountReply response; - const auto status = backend_client.net_peer_count(&response); - CHECK(status.ok()); - CHECK(response.count() == kTestSentryPeerCount); - } - - SECTION("NodeInfo: return information about nodes") { - remote::NodesInfoRequest request; - request.set_limit(0); - remote::NodesInfoReply response; - const auto status = backend_client.node_info(request, &response); + const auto status = kv_client->domain_range(request, &response); CHECK(status.ok()); - CHECK(response.nodes_info_size() == 1); - CHECK(response.nodes_info(0).id() == kTestSentryNodeId); - CHECK(response.nodes_info(0).name() == kTestSentryNodeClientId); } } #ifndef _WIN32 -TEST_CASE("BackEndKvServer E2E: trigger server-side write error", "[silkworm][node][rpc]") { +TEST_CASE("KvServer E2E: trigger server-side write error", "[silkworm][node][rpc]") { { const uint32_t kNumTxs{1000}; - BackEndKvE2eTest test{silkworm::log::Level::kError}; + KvEnd2EndTest 
test{silkworm::log::Level::kError}; test.fill_tables(); auto kv_client = *test.kv_client; @@ -975,7 +712,7 @@ TEST_CASE("BackEndKvServer E2E: trigger server-side write error", "[silkworm][no } #endif // _WIN32 -TEST_CASE("BackEndKvServer E2E: Tx max simultaneous readers exceeded", "[silkworm][node][rpc]") { +TEST_CASE("KvServer E2E: Tx max simultaneous readers exceeded", "[silkworm][node][rpc]") { // This check can be improved in Catch2 version 3.3.0 where SKIP is available if (os::max_file_descriptors() < 1024) { bool ok = os::set_max_file_descriptors(1024); @@ -984,7 +721,7 @@ TEST_CASE("BackEndKvServer E2E: Tx max simultaneous readers exceeded", "[silkwor } } - BackEndKvE2eTest test; + KvEnd2EndTest test; test.fill_tables(); auto kv_client = *test.kv_client; @@ -1019,8 +756,8 @@ TEST_CASE("BackEndKvServer E2E: Tx max simultaneous readers exceeded", "[silkwor } } -TEST_CASE("BackEndKvServer E2E: Tx max opened cursors exceeded", "[silkworm][node][rpc]") { - BackEndKvE2eTest test; +TEST_CASE("KvServer E2E: Tx max opened cursors exceeded", "[silkworm][node][rpc]") { + KvEnd2EndTest test; test.fill_tables(); auto kv_client = *test.kv_client; @@ -1063,9 +800,9 @@ class TxIdleTimeoutGuard { ~TxIdleTimeoutGuard() { TxCall::set_max_idle_duration(server::kDefaultMaxIdleDuration); } }; -TEST_CASE("BackEndKvServer E2E: bidirectional idle timeout", "[silkworm][node][rpc]") { +TEST_CASE("KvServer E2E: bidirectional idle timeout", "[silkworm][node][rpc]") { TxIdleTimeoutGuard timeout_guard{100}; - BackEndKvE2eTest test; + KvEnd2EndTest test; test.fill_tables(); auto kv_client = *test.kv_client; @@ -1114,8 +851,8 @@ TEST_CASE("BackEndKvServer E2E: bidirectional idle timeout", "[silkworm][node][r } } -TEST_CASE("BackEndKvServer E2E: Tx cursor valid operations", "[silkworm][node][rpc]") { - BackEndKvE2eTest test; +TEST_CASE("KvServer E2E: Tx cursor valid operations", "[silkworm][node][rpc]") { + KvEnd2EndTest test; test.fill_tables(); auto kv_client = *test.kv_client; @@ -2046,8 
+1783,8 @@ TEST_CASE("BackEndKvServer E2E: Tx cursor valid operations", "[silkworm][node][r } } -TEST_CASE("BackEndKvServer E2E: Tx cursor invalid operations", "[silkworm][node][rpc]") { - BackEndKvE2eTest test; +TEST_CASE("KvServer E2E: Tx cursor invalid operations", "[silkworm][node][rpc]") { + KvEnd2EndTest test; test.fill_tables(); auto kv_client = *test.kv_client; @@ -2207,8 +1944,8 @@ class TxMaxTimeToLiveGuard { ~TxMaxTimeToLiveGuard() { TxCall::set_max_ttl_duration(kMaxTxDuration); } }; -TEST_CASE("BackEndKvServer E2E: bidirectional max TTL duration", "[silkworm][node][rpc]") { - BackEndKvE2eTest test; +TEST_CASE("KvServer E2E: bidirectional max TTL duration", "[silkworm][node][rpc]") { + KvEnd2EndTest test; test.fill_tables(); auto kv_client = *test.kv_client; constexpr auto kCustomMaxTimeToLive{1000ms}; diff --git a/silkworm/node/backend/state_change_collection.cpp b/silkworm/node/remote/kv/grpc/server/state_change_collection.cpp similarity index 100% rename from silkworm/node/backend/state_change_collection.cpp rename to silkworm/node/remote/kv/grpc/server/state_change_collection.cpp diff --git a/silkworm/node/backend/state_change_collection.hpp b/silkworm/node/remote/kv/grpc/server/state_change_collection.hpp similarity index 100% rename from silkworm/node/backend/state_change_collection.hpp rename to silkworm/node/remote/kv/grpc/server/state_change_collection.hpp diff --git a/silkworm/node/backend/state_change_collection_test.cpp b/silkworm/node/remote/kv/grpc/server/state_change_collection_test.cpp similarity index 100% rename from silkworm/node/backend/state_change_collection_test.cpp rename to silkworm/node/remote/kv/grpc/server/state_change_collection_test.cpp diff --git a/silkworm/rpc/ethdb/file/local_cursor.cpp b/silkworm/rpc/ethdb/file/local_cursor.cpp index 19c7c077fc..f9d4738589 100644 --- a/silkworm/rpc/ethdb/file/local_cursor.cpp +++ b/silkworm/rpc/ethdb/file/local_cursor.cpp @@ -38,7 +38,7 @@ Task LocalCursor::seek(ByteView key) { 
SILK_DEBUG << "LocalCursor::seek cursor: " << cursor_id_ << " key: " << key; mdbx::slice mdbx_key{key}; - const auto result = (key.length() == 0) ? db_cursor_.to_first(/*throw_notfound=*/false) : db_cursor_.lower_bound(mdbx_key, /*throw_notfound=*/false); + const auto result = (key.empty()) ? db_cursor_.to_first(/*throw_notfound=*/false) : db_cursor_.lower_bound(mdbx_key, /*throw_notfound=*/false); SILK_DEBUG << "LocalCursor::seek result: " << db::detail::dump_mdbx_result(result); if (result) {