diff --git a/cpp/wedpr-protocol/grpc/client/FrontClient.cpp b/cpp/wedpr-protocol/grpc/client/FrontClient.cpp index c934a442..cd3b9b39 100644 --- a/cpp/wedpr-protocol/grpc/client/FrontClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/FrontClient.cpp @@ -29,13 +29,16 @@ using namespace grpc; void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, ReceiveMsgFunc callback) { // TODO: optimize here - ReceivedMessage receivedMsg; + std::unique_ptr request(new ReceivedMessage()); bcos::bytes encodedData; msg->encode(encodedData); - receivedMsg.set_data(encodedData.data(), encodedData.size()); + request->set_data(encodedData.data(), encodedData.size()); + // The ClientContext instance used for creating an rpc must remain alive and valid for the + // lifetime of the rpc auto context = std::make_shared(); auto response = std::make_shared(); - m_stub->async()->onReceiveMessage(context.get(), &receivedMsg, response.get(), - [response, callback](Status status) { callback(toError(status, *response)); }); + // lambda keeps the lifecycle for clientContext + m_stub->async()->onReceiveMessage(context.get(), request.get(), response.get(), + [context, response, callback](Status status) { callback(toError(status, *response)); }); } \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp index 8089f947..9a71005b 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp @@ -28,15 +28,29 @@ using namespace grpc; using namespace ppc::gateway; using namespace ppc::protocol; +GatewayClient::GatewayClient( + ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints) + : GrpcClient(grpcConfig, endPoints), m_stub(ppc::proto::Gateway::NewStub(m_channel)) +{ + for (auto const& channel : m_broadcastChannels) + { + m_broadcastStubs.insert( + std::make_pair(channel.endPoint, ppc::proto::Gateway::NewStub(channel.channel))); + } +} + void GatewayClient::asyncSendMessage(RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, bcos::bytes&& payload, long timeout, ReceiveMsgFunc callback) { - auto request = generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout); + std::unique_ptr request( + generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout)); auto context = std::make_shared(); auto response = std::make_shared(); + // lambda keeps the lifecycle for clientContext m_stub->async()->asyncSendMessage(context.get(), request.get(), response.get(), - [callback, response](Status status) { callback(toError(status, *response)); }); + [context, traceID, callback, response]( + Status status) { callback(toError(status, *response)); }); } void GatewayClient::asyncGetPeers(std::function callback) @@ -44,8 +58,9 @@ void GatewayClient::asyncGetPeers(std::function(); auto context = std::make_shared(); auto request = std::make_shared(); + // lambda keeps the lifecycle for clientContext m_stub->async()->asyncGetPeers( - context.get(), request.get(), response.get(), [callback, response](Status status) { + context.get(), request.get(), response.get(), [context, callback, response](Status status) { callback(toError(status, response->error()), response->peersinfo()); }); } @@ -56,8 +71,9 @@ void GatewayClient::asyncGetAgencies( auto response = std::make_shared(); auto context = std::make_shared(); auto request = std::make_shared(); + // lambda keeps the lifecycle for clientContext m_stub->async()->asyncGetAgencies( - context.get(), request.get(), response.get(), [callback, response](Status status) { + context.get(), request.get(), response.get(), [context, callback, response](Status status) { std::set agencies; for (int i = 0; i < response->agencies_size(); i++) { @@ -69,13 +85,18 @@ void GatewayClient::asyncGetAgencies( bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo) { - return broadCast([nodeInfo](ChannelInfo const& channel) { - std::unique_ptr stub( - ppc::proto::Gateway::NewStub(channel.channel)); - auto request = toNodeInfoRequest(nodeInfo); - ClientContext context; - std::shared_ptr response = std::make_shared(); - auto status = stub->registerNodeInfo(&context, *request, response.get()); + std::unique_ptr request(toNodeInfoRequest(nodeInfo)); + return broadCast([&](ChannelInfo const& channel) { + if (!m_broadcastStubs.count(channel.endPoint)) + { + return make_shared( + -1, "registerNodeInfo failed for not find stub for endPoint: " + channel.endPoint); + } + auto const& stub = m_broadcastStubs.at(channel.endPoint); + + auto context = std::make_shared(); + auto response = std::make_shared(); + auto status = stub->registerNodeInfo(context.get(), *request, response.get()); auto result = toError(status, *response); return result; }); @@ -83,25 +104,35 @@ bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo) bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID) { - return broadCast([nodeID](ChannelInfo const& channel) { - std::unique_ptr stub( - ppc::proto::Gateway::NewStub(channel.channel)); - auto request = toNodeInfoRequest(nodeID, ""); - ClientContext context; - std::shared_ptr response = std::make_shared(); - auto status = stub->unRegisterNodeInfo(&context, *request, response.get()); + std::unique_ptr request(toNodeInfoRequest(nodeID, "")); + return broadCast([&](ChannelInfo const& channel) { + if (!m_broadcastStubs.count(channel.endPoint)) + { + return make_shared(-1, + "unRegisterNodeInfo failed for not find stub for endPoint: " + channel.endPoint); + } + auto const& stub = m_broadcastStubs.at(channel.endPoint); + + auto context = std::make_shared(); + auto response = std::make_shared(); + auto status = stub->unRegisterNodeInfo(context.get(), *request, response.get()); return toError(status, *response); }); } bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) { - return broadCast([nodeID, topic](ChannelInfo const& channel) { - std::unique_ptr stub( - ppc::proto::Gateway::NewStub(channel.channel)); - auto request = toNodeInfoRequest(nodeID, topic); - ClientContext context; - std::shared_ptr response = std::make_shared(); - auto status = stub->registerTopic(&context, *request, response.get()); + std::unique_ptr request(toNodeInfoRequest(nodeID, topic)); + return broadCast([&](ChannelInfo const& channel) { + if (!m_broadcastStubs.count(channel.endPoint)) + { + return make_shared( + -1, "registerTopic failed for not find stub for endPoint: " + channel.endPoint); + } + auto const& stub = m_broadcastStubs.at(channel.endPoint); + + auto context = std::make_shared(); + auto response = std::make_shared(); + auto status = stub->registerTopic(context.get(), *request, response.get()); return toError(status, *response); }); } @@ -109,13 +140,17 @@ bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::s bcos::Error::Ptr GatewayClient::unRegisterTopic( bcos::bytesConstRef nodeID, std::string const& topic) { - return broadCast([nodeID, topic](ChannelInfo const& channel) { - std::unique_ptr stub( - ppc::proto::Gateway::NewStub(channel.channel)); - auto request = toNodeInfoRequest(nodeID, topic); - ClientContext context; - std::shared_ptr response = std::make_shared(); - auto status = stub->unRegisterTopic(&context, *request, response.get()); + std::unique_ptr request(toNodeInfoRequest(nodeID, topic)); + return broadCast([&](ChannelInfo const& channel) { + if (!m_broadcastStubs.count(channel.endPoint)) + { + return make_shared( + -1, "unRegisterTopic failed for not find stub for endPoint: " + channel.endPoint); + } + auto const& stub = m_broadcastStubs.at(channel.endPoint); + auto context = std::make_shared(); + auto response = std::make_shared(); + auto status = stub->unRegisterTopic(context.get(), *request, response.get()); return toError(status, *response); }); } \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp b/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp index 6516e1ed..c0b3d0a5 100644 --- a/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp @@ -37,7 +37,7 @@ GrpcClient::GrpcClient( // create the broadcast channels for (auto const& endPoint : endPointList) { - GRPC_CLIENT_LOG(INFO) << LOG_DESC("create broacast-channel, endpoint: ") << endPoint; + GRPC_CLIENT_LOG(INFO) << LOG_DESC("create broadcast-channel, endpoint: ") << endPoint; m_broadcastChannels.push_back( {endPoint, grpc::CreateCustomChannel(endPoint, grpc::InsecureChannelCredentials(), toChannelConfig(grpcConfig))}); @@ -94,7 +94,7 @@ bcos::Error::Ptr GrpcClient::broadCast( catch (std::exception const& e) { GRPC_CLIENT_LOG(WARNING) - << LOG_DESC("registerNodeInfo exception") << LOG_KV("remote", channel.endPoint) + << LOG_DESC("GrpcClient broadCast exception") << LOG_KV("remote", channel.endPoint) << LOG_KV("error", boost::diagnostic_information(e)); } } diff --git a/cpp/wedpr-protocol/protobuf/src/RequestConverter.h b/cpp/wedpr-protocol/protobuf/src/RequestConverter.h index 7c84d79d..7641103f 100644 --- a/cpp/wedpr-protocol/protobuf/src/RequestConverter.h +++ b/cpp/wedpr-protocol/protobuf/src/RequestConverter.h @@ -43,11 +43,11 @@ inline MessageOptionalHeader::Ptr generateRouteInfo( return routeInfo; } -inline std::shared_ptr generateRequest(std::string const& traceID, +inline ppc::proto::SendedMessageRequest* generateRequest(std::string const& traceID, RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, long timeout) { - auto request = std::make_shared(); + auto request = new ppc::proto::SendedMessageRequest(); request->set_traceid(traceID); request->set_routetype(uint16_t(routeType)); // set the route information @@ -65,18 +65,18 @@ inline std::shared_ptr generateRequest(std::st return request; } -inline std::shared_ptr toNodeInfoRequest( +inline ppc::proto::NodeInfo* toNodeInfoRequest( bcos::bytesConstRef const& nodeID, std::string const& topic) { - auto request = std::make_shared(); + auto request = new ppc::proto::NodeInfo(); request->set_nodeid(nodeID.data(), nodeID.size()); request->set_topic(topic); return request; } -inline std::shared_ptr toNodeInfoRequest(INodeInfo::Ptr const& nodeInfo) +inline ppc::proto::NodeInfo* toNodeInfoRequest(INodeInfo::Ptr const& nodeInfo) { - auto request = std::make_shared(); + auto request = new ppc::proto::NodeInfo(); if (!nodeInfo) { return request; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index 29ebc29b..fe2becda 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -190,18 +190,25 @@ void GatewayImpl::onReceiveP2PMessage(MessageFace::Ptr msg, WsSession::Ptr sessi // try to dispatcher to the front auto p2pMessage = std::dynamic_pointer_cast(msg); auto self = std::weak_ptr(shared_from_this()); + // Note: the callback can only been called once since it binds the callback seq auto callback = [p2pMessage, session, self](Error::Ptr error) { auto gateway = self.lock(); if (!gateway) { return; } + // Note: no need to sendResponse for the response packet + if (p2pMessage->isRespPacket()) + { + return; + } std::string errorCode = std::to_string(CommonError::SUCCESS); if (error && error->errorCode() != 0) { GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveP2PMessage: dispatcherMessage failed") << LOG_KV("code", error->errorCode()) - << LOG_KV("msg", error->errorMessage()); + << LOG_KV("msg", error->errorMessage()) + << printMessage(p2pMessage); errorCode = std::to_string(error->errorCode()); } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp index e9a7e25a..513f6b5f 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp @@ -82,9 +82,27 @@ bool LocalRouter::dispatcherMessage(Message::Ptr const& msg, ReceiveMsgFunc call // find the front if (!frontList.empty()) { + // Note: the callback can only been called once since it binds the callback seq + int i = 0; for (auto const& front : frontList) { - front->onReceiveMessage(msg, callback); + if (i == 0) + { + front->onReceiveMessage(msg, callback); + } + else + { + front->onReceiveMessage(msg, [](bcos::Error::Ptr error) { + if (!error || error->errorCode() == 0) + { + return; + } + LOCAL_ROUTER_LOG(WARNING) << LOG_DESC("dispatcherMessage to front failed") + << LOG_KV("code", error->errorMessage()) + << LOG_KV("msg", error->errorMessage()); + }); + } + i++; } return true; } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp index 88a4c4e6..12dcb06b 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp @@ -54,7 +54,6 @@ void Service::onP2PConnect(WsSession::Ptr _session) << LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId())) << LOG_KV("endpoint", _session->endPoint()); - RecursiveGuard l(x_nodeID2Session); auto it = m_nodeID2Session.find(_session->nodeId()); if (it != m_nodeID2Session.end() && it->second->isConnected()) @@ -203,18 +202,13 @@ WsSession::Ptr Service::getSessionByNodeID(std::string const& _nodeID) return it->second; } +// Note: this only called by the sender; will not been called when forward void Service::asyncSendMessageByNodeID( std::string const& dstNodeID, MessageFace::Ptr msg, Options options, RespCallBack respFunc) { auto p2pMsg = std::dynamic_pointer_cast(msg); - if (p2pMsg->header()->dstGwNode().empty()) - { - p2pMsg->header()->setDstGwNode(dstNodeID); - } - if (p2pMsg->header()->srcGwNode().empty()) - { - p2pMsg->header()->setSrcGwNode(m_nodeID); - } + p2pMsg->header()->setDstGwNode(dstNodeID); + p2pMsg->header()->setSrcGwNode(m_nodeID); return asyncSendMessageWithForward(dstNodeID, msg, options, respFunc); } @@ -366,7 +360,8 @@ void Service::sendRespMessageBySession(bcos::boostssl::ws::WsSession::Ptr const& WsSessions sessions; sessions.emplace_back(session); WsService::asyncSendMessage(sessions, respMessage); - SERVICE_LOG(TRACE) << "sendRespMessageBySession" << LOG_KV("resp", printMessage(respMessage)) + SERVICE_LOG(TRACE) << "sendRespMessageBySession: " << LOG_KV("resp", printMessage(respMessage)) + << LOG_KV("sessionNode", printP2PIDElegantly(session->nodeId())) << LOG_KV("payloadSize", respMessage->payload() ? respMessage->payload()->size() : 0); } \ No newline at end of file