Skip to content

Commit

Permalink
fix grpc client and server caused crash
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 10, 2024
1 parent f2e2f29 commit 98c949d
Show file tree
Hide file tree
Showing 25 changed files with 113 additions and 71 deletions.
2 changes: 1 addition & 1 deletion cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class IFront : virtual public IFrontClient
virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0;

virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback) = 0;

/**
* @brief register the nodeInfo to the gateway
Expand Down
2 changes: 1 addition & 1 deletion cpp/ppc-framework/gateway/IGateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class IGateway

virtual void asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback) = 0;
virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback) = 0;

virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0;
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-framework/protocol/GrpcConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ class GrpcConfig

bool enableHealthCheck() const { return m_enableHealthCheck; }
void setEnableHealthCheck(bool enableHealthCheck) { m_enableHealthCheck = enableHealthCheck; }
void setEnableDnslookup(bool enableDnslookup) { m_enableDnslookup = enableDnslookup; }

bool enableDnslookup() const { return m_enableDnslookup; }

protected:
bool m_enableHealthCheck = false;
std::string m_loadBalancePolicy = "round_robin";
bool m_enableDnslookup = false;
};
} // namespace ppc::protocol
6 changes: 3 additions & 3 deletions cpp/tools/ppc-builder/src/config/ppc_node_config_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __generate_single_node_inner_config__(self, tpl_config_path, node_path, priv
config_content, node_config.rpc_config, node_index)
# load the transport config
self.__generate_transport_config__(config_content,
node_config, node_id, ip)
node_config, node_id, ip, node_index)
# load the storage config
self.__generate_storage_config__(
config_content, node_config.storage_config)
Expand Down Expand Up @@ -192,7 +192,7 @@ def __generate_hdfs_storage_config__(self, config_content, hdfs_storage_config):
hdfs_storage_config.name_node_port)
config_content[section_name]["token"] = hdfs_storage_config.token

def __generate_transport_config__(self, config_content, node_config, node_id, deploy_ip):
def __generate_transport_config__(self, config_content, node_config, node_id, deploy_ip, node_index):
"""_summary_
Args:
Expand All @@ -211,7 +211,7 @@ def __generate_transport_config__(self, config_content, node_config, node_id, de
section = "transport"
config_content[section]["listen_ip"] = node_config.grpc_listen_ip
config_content[section]["listen_port"] = str(
node_config.grpc_listen_port)
node_config.grpc_listen_port + node_index)
config_content[section]["host_ip"] = deploy_ip
config_content[section]["gateway_target"] = node_config.gateway_config.gateway_grpc_target
config_content[section]["components"] = node_config.components
Expand Down
6 changes: 5 additions & 1 deletion cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ void PPCConfig::loadFrontConfig(bool requireTransport,
}
m_frontConfig->setNodeID(nodeID);
m_frontConfig->setThreadPoolSize(threadCount);

PPCConfig_LOG(INFO) << LOG_DESC("loadFrontConfig and not require the transport")
<< printFrontDesc(m_frontConfig);
if (!requireTransport)
{
return;
Expand All @@ -143,9 +144,12 @@ void PPCConfig::loadFrontConfig(bool requireTransport,
BOOST_THROW_EXCEPTION(
InvalidConfig() << errinfo_comment("Must specify the transport.gateway_target!"));
}
m_frontConfig->setGatewayGrpcTarget(gatewayTargets);
// the components
auto components = pt.get<std::string>("transport.components", "");
boost::split(m_frontConfig->mutableComponents(), components, boost::is_any_of(","));

PPCConfig_LOG(INFO) << LOG_DESC("loadFrontConfig") << printFrontDesc(m_frontConfig);
}

void PPCConfig::setPrivateKey(bcos::bytes const& _privateKey)
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-helper/ppc-utilities/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ inline std::string generateUUID()
return boost::uuids::to_string(uuid_gen());
}
template <typename T>
inline std::string printVector(std::vector<T> const& list)
inline std::string printVector(T const& list)
{
std::stringstream oss;
for (auto const& it : list)
Expand Down
8 changes: 8 additions & 0 deletions cpp/wedpr-initializer/Initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ void Initializer::registerRpcHandler(ppc::rpc::RpcInterface::Ptr const& _rpc)

void Initializer::start()
{
if (m_transport)
{
m_transport->start();
}
if (m_ppcFront)
{
m_ppcFront->start();
Expand Down Expand Up @@ -455,6 +459,10 @@ void Initializer::start()

void Initializer::stop()
{
if (m_transport)
{
m_transport->stop();
}
// stop the network firstly
if (m_ppcFront)
{
Expand Down
12 changes: 11 additions & 1 deletion cpp/wedpr-protocol/grpc/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,23 @@ inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr con
{
return args;
}
args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy());
// TODO: when enable round_robin load-balance policy, the program will be exited on dns resolver
// args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy());
if (grpcConfig->enableHealthCheck())
{
args.SetServiceConfigJSON(
"{\"healthCheckConfig\": "
"{\"serviceName\": \"\"}}");
}
// disable dns lookup
if (!grpcConfig->enableDnslookup())
{
args.SetInt("grpc.enable_dns_srv_lookup", 0);
}
else
{
args.SetInt("grpc.enable_dns_srv_lookup", 1);
}
return args;
}
} // namespace ppc::protocol
1 change: 1 addition & 0 deletions cpp/wedpr-protocol/grpc/client/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@

#define GRPC_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][CLIENT]"
#define GATEWAY_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][CLIENT]"
#define FRONT_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[FRONT][CLIENT]"
#define HEALTH_LOG(LEVEL) BCOS_LOG(LEVEL) << "[HEALTH]"
7 changes: 4 additions & 3 deletions cpp/wedpr-protocol/grpc/client/FrontClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* @date 2024-09-02
*/
#include "FrontClient.h"
#include "Common.h"
#include "protobuf/src/RequestConverter.h"
#include "wedpr-protocol/protobuf/src/Common.h"

Expand All @@ -33,8 +34,8 @@ void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, Recei
msg->encode(encodedData);
receivedMsg.set_data(encodedData.data(), encodedData.size());

ClientContext context;
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<Error>();
m_stub->async()->onReceiveMessage(&context, &receivedMsg, response.get(),
[response, callback](Status status) { callback(toError(status, std::move(*response))); });
m_stub->async()->onReceiveMessage(context.get(), &receivedMsg, response.get(),
[response, callback](Status status) { callback(toError(status, *response)); });
}
41 changes: 21 additions & 20 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,88 +33,89 @@ void GatewayClient::asyncSendMessage(RouteType routeType,
long timeout, ReceiveMsgFunc callback)
{
auto request = generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout);
ClientContext context;
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<Error>();
m_stub->async()->asyncSendMessage(&context, request.get(), response.get(),
[callback, response](Status status) { callback(toError(status, std::move(*response))); });
m_stub->async()->asyncSendMessage(context.get(), request.get(), response.get(),
[callback, response](Status status) { callback(toError(status, *response)); });
}

void GatewayClient::asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback)
{
auto response = std::make_shared<PeersInfo>();
ClientContext context;
auto context = std::make_shared<ClientContext>();
auto request = std::make_shared<Empty>();
m_stub->async()->asyncGetPeers(
&context, request.get(), response.get(), [callback, response](Status status) {
callback(toError(status, std::move(*response->mutable_error())), response->peersinfo());
context.get(), request.get(), response.get(), [callback, response](Status status) {
callback(toError(status, response->error()), response->peersinfo());
});
}

void GatewayClient::asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback)
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback)
{
auto response = std::make_shared<AgenciesInfo>();
ClientContext context;
auto context = std::make_shared<ClientContext>();
auto request = std::make_shared<Empty>();
m_stub->async()->asyncGetAgencies(
&context, request.get(), response.get(), [callback, response](Status status) {
std::vector<std::string> agencies;
context.get(), request.get(), response.get(), [callback, response](Status status) {
std::set<std::string> agencies;
for (int i = 0; i < response->agencies_size(); i++)
{
agencies.emplace_back(response->agencies(i));
agencies.insert(response->agencies(i));
}
callback(toError(status, std::move(*response->mutable_error())), agencies);
callback(toError(status, response->error()), agencies);
});
}

bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo)
{
broadCast([nodeInfo](ChannelInfo const& channel) {
return broadCast([nodeInfo](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeInfo);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
auto result = toError(status, *response);
return result;
});
}

bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
{
broadCast([nodeID](ChannelInfo const& channel) {
return broadCast([nodeID](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, "");
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
return toError(status, *response);
});
}
bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
broadCast([nodeID, topic](ChannelInfo const& channel) {
return broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerTopic(&context, *request, response.get());
return toError(status, std::move(*response));
return toError(status, *response);
});
}

bcos::Error::Ptr GatewayClient::unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic)
{
broadCast([nodeID, topic](ChannelInfo const& channel) {
return broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterTopic(&context, *request, response.get());
return toError(status, std::move(*response));
return toError(status, *response);
});
}
2 changes: 1 addition & 1 deletion cpp/wedpr-protocol/grpc/client/GatewayClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient

void asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback) override;
void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) override;
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback) override;

void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID,
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-protocol/grpc/client/HealthCheckTimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ void HealthCheckTimer::start()
return;
}
healthChecker->checkHealth();
healthChecker->m_timer->restart();
});
if (m_timer)
{
Expand Down Expand Up @@ -78,6 +77,8 @@ void HealthCheckTimer::registerHealthCheckHandler(HealthCheckHandler::Ptr health
}
bcos::WriteGuard l(x_healthCheckHandlers);
m_healthCheckHandlers[healthCheckHandler->serviceName] = healthCheckHandler;
HEALTH_LOG(INFO) << LOG_DESC("registerHealthCheckHandler for ")
<< healthCheckHandler->serviceName;
}


Expand Down
5 changes: 3 additions & 2 deletions cpp/wedpr-protocol/grpc/server/FrontServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ using namespace grpc;
ServerUnaryReactor* FrontServer::onReceiveMessage(
CallbackServerContext* context, const ReceivedMessage* receivedMsg, ppc::proto::Error* reply)
{
std::shared_ptr<ServerUnaryReactor> reactor(context->DefaultReactor());
ServerUnaryReactor* reactor(context->DefaultReactor());
try
{
// decode the request
auto msg = m_msgBuilder->build(bcos::bytesConstRef(
(bcos::byte*)receivedMsg->data().data(), receivedMsg->data().size()));
m_front->onReceiveMessage(msg, [reactor, reply](bcos::Error::Ptr error) {
FRONT_SERVER_LOG(TRACE) << LOG_DESC("onReceiveMessage");
toSerializedError(reply, error);
reactor->Finish(Status::OK);
});
Expand All @@ -48,5 +49,5 @@ ServerUnaryReactor* FrontServer::onReceiveMessage(
std::make_shared<bcos::Error>(-1, std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor.get();
return reactor;
}
Loading

0 comments on commit 98c949d

Please sign in to comment.