Skip to content

Commit

Permalink
rpcdaemon: support splitted BackEnd and KV servers (#2088)
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat committed Jun 10, 2024
1 parent 342d6ca commit 11195c2
Show file tree
Hide file tree
Showing 18 changed files with 842 additions and 441 deletions.
2 changes: 1 addition & 1 deletion cmd/dev/backend_kv_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <silkworm/infra/concurrency/awaitable_wait_for_one.hpp>
#include <silkworm/infra/grpc/client/client_context_pool.hpp>
#include <silkworm/node/backend/ethereum_backend.hpp>
#include <silkworm/node/remote/kv/grpc/server/backend_kv_server.hpp>
#include <silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.hpp>
#include <silkworm/sentry/eth/status_data_provider.hpp>
#include <silkworm/sentry/grpc/client/sentry_client.hpp>
#include <silkworm/sentry/multi_sentry_client.hpp>
Expand Down
2 changes: 1 addition & 1 deletion silkworm/node/backend/ethereum_backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

#include <silkworm/core/chain/config.hpp>
#include <silkworm/core/common/base.hpp>
#include <silkworm/node/backend/state_change_collection.hpp>
#include <silkworm/node/common/node_settings.hpp>
#include <silkworm/node/remote/kv/grpc/server/state_change_collection.hpp>
#include <silkworm/sentry/api/common/sentry_client.hpp>

namespace silkworm {
Expand Down
2 changes: 1 addition & 1 deletion silkworm/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include <silkworm/node/common/preverified_hashes.hpp>
#include <silkworm/node/execution/api/active_direct_service.hpp>
#include <silkworm/node/execution/grpc/server/server.hpp>
#include <silkworm/node/remote/kv/grpc/server/backend_kv_server.hpp>
#include <silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.hpp>
#include <silkworm/node/resource_usage.hpp>
#include <silkworm/node/stagedsync/execution_engine.hpp>

Expand Down
45 changes: 45 additions & 0 deletions silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2022 The Silkworm Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "backend_kv_server.hpp"

#include <silkworm/infra/concurrency/task.hpp>

#include <silkworm/infra/common/log.hpp>
#include <silkworm/node/remote/ethbackend/grpc/server/backend_calls.hpp>
#include <silkworm/node/remote/kv/grpc/server/kv_calls.hpp>

namespace silkworm::rpc {

BackEndKvServer::BackEndKvServer(const ServerSettings& settings, const EthereumBackEnd& backend)
: Server(settings),
BackEndServer(settings, backend),
KvServer(settings, backend.chaindata_env(), backend.state_change_source()) {
}

// Register the gRPC services: they must exist for the lifetime of the server built by builder.
void BackEndKvServer::register_async_services(grpc::ServerBuilder& builder) {
BackEndServer::register_async_services(builder);
KvServer::register_async_services(builder);
}

// Start server-side RPC requests as required by gRPC async model: one RPC per type is requested in advance.
void BackEndKvServer::register_request_calls() {
BackEndServer::register_request_calls();
KvServer::register_request_calls();
}

} // namespace silkworm::rpc
38 changes: 38 additions & 0 deletions silkworm/node/remote/ethbackend/grpc/server/backend_kv_server.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2022 The Silkworm Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#include <silkworm/core/chain/config.hpp>
#include <silkworm/node/backend/ethereum_backend.hpp>
#include <silkworm/node/remote/ethbackend/grpc/server/backend_server.hpp>
#include <silkworm/node/remote/kv/grpc/server/kv_server.hpp>

namespace silkworm::rpc {

class BackEndKvServer : public BackEndServer, public KvServer {
public:
BackEndKvServer(const ServerSettings& settings, const EthereumBackEnd& backend);

BackEndKvServer(const BackEndKvServer&) = delete;
BackEndKvServer& operator=(const BackEndKvServer&) = delete;

protected:
void register_async_services(grpc::ServerBuilder& builder) override;
void register_request_calls() override;
};

} // namespace silkworm::rpc
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,35 @@
limitations under the License.
*/

#include "backend_kv_server.hpp"
#include "backend_server.hpp"

#include <silkworm/infra/concurrency/task.hpp>

#include <silkworm/infra/common/log.hpp>
#include <silkworm/node/remote/ethbackend/grpc/server/backend_calls.hpp>
#include <silkworm/node/remote/kv/grpc/server/kv_calls.hpp>

namespace silkworm::rpc {

BackEndKvServer::BackEndKvServer(const ServerSettings& settings, const EthereumBackEnd& backend)
BackEndServer::BackEndServer(const ServerSettings& settings, const EthereumBackEnd& backend)
: Server(settings), backend_(backend) {
setup_backend_calls(backend);
setup_kv_calls();
SILK_INFO << "BackEndKvServer created listening on: " << settings.address_uri;
SILK_INFO << "BackEndServer created listening on: " << settings.address_uri;
}

// Register the gRPC services: they must exist for the lifetime of the server built by builder.
void BackEndKvServer::register_async_services(grpc::ServerBuilder& builder) {
void BackEndServer::register_async_services(grpc::ServerBuilder& builder) {
builder.RegisterService(&backend_async_service_);
builder.RegisterService(&kv_async_service_);
}

void BackEndKvServer::setup_backend_calls(const EthereumBackEnd& backend) {
void BackEndServer::setup_backend_calls(const EthereumBackEnd& backend) {
EtherbaseCall::fill_predefined_reply(backend);
NetVersionCall::fill_predefined_reply(backend);
BackEndVersionCall::fill_predefined_reply();
ProtocolVersionCall::fill_predefined_reply();
ClientVersionCall::fill_predefined_reply(backend);
}

void BackEndKvServer::register_backend_request_calls(agrpc::GrpcContext* grpc_context) {
void BackEndServer::register_backend_request_calls(agrpc::GrpcContext* grpc_context) {
SILK_TRACE << "BackEndService::register_backend_request_calls START";
auto service = &backend_async_service_;
auto& backend = backend_;
Expand Down Expand Up @@ -86,65 +83,15 @@ void BackEndKvServer::register_backend_request_calls(agrpc::GrpcContext* grpc_co
SILK_TRACE << "BackEndService::register_backend_request_calls END";
}

void BackEndKvServer::setup_kv_calls() {
KvVersionCall::fill_predefined_reply();
}

void BackEndKvServer::register_kv_request_calls(agrpc::GrpcContext* grpc_context) {
SILK_TRACE << "BackEndKvServer::register_kv_request_calls START";
auto service = &kv_async_service_;
auto& backend = backend_;

// Register one requested call repeatedly for each RPC: asio-grpc will take care of re-registration on any incoming call
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestVersion,
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
co_await KvVersionCall{std::forward<decltype(args)>(args)...}(backend);
});
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestTx,
[&backend, grpc_context](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
co_await TxCall{*grpc_context, std::forward<decltype(args)>(args)...}(backend);
});
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestStateChanges,
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
co_await StateChangesCall{std::forward<decltype(args)>(args)...}(backend);
});
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestSnapshots,
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
co_await SnapshotsCall{std::forward<decltype(args)>(args)...}(backend);
});
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestDomainGet,
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
co_await DomainGetCall{std::forward<decltype(args)>(args)...}(backend);
});
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestHistoryGet,
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
co_await HistoryGetCall{std::forward<decltype(args)>(args)...}(backend);
});
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestIndexRange,
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
co_await IndexRangeCall{std::forward<decltype(args)>(args)...}(backend);
});
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestHistoryRange,
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
co_await HistoryRangeCall{std::forward<decltype(args)>(args)...}(backend);
});
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestDomainRange,
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
co_await DomainRangeCall{std::forward<decltype(args)>(args)...}(backend);
});
SILK_TRACE << "BackEndKvServer::register_kv_request_calls END";
}

//! Start server-side RPC requests as required by gRPC async model: one RPC per type is requested in advance.
void BackEndKvServer::register_request_calls() {
void BackEndServer::register_request_calls() {
// Start all server-side RPC requests for each available server context
for (std::size_t i = 0; i < num_contexts(); i++) {
const auto& context = next_context();
auto grpc_context = context.server_grpc_context();

// Register initial requested calls for ETHBACKEND and KV services
register_backend_request_calls(grpc_context);
register_kv_request_calls(grpc_context);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@
#include <silkworm/core/chain/config.hpp>
#include <silkworm/infra/grpc/server/server.hpp>
#include <silkworm/interfaces/remote/ethbackend.grpc.pb.h>
#include <silkworm/interfaces/remote/kv.grpc.pb.h>
#include <silkworm/node/backend/ethereum_backend.hpp>

namespace silkworm::rpc {

class BackEndKvServer : public Server {
class BackEndServer : public virtual Server {
public:
BackEndKvServer(const ServerSettings& settings, const EthereumBackEnd& backend);
BackEndServer(const ServerSettings& settings, const EthereumBackEnd& backend);

BackEndKvServer(const BackEndKvServer&) = delete;
BackEndKvServer& operator=(const BackEndKvServer&) = delete;
BackEndServer(const BackEndServer&) = delete;
BackEndServer& operator=(const BackEndServer&) = delete;

protected:
void register_async_services(grpc::ServerBuilder& builder) override;
Expand All @@ -42,17 +41,11 @@ class BackEndKvServer : public Server {
static void setup_backend_calls(const EthereumBackEnd& backend);
void register_backend_request_calls(agrpc::GrpcContext* grpc_context);

static void setup_kv_calls();
void register_kv_request_calls(agrpc::GrpcContext* grpc_context);

//! The Ethereum full node service.
const EthereumBackEnd& backend_;

//! \warning The gRPC service must exist for the lifetime of the gRPC server it is registered on.
remote::ETHBACKEND::AsyncService backend_async_service_;

//! \warning The gRPC service must exist for the lifetime of the gRPC server it is registered on.
remote::KV::AsyncService kv_async_service_;
};

} // namespace silkworm::rpc
Loading

0 comments on commit 11195c2

Please sign in to comment.