Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix grpc crash caused by clientContext not alive && reuse broadcast stubs && fix sendResponse bug #31

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/ppc-framework/protocol/GrpcConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class GrpcServerConfig

protected:
ppc::protocol::EndPoint m_endPoint;
bool m_enableHealthCheck = false;
bool m_enableHealthCheck = true;
};
class GrpcConfig
{
Expand All @@ -65,7 +65,7 @@ class GrpcConfig
bool enableDnslookup() const { return m_enableDnslookup; }

protected:
bool m_enableHealthCheck = false;
bool m_enableHealthCheck = true;
std::string m_loadBalancePolicy = "round_robin";
bool m_enableDnslookup = false;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,4 @@ def __generate_gateway_config_content__(self, config, config_content, listen_por
section = "transport"
config_content[section]["listen_ip"] = config.grpc_listen_ip
# the listen port
config_content[section]["listen_port"] = str(config.grpc_listen_port)
config_content[section]["listen_port"] = str(grpc_listen_port)
4 changes: 2 additions & 2 deletions cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ void PPCConfig::loadFrontConfig(bool requireTransport,
}
m_frontConfig->setNodeID(nodeID);
m_frontConfig->setThreadPoolSize(threadCount);
PPCConfig_LOG(INFO) << LOG_DESC("loadFrontConfig and not require the transport")
<< printFrontDesc(m_frontConfig);
if (!requireTransport)
{
PPCConfig_LOG(INFO) << LOG_DESC("loadFrontConfig and not require the transport")
<< printFrontDesc(m_frontConfig);
return;
}

Expand Down
8 changes: 5 additions & 3 deletions cpp/wedpr-initializer/Initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,19 @@ using namespace ppc::crypto;
using namespace ppc::sdk;

Initializer::Initializer(ppc::protocol::NodeArch _arch, std::string const& _configPath)
: m_arch(_arch), m_configPath(_configPath)
: m_arch((uint16_t)_arch), m_configPath(_configPath)
{
m_transportBuilder = std::make_shared<TransportBuilder>();
// load the config
m_config = std::make_shared<PPCConfig>();
if (m_arch == ppc::protocol::NodeArch::PRO)
if (m_arch == (uint16_t)ppc::protocol::NodeArch::PRO)
{
INIT_LOG(INFO) << LOG_DESC("loadNodeConfig for pro node");
m_config->loadNodeConfig(true, m_transportBuilder->frontConfigBuilder(), _configPath);
}
else
{
INIT_LOG(INFO) << LOG_DESC("loadNodeConfig for air node");
m_config->loadNodeConfig(false, m_transportBuilder->frontConfigBuilder(), _configPath);
}
}
Expand All @@ -80,7 +82,7 @@ void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway)

// Note: must set the m_holdingMessageMinutes before init the node
TransportBuilder transportBuilder;
if (m_arch == ppc::protocol::NodeArch::AIR)
if (m_arch == (uint16_t)ppc::protocol::NodeArch::AIR)
{
m_transport = transportBuilder.build(SDKMode::AIR, m_config->frontConfig(), gateway);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-initializer/Initializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class Initializer : public std::enable_shared_from_this<Initializer>


private:
ppc::protocol::NodeArch m_arch;
uint16_t m_arch;
std::string m_configPath;
std::shared_ptr<ppc::tools::PPCConfig> m_config;
ProtocolInitializer::Ptr m_protocolInitializer;
Expand Down
3 changes: 1 addition & 2 deletions cpp/wedpr-protocol/grpc/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr con
{
return args;
}
// TODO: when enable round_robin load-balance policy, the program will be exited on dns resolver
// args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy());
args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy());
if (grpcConfig->enableHealthCheck())
{
args.SetServiceConfigJSON(
Expand Down
11 changes: 7 additions & 4 deletions cpp/wedpr-protocol/grpc/client/FrontClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ using namespace grpc;
void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, ReceiveMsgFunc callback)
{
// TODO: optimize here
ReceivedMessage receivedMsg;
std::unique_ptr<ReceivedMessage> request(new ReceivedMessage());
bcos::bytes encodedData;
msg->encode(encodedData);
receivedMsg.set_data(encodedData.data(), encodedData.size());
request->set_data(encodedData.data(), encodedData.size());

// The ClientContext instance used for creating an rpc must remain alive and valid for the
// lifetime of the rpc
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<Error>();
m_stub->async()->onReceiveMessage(context.get(), &receivedMsg, response.get(),
[response, callback](Status status) { callback(toError(status, *response)); });
// lambda keeps the lifecycle for clientContext
m_stub->async()->onReceiveMessage(context.get(), request.get(), response.get(),
[context, response, callback](Status status) { callback(toError(status, *response)); });
}
99 changes: 67 additions & 32 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,39 @@ using namespace grpc;
using namespace ppc::gateway;
using namespace ppc::protocol;

GatewayClient::GatewayClient(
ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints)
: GrpcClient(grpcConfig, endPoints), m_stub(ppc::proto::Gateway::NewStub(m_channel))
{
for (auto const& channel : m_broadcastChannels)
{
m_broadcastStubs.insert(
std::make_pair(channel.endPoint, ppc::proto::Gateway::NewStub(channel.channel)));
}
}

void GatewayClient::asyncSendMessage(RouteType routeType,
MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, bcos::bytes&& payload,
long timeout, ReceiveMsgFunc callback)
{
auto request = generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout);
std::unique_ptr<ppc::proto::SendedMessageRequest> request(
generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout));
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<Error>();
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncSendMessage(context.get(), request.get(), response.get(),
[callback, response](Status status) { callback(toError(status, *response)); });
[context, traceID, callback, response](
Status status) { callback(toError(status, *response)); });
}

void GatewayClient::asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback)
{
auto response = std::make_shared<PeersInfo>();
auto context = std::make_shared<ClientContext>();
auto request = std::make_shared<Empty>();
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncGetPeers(
context.get(), request.get(), response.get(), [callback, response](Status status) {
context.get(), request.get(), response.get(), [context, callback, response](Status status) {
callback(toError(status, response->error()), response->peersinfo());
});
}
Expand All @@ -56,8 +71,9 @@ void GatewayClient::asyncGetAgencies(
auto response = std::make_shared<AgenciesInfo>();
auto context = std::make_shared<ClientContext>();
auto request = std::make_shared<Empty>();
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncGetAgencies(
context.get(), request.get(), response.get(), [callback, response](Status status) {
context.get(), request.get(), response.get(), [context, callback, response](Status status) {
std::set<std::string> agencies;
for (int i = 0; i < response->agencies_size(); i++)
{
Expand All @@ -69,53 +85,72 @@ void GatewayClient::asyncGetAgencies(

bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo)
{
return broadCast([nodeInfo](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeInfo);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerNodeInfo(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeInfo));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(
-1, "registerNodeInfo failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);

auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerNodeInfo(context.get(), *request, response.get());
auto result = toError(status, *response);
return result;
});
}

bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
{
return broadCast([nodeID](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, "");
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterNodeInfo(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeID, ""));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(-1,
"unRegisterNodeInfo failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);

auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterNodeInfo(context.get(), *request, response.get());
return toError(status, *response);
});
}
bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
return broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerTopic(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeID, topic));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(
-1, "registerTopic failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);

auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerTopic(context.get(), *request, response.get());
return toError(status, *response);
});
}

bcos::Error::Ptr GatewayClient::unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic)
{
return broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterTopic(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeID, topic));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(
-1, "unRegisterTopic failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterTopic(context.get(), *request, response.get());
return toError(status, *response);
});
}
5 changes: 2 additions & 3 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient
{
public:
using Ptr = std::shared_ptr<GatewayClient>;
GatewayClient(ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints)
: GrpcClient(grpcConfig, endPoints), m_stub(ppc::proto::Gateway::NewStub(m_channel))
{}
GatewayClient(ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints);

~GatewayClient() override = default;

Expand Down Expand Up @@ -70,5 +68,6 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient

private:
std::unique_ptr<ppc::proto::Gateway::Stub> m_stub;
std::map<std::string, std::unique_ptr<ppc::proto::Gateway::Stub>> m_broadcastStubs;
};
} // namespace ppc::protocol
4 changes: 2 additions & 2 deletions cpp/wedpr-protocol/grpc/client/GrpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ GrpcClient::GrpcClient(
// create the broadcast channels
for (auto const& endPoint : endPointList)
{
GRPC_CLIENT_LOG(INFO) << LOG_DESC("create broacast-channel, endpoint: ") << endPoint;
GRPC_CLIENT_LOG(INFO) << LOG_DESC("create broadcast-channel, endpoint: ") << endPoint;
m_broadcastChannels.push_back(
{endPoint, grpc::CreateCustomChannel(endPoint, grpc::InsecureChannelCredentials(),
toChannelConfig(grpcConfig))});
Expand Down Expand Up @@ -94,7 +94,7 @@ bcos::Error::Ptr GrpcClient::broadCast(
catch (std::exception const& e)
{
GRPC_CLIENT_LOG(WARNING)
<< LOG_DESC("registerNodeInfo exception") << LOG_KV("remote", channel.endPoint)
<< LOG_DESC("GrpcClient broadCast exception") << LOG_KV("remote", channel.endPoint)
<< LOG_KV("error", boost::diagnostic_information(e));
}
}
Expand Down
5 changes: 1 addition & 4 deletions cpp/wedpr-protocol/grpc/client/RemoteFrontBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ class RemoteFrontBuilder : public IFrontBuilder
RemoteFrontBuilder(ppc::protocol::GrpcConfig::Ptr const& grpcConfig,
ppc::protocol::HealthCheckTimer::Ptr healthChecker)
: m_grpcConfig(grpcConfig), m_healthChecker(healthChecker)
{
// Note: the front enable health-check
m_grpcConfig->setEnableHealthCheck(true);
}
{}
~RemoteFrontBuilder() override = default;

IFrontClient::Ptr buildClient(std::string endPoint, std::function<void()> onUnHealthHandler,
Expand Down
12 changes: 6 additions & 6 deletions cpp/wedpr-protocol/protobuf/src/RequestConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ inline MessageOptionalHeader::Ptr generateRouteInfo(
return routeInfo;
}

inline std::shared_ptr<ppc::proto::SendedMessageRequest> generateRequest(std::string const& traceID,
inline ppc::proto::SendedMessageRequest* generateRequest(std::string const& traceID,
RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload,
long timeout)
{
auto request = std::make_shared<ppc::proto::SendedMessageRequest>();
auto request = new ppc::proto::SendedMessageRequest();
request->set_traceid(traceID);
request->set_routetype(uint16_t(routeType));
// set the route information
Expand All @@ -65,18 +65,18 @@ inline std::shared_ptr<ppc::proto::SendedMessageRequest> generateRequest(std::st
return request;
}

inline std::shared_ptr<ppc::proto::NodeInfo> toNodeInfoRequest(
inline ppc::proto::NodeInfo* toNodeInfoRequest(
bcos::bytesConstRef const& nodeID, std::string const& topic)
{
auto request = std::make_shared<ppc::proto::NodeInfo>();
auto request = new ppc::proto::NodeInfo();
request->set_nodeid(nodeID.data(), nodeID.size());
request->set_topic(topic);
return request;
}

inline std::shared_ptr<ppc::proto::NodeInfo> toNodeInfoRequest(INodeInfo::Ptr const& nodeInfo)
inline ppc::proto::NodeInfo* toNodeInfoRequest(INodeInfo::Ptr const& nodeInfo)
{
auto request = std::make_shared<ppc::proto::NodeInfo>();
auto request = new ppc::proto::NodeInfo();
if (!nodeInfo)
{
return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,25 @@ void GatewayImpl::onReceiveP2PMessage(MessageFace::Ptr msg, WsSession::Ptr sessi
// try to dispatcher to the front
auto p2pMessage = std::dynamic_pointer_cast<Message>(msg);
auto self = std::weak_ptr<GatewayImpl>(shared_from_this());
// Note: the callback can only been called once since it binds the callback seq
auto callback = [p2pMessage, session, self](Error::Ptr error) {
auto gateway = self.lock();
if (!gateway)
{
return;
}
// Note: no need to sendResponse for the response packet
if (p2pMessage->isRespPacket())
{
return;
}
std::string errorCode = std::to_string(CommonError::SUCCESS);
if (error && error->errorCode() != 0)
{
GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveP2PMessage: dispatcherMessage failed")
<< LOG_KV("code", error->errorCode())
<< LOG_KV("msg", error->errorMessage());
<< LOG_KV("msg", error->errorMessage())
<< printMessage(p2pMessage);
errorCode = std::to_string(error->errorCode());
}

Expand Down
Loading
Loading