Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix sendResponse bug
Browse files Browse the repository at this point in the history
cyjseagull committed Sep 11, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent a4ed066 commit 7c1b469
Showing 8 changed files with 115 additions and 58 deletions.
3 changes: 1 addition & 2 deletions cpp/wedpr-protocol/grpc/Common.h
Original file line number Diff line number Diff line change
@@ -31,8 +31,7 @@ inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr con
{
return args;
}
// TODO: when enable round_robin load-balance policy, the program will be exited on dns resolver
// args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy());
args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy());
if (grpcConfig->enableHealthCheck())
{
args.SetServiceConfigJSON(
11 changes: 7 additions & 4 deletions cpp/wedpr-protocol/grpc/client/FrontClient.cpp
Original file line number Diff line number Diff line change
@@ -29,13 +29,16 @@ using namespace grpc;
void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, ReceiveMsgFunc callback)
{
// TODO: optimize here
ReceivedMessage receivedMsg;
std::unique_ptr<ReceivedMessage> request(new ReceivedMessage());
bcos::bytes encodedData;
msg->encode(encodedData);
receivedMsg.set_data(encodedData.data(), encodedData.size());
request->set_data(encodedData.data(), encodedData.size());

// The ClientContext instance used for creating an rpc must remain alive and valid for the
// lifetime of the rpc
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<Error>();
m_stub->async()->onReceiveMessage(context.get(), &receivedMsg, response.get(),
[response, callback](Status status) { callback(toError(status, *response)); });
// lambda keeps the lifecycle for clientContext
m_stub->async()->onReceiveMessage(context.get(), request.get(), response.get(),
[context, response, callback](Status status) { callback(toError(status, *response)); });
}
99 changes: 67 additions & 32 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
@@ -28,24 +28,39 @@ using namespace grpc;
using namespace ppc::gateway;
using namespace ppc::protocol;

GatewayClient::GatewayClient(
ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints)
: GrpcClient(grpcConfig, endPoints), m_stub(ppc::proto::Gateway::NewStub(m_channel))
{
for (auto const& channel : m_broadcastChannels)
{
m_broadcastStubs.insert(
std::make_pair(channel.endPoint, ppc::proto::Gateway::NewStub(channel.channel)));
}
}

void GatewayClient::asyncSendMessage(RouteType routeType,
MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, bcos::bytes&& payload,
long timeout, ReceiveMsgFunc callback)
{
auto request = generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout);
std::unique_ptr<ppc::proto::SendedMessageRequest> request(
generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout));
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<Error>();
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncSendMessage(context.get(), request.get(), response.get(),
[callback, response](Status status) { callback(toError(status, *response)); });
[context, traceID, callback, response](
Status status) { callback(toError(status, *response)); });
}

void GatewayClient::asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback)
{
auto response = std::make_shared<PeersInfo>();
auto context = std::make_shared<ClientContext>();
auto request = std::make_shared<Empty>();
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncGetPeers(
context.get(), request.get(), response.get(), [callback, response](Status status) {
context.get(), request.get(), response.get(), [context, callback, response](Status status) {
callback(toError(status, response->error()), response->peersinfo());
});
}
@@ -56,8 +71,9 @@ void GatewayClient::asyncGetAgencies(
auto response = std::make_shared<AgenciesInfo>();
auto context = std::make_shared<ClientContext>();
auto request = std::make_shared<Empty>();
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncGetAgencies(
context.get(), request.get(), response.get(), [callback, response](Status status) {
context.get(), request.get(), response.get(), [context, callback, response](Status status) {
std::set<std::string> agencies;
for (int i = 0; i < response->agencies_size(); i++)
{
@@ -69,53 +85,72 @@ void GatewayClient::asyncGetAgencies(

bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo)
{
return broadCast([nodeInfo](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeInfo);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerNodeInfo(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeInfo));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(
-1, "registerNodeInfo failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);

auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerNodeInfo(context.get(), *request, response.get());
auto result = toError(status, *response);
return result;
});
}

bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
{
return broadCast([nodeID](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, "");
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterNodeInfo(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeID, ""));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(-1,
"unRegisterNodeInfo failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);

auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterNodeInfo(context.get(), *request, response.get());
return toError(status, *response);
});
}
bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
return broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerTopic(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeID, topic));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(
-1, "registerTopic failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);

auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerTopic(context.get(), *request, response.get());
return toError(status, *response);
});
}

bcos::Error::Ptr GatewayClient::unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic)
{
return broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterTopic(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeID, topic));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(
-1, "unRegisterTopic failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterTopic(context.get(), *request, response.get());
return toError(status, *response);
});
}
4 changes: 2 additions & 2 deletions cpp/wedpr-protocol/grpc/client/GrpcClient.cpp
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ GrpcClient::GrpcClient(
// create the broadcast channels
for (auto const& endPoint : endPointList)
{
GRPC_CLIENT_LOG(INFO) << LOG_DESC("create broacast-channel, endpoint: ") << endPoint;
GRPC_CLIENT_LOG(INFO) << LOG_DESC("create broadcast-channel, endpoint: ") << endPoint;
m_broadcastChannels.push_back(
{endPoint, grpc::CreateCustomChannel(endPoint, grpc::InsecureChannelCredentials(),
toChannelConfig(grpcConfig))});
@@ -94,7 +94,7 @@ bcos::Error::Ptr GrpcClient::broadCast(
catch (std::exception const& e)
{
GRPC_CLIENT_LOG(WARNING)
<< LOG_DESC("registerNodeInfo exception") << LOG_KV("remote", channel.endPoint)
<< LOG_DESC("GrpcClient broadCast exception") << LOG_KV("remote", channel.endPoint)
<< LOG_KV("error", boost::diagnostic_information(e));
}
}
12 changes: 6 additions & 6 deletions cpp/wedpr-protocol/protobuf/src/RequestConverter.h
Original file line number Diff line number Diff line change
@@ -43,11 +43,11 @@ inline MessageOptionalHeader::Ptr generateRouteInfo(
return routeInfo;
}

inline std::shared_ptr<ppc::proto::SendedMessageRequest> generateRequest(std::string const& traceID,
inline ppc::proto::SendedMessageRequest* generateRequest(std::string const& traceID,
RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload,
long timeout)
{
auto request = std::make_shared<ppc::proto::SendedMessageRequest>();
auto request = new ppc::proto::SendedMessageRequest();
request->set_traceid(traceID);
request->set_routetype(uint16_t(routeType));
// set the route information
@@ -65,18 +65,18 @@ inline std::shared_ptr<ppc::proto::SendedMessageRequest> generateRequest(std::st
return request;
}

inline std::shared_ptr<ppc::proto::NodeInfo> toNodeInfoRequest(
inline ppc::proto::NodeInfo* toNodeInfoRequest(
bcos::bytesConstRef const& nodeID, std::string const& topic)
{
auto request = std::make_shared<ppc::proto::NodeInfo>();
auto request = new ppc::proto::NodeInfo();
request->set_nodeid(nodeID.data(), nodeID.size());
request->set_topic(topic);
return request;
}

inline std::shared_ptr<ppc::proto::NodeInfo> toNodeInfoRequest(INodeInfo::Ptr const& nodeInfo)
inline ppc::proto::NodeInfo* toNodeInfoRequest(INodeInfo::Ptr const& nodeInfo)
{
auto request = std::make_shared<ppc::proto::NodeInfo>();
auto request = new ppc::proto::NodeInfo();
if (!nodeInfo)
{
return request;
Original file line number Diff line number Diff line change
@@ -190,18 +190,25 @@ void GatewayImpl::onReceiveP2PMessage(MessageFace::Ptr msg, WsSession::Ptr sessi
// try to dispatcher to the front
auto p2pMessage = std::dynamic_pointer_cast<Message>(msg);
auto self = std::weak_ptr<GatewayImpl>(shared_from_this());
// Note: the callback can only been called once since it binds the callback seq
auto callback = [p2pMessage, session, self](Error::Ptr error) {
auto gateway = self.lock();
if (!gateway)
{
return;
}
// Note: no need to sendResponse for the response packet
if (p2pMessage->isRespPacket())
{
return;
}
std::string errorCode = std::to_string(CommonError::SUCCESS);
if (error && error->errorCode() != 0)
{
GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveP2PMessage: dispatcherMessage failed")
<< LOG_KV("code", error->errorCode())
<< LOG_KV("msg", error->errorMessage());
<< LOG_KV("msg", error->errorMessage())
<< printMessage(p2pMessage);
errorCode = std::to_string(error->errorCode());
}

Original file line number Diff line number Diff line change
@@ -82,9 +82,27 @@ bool LocalRouter::dispatcherMessage(Message::Ptr const& msg, ReceiveMsgFunc call
// find the front
if (!frontList.empty())
{
// Note: the callback can only been called once since it binds the callback seq
int i = 0;
for (auto const& front : frontList)
{
front->onReceiveMessage(msg, callback);
if (i == 0)
{
front->onReceiveMessage(msg, callback);
}
else
{
front->onReceiveMessage(msg, [](bcos::Error::Ptr error) {
if (!error || error->errorCode() == 0)
{
return;
}
LOCAL_ROUTER_LOG(WARNING) << LOG_DESC("dispatcherMessage to front failed")
<< LOG_KV("code", error->errorMessage())
<< LOG_KV("msg", error->errorMessage());
});
}
i++;
}
return true;
}
15 changes: 5 additions & 10 deletions cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp
Original file line number Diff line number Diff line change
@@ -54,7 +54,6 @@ void Service::onP2PConnect(WsSession::Ptr _session)
<< LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId()))
<< LOG_KV("endpoint", _session->endPoint());


RecursiveGuard l(x_nodeID2Session);
auto it = m_nodeID2Session.find(_session->nodeId());
if (it != m_nodeID2Session.end() && it->second->isConnected())
@@ -203,18 +202,13 @@ WsSession::Ptr Service::getSessionByNodeID(std::string const& _nodeID)
return it->second;
}

// Note: this only called by the sender; will not been called when forward
void Service::asyncSendMessageByNodeID(
std::string const& dstNodeID, MessageFace::Ptr msg, Options options, RespCallBack respFunc)
{
auto p2pMsg = std::dynamic_pointer_cast<Message>(msg);
if (p2pMsg->header()->dstGwNode().empty())
{
p2pMsg->header()->setDstGwNode(dstNodeID);
}
if (p2pMsg->header()->srcGwNode().empty())
{
p2pMsg->header()->setSrcGwNode(m_nodeID);
}
p2pMsg->header()->setDstGwNode(dstNodeID);
p2pMsg->header()->setSrcGwNode(m_nodeID);
return asyncSendMessageWithForward(dstNodeID, msg, options, respFunc);
}

@@ -366,7 +360,8 @@ void Service::sendRespMessageBySession(bcos::boostssl::ws::WsSession::Ptr const&
WsSessions sessions;
sessions.emplace_back(session);
WsService::asyncSendMessage(sessions, respMessage);
SERVICE_LOG(TRACE) << "sendRespMessageBySession" << LOG_KV("resp", printMessage(respMessage))
SERVICE_LOG(TRACE) << "sendRespMessageBySession: " << LOG_KV("resp", printMessage(respMessage))
<< LOG_KV("sessionNode", printP2PIDElegantly(session->nodeId()))
<< LOG_KV("payloadSize",
respMessage->payload() ? respMessage->payload()->size() : 0);
}

0 comments on commit 7c1b469

Please sign in to comment.