Skip to content

Commit

Permalink
fix sendResponse bug
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 11, 2024
1 parent a4ed066 commit d4595af
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 56 deletions.
11 changes: 7 additions & 4 deletions cpp/wedpr-protocol/grpc/client/FrontClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ using namespace grpc;
void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, ReceiveMsgFunc callback)
{
// TODO: optimize here
ReceivedMessage receivedMsg;
std::unique_ptr<ReceivedMessage> request(new ReceivedMessage());
bcos::bytes encodedData;
msg->encode(encodedData);
receivedMsg.set_data(encodedData.data(), encodedData.size());
request->set_data(encodedData.data(), encodedData.size());

// The ClientContext instance used for creating an rpc must remain alive and valid for the
// lifetime of the rpc
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<Error>();
m_stub->async()->onReceiveMessage(context.get(), &receivedMsg, response.get(),
[response, callback](Status status) { callback(toError(status, *response)); });
// lambda keeps the lifecycle for clientContext
m_stub->async()->onReceiveMessage(context.get(), request.get(), response.get(),
[context, response, callback](Status status) { callback(toError(status, *response)); });
}
99 changes: 67 additions & 32 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,39 @@ using namespace grpc;
using namespace ppc::gateway;
using namespace ppc::protocol;

GatewayClient::GatewayClient(
ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints)
: GrpcClient(grpcConfig, endPoints), m_stub(ppc::proto::Gateway::NewStub(m_channel))
{
for (auto const& channel : m_broadcastChannels)
{
m_broadcastStubs.insert(
std::make_pair(channel.endPoint, ppc::proto::Gateway::NewStub(channel.channel)));
}
}

void GatewayClient::asyncSendMessage(RouteType routeType,
MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, bcos::bytes&& payload,
long timeout, ReceiveMsgFunc callback)
{
auto request = generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout);
std::unique_ptr<ppc::proto::SendedMessageRequest> request(
generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout));
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<Error>();
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncSendMessage(context.get(), request.get(), response.get(),
[callback, response](Status status) { callback(toError(status, *response)); });
[context, traceID, callback, response](
Status status) { callback(toError(status, *response)); });
}

void GatewayClient::asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback)
{
auto response = std::make_shared<PeersInfo>();
auto context = std::make_shared<ClientContext>();
auto request = std::make_shared<Empty>();
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncGetPeers(
context.get(), request.get(), response.get(), [callback, response](Status status) {
context.get(), request.get(), response.get(), [context, callback, response](Status status) {
callback(toError(status, response->error()), response->peersinfo());
});
}
Expand All @@ -56,8 +71,9 @@ void GatewayClient::asyncGetAgencies(
auto response = std::make_shared<AgenciesInfo>();
auto context = std::make_shared<ClientContext>();
auto request = std::make_shared<Empty>();
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncGetAgencies(
context.get(), request.get(), response.get(), [callback, response](Status status) {
context.get(), request.get(), response.get(), [context, callback, response](Status status) {
std::set<std::string> agencies;
for (int i = 0; i < response->agencies_size(); i++)
{
Expand All @@ -69,53 +85,72 @@ void GatewayClient::asyncGetAgencies(

bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo)
{
return broadCast([nodeInfo](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeInfo);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerNodeInfo(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeInfo));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(
-1, "registerNodeInfo failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);

auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerNodeInfo(context.get(), *request, response.get());
auto result = toError(status, *response);
return result;
});
}

bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
{
return broadCast([nodeID](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, "");
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterNodeInfo(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeID, ""));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(-1,
"unRegisterNodeInfo failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);

auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterNodeInfo(context.get(), *request, response.get());
return toError(status, *response);
});
}
bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
return broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerTopic(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeID, topic));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(
-1, "registerTopic failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);

auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerTopic(context.get(), *request, response.get());
return toError(status, *response);
});
}

bcos::Error::Ptr GatewayClient::unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic)
{
return broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterTopic(&context, *request, response.get());
std::unique_ptr<ppc::proto::NodeInfo> request(toNodeInfoRequest(nodeID, topic));
return broadCast([&](ChannelInfo const& channel) {
if (!m_broadcastStubs.count(channel.endPoint))
{
return make_shared<bcos::Error>(
-1, "unRegisterTopic failed for not find stub for endPoint: " + channel.endPoint);
}
auto const& stub = m_broadcastStubs.at(channel.endPoint);
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterTopic(context.get(), *request, response.get());
return toError(status, *response);
});
}
4 changes: 2 additions & 2 deletions cpp/wedpr-protocol/grpc/client/GrpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ GrpcClient::GrpcClient(
// create the broadcast channels
for (auto const& endPoint : endPointList)
{
GRPC_CLIENT_LOG(INFO) << LOG_DESC("create broacast-channel, endpoint: ") << endPoint;
GRPC_CLIENT_LOG(INFO) << LOG_DESC("create broadcast-channel, endpoint: ") << endPoint;
m_broadcastChannels.push_back(
{endPoint, grpc::CreateCustomChannel(endPoint, grpc::InsecureChannelCredentials(),
toChannelConfig(grpcConfig))});
Expand Down Expand Up @@ -94,7 +94,7 @@ bcos::Error::Ptr GrpcClient::broadCast(
catch (std::exception const& e)
{
GRPC_CLIENT_LOG(WARNING)
<< LOG_DESC("registerNodeInfo exception") << LOG_KV("remote", channel.endPoint)
<< LOG_DESC("GrpcClient broadCast exception") << LOG_KV("remote", channel.endPoint)
<< LOG_KV("error", boost::diagnostic_information(e));
}
}
Expand Down
12 changes: 6 additions & 6 deletions cpp/wedpr-protocol/protobuf/src/RequestConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ inline MessageOptionalHeader::Ptr generateRouteInfo(
return routeInfo;
}

inline std::shared_ptr<ppc::proto::SendedMessageRequest> generateRequest(std::string const& traceID,
inline ppc::proto::SendedMessageRequest* generateRequest(std::string const& traceID,
RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload,
long timeout)
{
auto request = std::make_shared<ppc::proto::SendedMessageRequest>();
auto request = new ppc::proto::SendedMessageRequest();
request->set_traceid(traceID);
request->set_routetype(uint16_t(routeType));
// set the route information
Expand All @@ -65,18 +65,18 @@ inline std::shared_ptr<ppc::proto::SendedMessageRequest> generateRequest(std::st
return request;
}

inline std::shared_ptr<ppc::proto::NodeInfo> toNodeInfoRequest(
inline ppc::proto::NodeInfo* toNodeInfoRequest(
bcos::bytesConstRef const& nodeID, std::string const& topic)
{
auto request = std::make_shared<ppc::proto::NodeInfo>();
auto request = new ppc::proto::NodeInfo();
request->set_nodeid(nodeID.data(), nodeID.size());
request->set_topic(topic);
return request;
}

inline std::shared_ptr<ppc::proto::NodeInfo> toNodeInfoRequest(INodeInfo::Ptr const& nodeInfo)
inline ppc::proto::NodeInfo* toNodeInfoRequest(INodeInfo::Ptr const& nodeInfo)
{
auto request = std::make_shared<ppc::proto::NodeInfo>();
auto request = new ppc::proto::NodeInfo();
if (!nodeInfo)
{
return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,25 @@ void GatewayImpl::onReceiveP2PMessage(MessageFace::Ptr msg, WsSession::Ptr sessi
// try to dispatcher to the front
auto p2pMessage = std::dynamic_pointer_cast<Message>(msg);
auto self = std::weak_ptr<GatewayImpl>(shared_from_this());
// Note: the callback can only been called once since it binds the callback seq
auto callback = [p2pMessage, session, self](Error::Ptr error) {
auto gateway = self.lock();
if (!gateway)
{
return;
}
// Note: no need to sendResponse for the response packet
if (p2pMessage->isRespPacket())
{
return;
}
std::string errorCode = std::to_string(CommonError::SUCCESS);
if (error && error->errorCode() != 0)
{
GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveP2PMessage: dispatcherMessage failed")
<< LOG_KV("code", error->errorCode())
<< LOG_KV("msg", error->errorMessage());
<< LOG_KV("msg", error->errorMessage())
<< printMessage(p2pMessage);
errorCode = std::to_string(error->errorCode());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,27 @@ bool LocalRouter::dispatcherMessage(Message::Ptr const& msg, ReceiveMsgFunc call
// find the front
if (!frontList.empty())
{
// Note: the callback can only been called once since it binds the callback seq
int i = 0;
for (auto const& front : frontList)
{
front->onReceiveMessage(msg, callback);
if (i == 0)
{
front->onReceiveMessage(msg, callback);
}
else
{
front->onReceiveMessage(msg, [](bcos::Error::Ptr error) {
if (!error || error->errorCode() == 0)
{
return;
}
LOCAL_ROUTER_LOG(WARNING) << LOG_DESC("dispatcherMessage to front failed")
<< LOG_KV("code", error->errorMessage())
<< LOG_KV("msg", error->errorMessage());
});
}
i++;
}
return true;
}
Expand Down
15 changes: 5 additions & 10 deletions cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ void Service::onP2PConnect(WsSession::Ptr _session)
<< LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId()))
<< LOG_KV("endpoint", _session->endPoint());


RecursiveGuard l(x_nodeID2Session);
auto it = m_nodeID2Session.find(_session->nodeId());
if (it != m_nodeID2Session.end() && it->second->isConnected())
Expand Down Expand Up @@ -203,18 +202,13 @@ WsSession::Ptr Service::getSessionByNodeID(std::string const& _nodeID)
return it->second;
}

// Note: this only called by the sender; will not been called when forward
void Service::asyncSendMessageByNodeID(
std::string const& dstNodeID, MessageFace::Ptr msg, Options options, RespCallBack respFunc)
{
auto p2pMsg = std::dynamic_pointer_cast<Message>(msg);
if (p2pMsg->header()->dstGwNode().empty())
{
p2pMsg->header()->setDstGwNode(dstNodeID);
}
if (p2pMsg->header()->srcGwNode().empty())
{
p2pMsg->header()->setSrcGwNode(m_nodeID);
}
p2pMsg->header()->setDstGwNode(dstNodeID);
p2pMsg->header()->setSrcGwNode(m_nodeID);
return asyncSendMessageWithForward(dstNodeID, msg, options, respFunc);
}

Expand Down Expand Up @@ -366,7 +360,8 @@ void Service::sendRespMessageBySession(bcos::boostssl::ws::WsSession::Ptr const&
WsSessions sessions;
sessions.emplace_back(session);
WsService::asyncSendMessage(sessions, respMessage);
SERVICE_LOG(TRACE) << "sendRespMessageBySession" << LOG_KV("resp", printMessage(respMessage))
SERVICE_LOG(TRACE) << "sendRespMessageBySession: " << LOG_KV("resp", printMessage(respMessage))
<< LOG_KV("sessionNode", printP2PIDElegantly(session->nodeId()))
<< LOG_KV("payloadSize",
respMessage->payload() ? respMessage->payload()->size() : 0);
}

0 comments on commit d4595af

Please sign in to comment.