From 98895592734760d0fa703744df5e7bba245dffe9 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Tue, 10 Sep 2024 17:09:59 +0800 Subject: [PATCH] add asyncSendResponse && fix MessagePayload decode error when the data size over 65556 (#29) * fix sendResponse bug * fix registerTopic/unregisterTopic bug * add asyncSendResponse implementation * fix MessagePayload decode error when the data size over 65556 --- cpp/ppc-framework/front/FrontInterface.h | 4 +- cpp/ppc-framework/front/IFront.h | 11 +++-- cpp/ppc-framework/protocol/INodeInfo.h | 3 +- cpp/ppc-framework/protocol/Message.h | 1 + cpp/ppc-framework/protocol/PPCMessageFace.h | 3 ++ cpp/ppc-framework/protocol/RouteType.h | 3 ++ cpp/test-utils/FakeFront.h | 4 +- cpp/test-utils/FakePPCMessage.h | 6 +++ cpp/wedpr-computing/ppc-psi/src/PSIConfig.h | 10 ++-- .../ppc-psi/src/ecdh-psi/EcdhPSIImpl.cpp | 2 +- .../src/psi-framework/PSIFramework.cpp | 15 +++--- .../ppc-psi/src/psi-framework/PSIFramework.h | 2 +- .../interfaces/PSIMessageInterface.h | 7 ++- cpp/wedpr-helper/ppc-utilities/Utilities.h | 25 +++++++--- .../protocol/src/JsonTaskImpl.cpp | 15 ++++-- .../protocol/src/PPCMessage.cpp | 7 +-- cpp/wedpr-protocol/protocol/src/PPCMessage.h | 6 +++ .../protocol/src/v1/MessageHeaderImpl.cpp | 20 ++++---- .../protocol/src/v1/MessagePayloadImpl.cpp | 6 +-- .../protocol/tests/MessageTest.cpp | 10 ++++ .../ppc-io/src/DataResourceLoaderImpl.cpp | 8 ++++ .../ppc-front/ppc-front/Front.cpp | 46 +++++++++++++++++-- .../ppc-front/ppc-front/Front.h | 25 ++-------- .../ppc-front/ppc-front/FrontImpl.cpp | 19 +++++++- .../ppc-front/ppc-front/FrontImpl.h | 19 ++++---- .../ppc-gateway/gateway/GatewayImpl.cpp | 2 +- .../gateway/SendMessageWithRetry.cpp | 4 +- .../gateway/router/GatewayNodeInfo.h | 2 +- .../gateway/router/GatewayNodeInfoImpl.cpp | 6 ++- .../gateway/router/GatewayNodeInfoImpl.h | 2 +- .../gateway/router/LocalRouter.cpp | 10 ++-- .../ppc-gateway/ppc-gateway/p2p/Service.cpp | 14 +++++- 32 files changed, 221 insertions(+), 96 deletions(-) diff --git a/cpp/ppc-framework/front/FrontInterface.h b/cpp/ppc-framework/front/FrontInterface.h index 16691928..767afc02 100644 --- a/cpp/ppc-framework/front/FrontInterface.h +++ b/cpp/ppc-framework/front/FrontInterface.h @@ -57,8 +57,8 @@ class FrontInterface uint32_t _timeout, ErrorCallbackFunc _callback, CallbackFunc _respCallback) = 0; // send response when receiving message from given agencyID - virtual void asyncSendResponse(const std::string& _agencyID, std::string const& _uuid, - front::PPCMessageFace::Ptr _message, ErrorCallbackFunc _callback) = 0; + virtual void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID, + front::PPCMessageFace::Ptr message, ErrorCallbackFunc _callback) = 0; virtual void registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType, std::function _handler) = 0; diff --git a/cpp/ppc-framework/front/IFront.h b/cpp/ppc-framework/front/IFront.h index 324d8124..25073ac3 100644 --- a/cpp/ppc-framework/front/IFront.h +++ b/cpp/ppc-framework/front/IFront.h @@ -93,6 +93,9 @@ class IFront : virtual public IFrontClient long timeout, ppc::protocol::ReceiveMsgFunc errorCallback, ppc::protocol::MessageCallback callback) = 0; + virtual void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID, + bcos::bytes&& payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback) = 0; + // the sync interface for async_send_message virtual bcos::Error::Ptr push(ppc::protocol::RouteType routeType, ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq, @@ -108,26 +111,26 @@ class IFront : virtual public IFrontClient * @brief register the nodeInfo to the gateway * @param nodeInfo the nodeInfo */ - virtual void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; + virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; /** * @brief unRegister the nodeInfo to the gateway */ - virtual void unRegisterNodeInfo() = 0; + virtual bcos::Error::Ptr unRegisterNodeInfo() = 0; /** * @brief register the topic * * @param topic the topic to register */ - virtual void registerTopic(std::string const& topic) = 0; + virtual bcos::Error::Ptr registerTopic(std::string const& topic) = 0; /** * @brief unRegister the topic * * @param topic the topic to unregister */ - virtual void unRegisterTopic(std::string const& topic) = 0; + virtual bcos::Error::Ptr unRegisterTopic(std::string const& topic) = 0; }; class IFrontBuilder diff --git a/cpp/ppc-framework/protocol/INodeInfo.h b/cpp/ppc-framework/protocol/INodeInfo.h index 2eb5e1d1..c949e8ed 100644 --- a/cpp/ppc-framework/protocol/INodeInfo.h +++ b/cpp/ppc-framework/protocol/INodeInfo.h @@ -57,7 +57,8 @@ class INodeInfo virtual bool equal(INodeInfo::Ptr const& info) { - return (nodeID() == info->nodeID()) && (components() == info->components()); + return (nodeID().toBytes() == info->nodeID().toBytes()) && + (components() == info->components()); } virtual void toJson(Json::Value& jsonObject) const = 0; diff --git a/cpp/ppc-framework/protocol/Message.h b/cpp/ppc-framework/protocol/Message.h index 8fb013d3..3d78b432 100644 --- a/cpp/ppc-framework/protocol/Message.h +++ b/cpp/ppc-framework/protocol/Message.h @@ -265,6 +265,7 @@ inline std::string printMessage(Message::Ptr const& _msg) std::ostringstream stringstream; stringstream << LOG_KV("from", _msg->header()->srcP2PNodeIDView()) << LOG_KV("to", _msg->header()->dstP2PNodeIDView()) + << LOG_KV("routeType", (ppc::protocol::RouteType)_msg->header()->routeType()) << LOG_KV("ttl", _msg->header()->ttl()) << LOG_KV("rsp", _msg->header()->isRespPacket()) << LOG_KV("traceID", _msg->header()->traceID()) diff --git a/cpp/ppc-framework/protocol/PPCMessageFace.h b/cpp/ppc-framework/protocol/PPCMessageFace.h index a2486ad2..04fc9a76 100644 --- a/cpp/ppc-framework/protocol/PPCMessageFace.h +++ b/cpp/ppc-framework/protocol/PPCMessageFace.h @@ -54,7 +54,10 @@ class PPCMessageFace virtual std::string const& taskID() const = 0; virtual void setTaskID(std::string const&) = 0; virtual std::string const& sender() const = 0; + virtual bcos::bytes const& senderNode() const = 0; virtual void setSender(std::string const&) = 0; + virtual void setSenderNode(bcos::bytes const&) = 0; + virtual std::shared_ptr data() const = 0; virtual void setData(std::shared_ptr) = 0; virtual std::map header() = 0; diff --git a/cpp/ppc-framework/protocol/RouteType.h b/cpp/ppc-framework/protocol/RouteType.h index cb7feb96..0d481345 100644 --- a/cpp/ppc-framework/protocol/RouteType.h +++ b/cpp/ppc-framework/protocol/RouteType.h @@ -45,6 +45,9 @@ inline std::ostream& operator<<(std::ostream& _out, RouteType const& _type) case RouteType::ROUTE_THROUGH_AGENCY: _out << "RouteThroughAgency"; break; + case RouteType::ROUTE_THROUGH_TOPIC: + _out << "RouteThroughTopic"; + break; default: _out << "UnknownRouteType"; break; diff --git a/cpp/test-utils/FakeFront.h b/cpp/test-utils/FakeFront.h index 44b249ef..ab1c9709 100644 --- a/cpp/test-utils/FakeFront.h +++ b/cpp/test-utils/FakeFront.h @@ -184,7 +184,7 @@ class FakeFront : public FrontInterface bcos::Error::Ptr eraseTaskInfo(std::string const&) override { return nullptr; } // send response when receiving message from given agencyID - void asyncSendResponse(const std::string& _agencyID, std::string const& _uuid, + void asyncSendResponse(bcos::bytes const& peer, std::string const& _uuid, front::PPCMessageFace::Ptr _message, ErrorCallbackFunc _callback) override { if (m_uuidToCallback.count(_uuid)) @@ -193,7 +193,7 @@ class FakeFront : public FrontInterface removeCallback(_uuid); if (callback) { - callback(nullptr, _agencyID, _message, nullptr); + callback(nullptr, std::string(peer.begin(), peer.end()), _message, nullptr); } } } diff --git a/cpp/test-utils/FakePPCMessage.h b/cpp/test-utils/FakePPCMessage.h index 8248bc9f..44fa9d56 100644 --- a/cpp/test-utils/FakePPCMessage.h +++ b/cpp/test-utils/FakePPCMessage.h @@ -74,7 +74,13 @@ class FakePPCMessage : public PPCMessageFace // set the message to be response void setResponse() override { m_response = true; } + + bcos::bytes const& senderNode() const override { return m_senderNode; } + + void setSenderNode(bcos::bytes const& senderNode) override { m_senderNode = senderNode; } + private: + bcos::bytes m_senderNode; uint8_t m_version; uint8_t m_taskType; uint8_t m_algorithmType; diff --git a/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h b/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h index 898c172d..4b67e599 100644 --- a/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h +++ b/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h @@ -20,6 +20,7 @@ #pragma once #include "Common.h" #include "bcos-utilities/Common.h" +#include "ppc-framework/Helper.h" #include "ppc-framework/front/FrontInterface.h" #include "ppc-framework/io/DataResourceLoader.h" #include "ppc-framework/protocol/Protocol.h" @@ -96,14 +97,15 @@ class PSIConfig _responseCallback); } - void asyncSendResponse(std::string const& _peerID, std::string const& _taskID, + void asyncSendResponse(bcos::bytes const& fromNode, std::string const& _taskID, std::string const& _uuid, PSIMessageInterface::Ptr const& _msg, ppc::front::ErrorCallbackFunc _callback, uint32_t _seq = 0) { auto ppcMsg = generatePPCMsg(_taskID, _msg, _seq); - PSI_LOG(TRACE) << LOG_DESC("sendResponse") << LOG_KV("peer", _peerID) << printPPCMsg(ppcMsg) - << LOG_KV("msgType", (int)_msg->packetType()) << LOG_KV("uuid", _uuid); - m_front->asyncSendResponse(_peerID, _uuid, ppcMsg, _callback); + PSI_LOG(TRACE) << LOG_DESC("sendResponse") << LOG_KV("peer", printNodeID(fromNode)) + << printPPCMsg(ppcMsg) << LOG_KV("msgType", (int)_msg->packetType()) + << LOG_KV("uuid", _uuid); + m_front->asyncSendResponse(fromNode, _uuid, ppcMsg, _callback); } ppc::io::DataResourceLoader::Ptr const& dataResourceLoader() const diff --git a/cpp/wedpr-computing/ppc-psi/src/ecdh-psi/EcdhPSIImpl.cpp b/cpp/wedpr-computing/ppc-psi/src/ecdh-psi/EcdhPSIImpl.cpp index f792b190..3162328d 100644 --- a/cpp/wedpr-computing/ppc-psi/src/ecdh-psi/EcdhPSIImpl.cpp +++ b/cpp/wedpr-computing/ppc-psi/src/ecdh-psi/EcdhPSIImpl.cpp @@ -566,7 +566,7 @@ void EcdhPSIImpl::onHandshakeResponse(PSIMessageInterface::Ptr const& _msg) psiMsg->setErrorCode(0); psiMsg->setErrorMessage("success"); auto startT = bcos::utcSteadyTime(); - m_config->asyncSendResponse(taskState->peerID(), taskState->task()->id(), _msg->uuid(), psiMsg, + m_config->asyncSendResponse(_msg->fromNode(), taskState->task()->id(), _msg->uuid(), psiMsg, [this, startT, _msg](bcos::Error::Ptr _error) { if (!_error || _error->errorCode() == 0) { diff --git a/cpp/wedpr-computing/ppc-psi/src/psi-framework/PSIFramework.cpp b/cpp/wedpr-computing/ppc-psi/src/psi-framework/PSIFramework.cpp index fec26576..6010656e 100644 --- a/cpp/wedpr-computing/ppc-psi/src/psi-framework/PSIFramework.cpp +++ b/cpp/wedpr-computing/ppc-psi/src/psi-framework/PSIFramework.cpp @@ -150,6 +150,7 @@ void PSIFramework::onReceiveMessage(PPCMessageFace::Ptr _msg) psiMsg->setTaskID(_msg->taskID()); psiMsg->setSeq(_msg->seq()); psiMsg->setUUID(_msg->uuid()); + psiMsg->setFromNode(_msg->senderNode()); m_msgQueue->push(psiMsg); PSI_FRAMEWORK_LOG(TRACE) << LOG_DESC("onReceiveMessage") << printPSIMessage(psiMsg) << LOG_KV("uuid", _msg->uuid()); @@ -699,7 +700,7 @@ void PSIFramework::sendHandshakeRequest(TaskState::Ptr const& _taskState) void PSIFramework::responsePSIResultSyncStatus(int32_t _code, std::string const& _msg, - std::string const& _peer, std::string const& _taskID, std::string const& _uuid, uint32_t _seq) + bcos::bytes const& _peer, std::string const& _taskID, std::string const& _uuid, uint32_t _seq) { // response to the client auto psiMsg = @@ -732,15 +733,17 @@ void PSIFramework::handlePSIResultSyncMsg(PSIMessageInterface::Ptr _resultSyncMs << printPSIMessage(_resultSyncMsg); std::string msg = "sync psi result for task " + _resultSyncMsg->taskID() + " failed for task not found!"; - responsePSIResultSyncStatus((int32_t)PSIRetCode::TaskNotFound, msg, _resultSyncMsg->from(), - _resultSyncMsg->taskID(), _resultSyncMsg->uuid(), _resultSyncMsg->seq()); + responsePSIResultSyncStatus((int32_t)PSIRetCode::TaskNotFound, msg, + _resultSyncMsg->fromNode(), _resultSyncMsg->taskID(), _resultSyncMsg->uuid(), + _resultSyncMsg->seq()); return; } try { taskState->storePSIResult(m_dataResourceLoader, _resultSyncMsg->takeData()); - responsePSIResultSyncStatus((int32_t)PSIRetCode::Success, "success", _resultSyncMsg->from(), - _resultSyncMsg->taskID(), _resultSyncMsg->uuid(), _resultSyncMsg->seq()); + responsePSIResultSyncStatus((int32_t)PSIRetCode::Success, "success", + _resultSyncMsg->fromNode(), _resultSyncMsg->taskID(), _resultSyncMsg->uuid(), + _resultSyncMsg->seq()); } catch (std::exception const& e) { @@ -749,7 +752,7 @@ void PSIFramework::handlePSIResultSyncMsg(PSIMessageInterface::Ptr _resultSyncMs auto errorMessage = "sync psi result for " + _resultSyncMsg->taskID() + " failed, error: " + std::string(boost::diagnostic_information(e)); responsePSIResultSyncStatus((int32_t)PSIRetCode::SyncPSIResultFailed, errorMessage, - _resultSyncMsg->from(), _resultSyncMsg->taskID(), _resultSyncMsg->uuid(), + _resultSyncMsg->fromNode(), _resultSyncMsg->taskID(), _resultSyncMsg->uuid(), _resultSyncMsg->seq()); // cancel the task auto error = BCOS_ERROR_PTR((int32_t)PSIRetCode::SyncPSIResultFailed, errorMessage); diff --git a/cpp/wedpr-computing/ppc-psi/src/psi-framework/PSIFramework.h b/cpp/wedpr-computing/ppc-psi/src/psi-framework/PSIFramework.h index 3bab2dc8..66238319 100644 --- a/cpp/wedpr-computing/ppc-psi/src/psi-framework/PSIFramework.h +++ b/cpp/wedpr-computing/ppc-psi/src/psi-framework/PSIFramework.h @@ -189,7 +189,7 @@ class PSIFramework : public bcos::Worker, public ppc::task::TaskFrameworkInterfa m_signalled.wait_for(l, boost::chrono::milliseconds(5)); } void responsePSIResultSyncStatus(int32_t _code, std::string const& _msg, - std::string const& _peer, std::string const& _taskID, std::string const& _uuid, + bcos::bytes const& _peer, std::string const& _taskID, std::string const& _uuid, uint32_t _seq); void broadcastSyncTaskInfo( diff --git a/cpp/wedpr-computing/ppc-psi/src/psi-framework/interfaces/PSIMessageInterface.h b/cpp/wedpr-computing/ppc-psi/src/psi-framework/interfaces/PSIMessageInterface.h index c4263477..3632c74f 100644 --- a/cpp/wedpr-computing/ppc-psi/src/psi-framework/interfaces/PSIMessageInterface.h +++ b/cpp/wedpr-computing/ppc-psi/src/psi-framework/interfaces/PSIMessageInterface.h @@ -76,7 +76,8 @@ class PSIMessageInterface virtual void setTaskID(std::string const& _taskID) { m_taskID = _taskID; } virtual void setSeq(uint32_t _seq) { m_seq = _seq; } virtual void setFrom(std::string const& _from) { m_from = _from; } - + virtual void setFromNode(bcos::bytes const& fromNode) { m_fromNode = fromNode; } + virtual bcos::bytes fromNode() const { return m_fromNode; } virtual std::string const& taskID() const { return m_taskID; } virtual uint32_t seq() const { return m_seq; } @@ -88,7 +89,11 @@ class PSIMessageInterface private: std::string m_taskID; int32_t m_seq; + // the agency std::string m_from; + // the fromNode + bcos::bytes m_fromNode; + std::string m_uuid; }; diff --git a/cpp/wedpr-helper/ppc-utilities/Utilities.h b/cpp/wedpr-helper/ppc-utilities/Utilities.h index 7b989243..a3d90fb2 100644 --- a/cpp/wedpr-helper/ppc-utilities/Utilities.h +++ b/cpp/wedpr-helper/ppc-utilities/Utilities.h @@ -29,20 +29,31 @@ namespace ppc { template -inline uint64_t decodeNetworkBuffer( - T& _result, bcos::byte const* buffer, unsigned int bufferLen, uint64_t const offset) +inline uint64_t decodeNetworkBuffer(T& _result, bcos::byte const* buffer, unsigned int bufferLen, + uint64_t const offset, bool largeBuffer = false) { uint64_t curOffset = offset; - CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset, bufferLen); // Notice: operator* is higher priority than operator+, the () is essential - auto dataLen = - boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)(buffer + curOffset))); - curOffset += 2; + uint32_t dataLen = 0; + if (largeBuffer) + { + CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset + 4, bufferLen); + dataLen = boost::asio::detail::socket_ops::network_to_host_long( + *((uint32_t*)(buffer + curOffset))); + curOffset += 4; + } + else + { + CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset + 2, bufferLen); + dataLen = boost::asio::detail::socket_ops::network_to_host_short( + *((uint16_t*)(buffer + curOffset))); + curOffset += 2; + } if (dataLen == 0) { return curOffset; } - CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset, bufferLen); + CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset + dataLen, bufferLen); _result.assign((bcos::byte*)buffer + curOffset, (bcos::byte*)buffer + curOffset + dataLen); curOffset += dataLen; return curOffset; diff --git a/cpp/wedpr-protocol/protocol/src/JsonTaskImpl.cpp b/cpp/wedpr-protocol/protocol/src/JsonTaskImpl.cpp index 5378e6a8..f71e47ce 100644 --- a/cpp/wedpr-protocol/protocol/src/JsonTaskImpl.cpp +++ b/cpp/wedpr-protocol/protocol/src/JsonTaskImpl.cpp @@ -211,10 +211,17 @@ void JsonTaskImpl::decodeDataResourceDesc(DataResourceDesc::Ptr _desc, Json::Val BOOST_THROW_EXCEPTION( InvalidParameter() << errinfo_comment("The \"..\" cannot be in the path")); } - boost::filesystem::path prePath(m_prePath); - boost::filesystem::path inputPath(path); - boost::filesystem::path filePath(prePath / inputPath); - _desc->setPath(filePath.string()); + if (path.starts_with("/")) + { + _desc->setPath(path); + } + else + { + boost::filesystem::path prePath(m_prePath); + boost::filesystem::path inputPath(path); + boost::filesystem::path filePath(prePath / inputPath); + _desc->setPath(filePath.string()); + } } else { diff --git a/cpp/wedpr-protocol/protocol/src/PPCMessage.cpp b/cpp/wedpr-protocol/protocol/src/PPCMessage.cpp index a316a773..efb7861f 100644 --- a/cpp/wedpr-protocol/protocol/src/PPCMessage.cpp +++ b/cpp/wedpr-protocol/protocol/src/PPCMessage.cpp @@ -17,8 +17,8 @@ * @author: shawnhe * @date 2022-10-19 */ - #include "PPCMessage.h" +#include "Common.h" #include #include @@ -98,7 +98,7 @@ int64_t PPCMessage::decode(uint32_t _length, bcos::byte* _data) p += dataLength; } - if (p) + if (p < _data + _length) { m_header.insert(m_header.begin(), p, _data + _length); } @@ -149,19 +149,20 @@ PPCMessageFace::Ptr PPCMessageFactory::decodePPCMessage(Message::Ptr msg) // Note: this field is been setted when onReceiveMessage if (frontMsg) { + ppcMsg->decode(bcos::ref(frontMsg->data())); ppcMsg->setSeq(frontMsg->seq()); ppcMsg->setUuid(frontMsg->traceID()); if (frontMsg->isRespPacket()) { ppcMsg->setResponse(); } - ppcMsg->decode(bcos::ref(frontMsg->data())); } if (msg->header() && msg->header()->optionalField()) { auto const& routeInfo = msg->header()->optionalField(); ppcMsg->setTaskID(routeInfo->topic()); ppcMsg->setSender(routeInfo->srcInst()); + ppcMsg->setSenderNode(routeInfo->srcNode()); } return ppcMsg; } diff --git a/cpp/wedpr-protocol/protocol/src/PPCMessage.h b/cpp/wedpr-protocol/protocol/src/PPCMessage.h index 148e4fd3..4ce25faf 100644 --- a/cpp/wedpr-protocol/protocol/src/PPCMessage.h +++ b/cpp/wedpr-protocol/protocol/src/PPCMessage.h @@ -83,6 +83,10 @@ class PPCMessage : public PPCMessageFace // set the message to be response void setResponse() override { m_isResponse = true; } + void setSenderNode(bcos::bytes const& senderNode) override { m_senderNode = senderNode; } + + bcos::bytes const& senderNode() const override { return m_senderNode; } + protected: std::string encodeMap(const std::map& _map); std::map decodeMap(const std::string& _encval); @@ -95,6 +99,8 @@ class PPCMessage : public PPCMessageFace uint32_t m_seq = 0; std::string m_taskID; std::string m_sender; + bcos::bytes m_senderNode; + bool m_isResponse; // the uuid used to find the response-callback std::string m_uuid; diff --git a/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp b/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp index cc03557d..6690dd07 100644 --- a/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp +++ b/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp @@ -65,18 +65,19 @@ int64_t MessageOptionalHeaderImpl::decode(bcos::bytesConstRef data, uint64_t con auto pointer = data.data() + offset; m_componentType = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); bcos::bytes componentType; - offset = decodeNetworkBuffer(componentType, data.data(), data.size(), (pointer - data.data())); + offset = decodeNetworkBuffer( + componentType, data.data(), data.size(), (pointer - data.data()), false); m_componentType = std::string(componentType.begin(), componentType.end()); // srcNode - offset = decodeNetworkBuffer(m_srcNode, data.data(), data.size(), offset); + offset = decodeNetworkBuffer(m_srcNode, data.data(), data.size(), offset, false); // source inst - offset = decodeNetworkBuffer(m_srcInst, data.data(), data.size(), offset); + offset = decodeNetworkBuffer(m_srcInst, data.data(), data.size(), offset, false); // dstNode - offset = decodeNetworkBuffer(m_dstNode, data.data(), data.size(), offset); + offset = decodeNetworkBuffer(m_dstNode, data.data(), data.size(), offset, false); // dstInst - offset = decodeNetworkBuffer(m_dstInst, data.data(), data.size(), offset); + offset = decodeNetworkBuffer(m_dstInst, data.data(), data.size(), offset, false); // topic - offset = decodeNetworkBuffer(m_topic, data.data(), data.size(), offset); + offset = decodeNetworkBuffer(m_topic, data.data(), data.size(), offset, false); return offset; } @@ -149,11 +150,12 @@ int64_t MessageHeaderImpl::decode(bcos::bytesConstRef data) m_ext = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); pointer += 2; // the traceID - auto offset = decodeNetworkBuffer(m_traceID, data.data(), data.size(), (pointer - data.data())); + auto offset = + decodeNetworkBuffer(m_traceID, data.data(), data.size(), (pointer - data.data()), false); // srcGwNode - offset = decodeNetworkBuffer(m_srcGwNode, data.data(), data.size(), offset); + offset = decodeNetworkBuffer(m_srcGwNode, data.data(), data.size(), offset, false); // dstGwNode - offset = decodeNetworkBuffer(m_dstGwNode, data.data(), data.size(), offset); + offset = decodeNetworkBuffer(m_dstGwNode, data.data(), data.size(), offset, false); // optionalField if (hasOptionalField()) { diff --git a/cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp b/cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp index 7e2f37b8..07c4327f 100644 --- a/cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp +++ b/cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp @@ -42,8 +42,8 @@ int64_t MessagePayloadImpl::encode(bcos::bytes& buffer) const buffer.insert(buffer.end(), (byte*)&traceIDLen, (byte*)&traceIDLen + 2); buffer.insert(buffer.end(), m_traceID.begin(), m_traceID.end()); // data - uint16_t dataLen = boost::asio::detail::socket_ops::host_to_network_short(m_data.size()); - buffer.insert(buffer.end(), (byte*)&dataLen, (byte*)&dataLen + 2); + uint32_t dataLen = boost::asio::detail::socket_ops::host_to_network_long(m_data.size()); + buffer.insert(buffer.end(), (byte*)&dataLen, (byte*)&dataLen + 4); buffer.insert(buffer.end(), m_data.begin(), m_data.end()); // update the length m_length = buffer.size(); @@ -74,5 +74,5 @@ int64_t MessagePayloadImpl::decode(bcos::bytesConstRef buffer) auto offset = decodeNetworkBuffer(m_traceID, buffer.data(), buffer.size(), (pointer - buffer.data())); // data - return decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), offset); + return decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), offset, true); } \ No newline at end of file diff --git a/cpp/wedpr-protocol/protocol/tests/MessageTest.cpp b/cpp/wedpr-protocol/protocol/tests/MessageTest.cpp index 53977bbe..cf7bfce5 100644 --- a/cpp/wedpr-protocol/protocol/tests/MessageTest.cpp +++ b/cpp/wedpr-protocol/protocol/tests/MessageTest.cpp @@ -123,6 +123,16 @@ BOOST_AUTO_TEST_CASE(testMessage) componentType, srcNode, srcInst, dstNode, dstInst, payload); checkEncodeDecode(msgBuilder, msg); + // with payload over 65535 + for (uint32_t i = 0; i < 10000000; i++) + { + payload->emplace_back(i); + } + msg = fakeMsg(msgBuilder, version, traceID, srcGwNode, dstGwNode, packetType, ttl, ext, topic, + componentType, srcNode, srcInst, dstNode, dstInst, payload); + checkEncodeDecode(msgBuilder, msg); + + // with header router traceID = "1233"; srcGwNode = "srcGwNode"; diff --git a/cpp/wedpr-storage/ppc-io/src/DataResourceLoaderImpl.cpp b/cpp/wedpr-storage/ppc-io/src/DataResourceLoaderImpl.cpp index c312bed5..279984c3 100644 --- a/cpp/wedpr-storage/ppc-io/src/DataResourceLoaderImpl.cpp +++ b/cpp/wedpr-storage/ppc-io/src/DataResourceLoaderImpl.cpp @@ -232,6 +232,14 @@ void DataResourceLoaderImpl::checkResourceExists( switch (_desc->type()) { case (int)(DataResourceType::FILE): + { + if (boost::filesystem::exists(boost::filesystem::path(_desc->path()))) + { + BOOST_THROW_EXCEPTION(LoadDataResourceException() << bcos::errinfo_comment( + "The file: " + _desc->path() + " already exists!")); + } + break; + } case (int)(DataResourceType::HDFS): { auto storage = _storage; diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp index a3259cd6..153e45ea 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp @@ -138,9 +138,13 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace } // send response when receiving message from given agencyID -void Front::asyncSendResponse(const std::string& _agencyID, std::string const& _uuid, - front::PPCMessageFace::Ptr _message, ErrorCallbackFunc _callback) -{} +void Front::asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID, + PPCMessageFace::Ptr message, ErrorCallbackFunc _callback) +{ + bcos::bytes data; + message->encode(data); + m_front->asyncSendResponse(dstNode, traceID, std::move(data), 0, _callback); +} /** * @brief notice task info to gateway @@ -148,11 +152,43 @@ void Front::asyncSendResponse(const std::string& _agencyID, std::string const& _ */ bcos::Error::Ptr Front::notifyTaskInfo(std::string const& taskID) { - m_front->registerTopic(taskID); + return m_front->registerTopic(taskID); } // erase the task-info when task finished bcos::Error::Ptr Front::eraseTaskInfo(std::string const& _taskID) { - m_front->unRegisterTopic(_taskID); + FRONT_LOG(INFO) << LOG_DESC("eraseTaskInfo") << LOG_KV("front", m_front); + return m_front->unRegisterTopic(_taskID); +} + +// register message handler for algorithm +void Front::registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType, + std::function _handler) +{ + uint16_t type = ((uint16_t)_taskType << 8) | _algorithmType; + auto self = weak_from_this(); + m_front->registerMessageHandler( + std::to_string(type), [self, type, _handler](ppc::protocol::Message::Ptr msg) { + auto front = self.lock(); + if (!front) + { + return; + } + try + { + if (msg == nullptr) + { + _handler(nullptr); + return; + } + _handler(front->m_messageFactory->decodePPCMessage(msg)); + } + catch (std::exception const& e) + { + FRONT_LOG(WARNING) << LOG_DESC("Call handler for component failed") + << LOG_KV("componentType", type) + << LOG_KV("error", boost::diagnostic_information(e)); + } + }); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h index 4efa9b5d..5ddd224b 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h @@ -48,9 +48,8 @@ class Front : public FrontInterface, public std::enable_shared_from_this uint32_t _timeout, ErrorCallbackFunc _callback, CallbackFunc _respCallback) override; // send response when receiving message from given agencyID - void asyncSendResponse(const std::string& _agencyID, std::string const& _uuid, - front::PPCMessageFace::Ptr _message, ErrorCallbackFunc _callback) override; - + void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID, + front::PPCMessageFace::Ptr message, ErrorCallbackFunc _callback) override; /** * @brief notice task info to gateway * @param _taskInfo the latest task information @@ -62,25 +61,7 @@ class Front : public FrontInterface, public std::enable_shared_from_this // register message handler for algorithm void registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType, - std::function _handler) override - { - uint16_t type = ((uint16_t)_taskType << 8) | _algorithmType; - auto self = weak_from_this(); - m_front->registerMessageHandler( - std::to_string(type), [self, _handler](ppc::protocol::Message::Ptr msg) { - auto front = self.lock(); - if (!front) - { - return; - } - if (msg == nullptr) - { - _handler(nullptr); - return; - } - _handler(front->m_messageFactory->decodePPCMessage(msg)); - }); - } + std::function _handler) override; std::vector agencies() const override { diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp index 470462a2..920ceeb2 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp @@ -37,7 +37,7 @@ FrontImpl::FrontImpl(std::shared_ptr threadPool, m_gatewayClient(gateway) { m_nodeID = m_nodeInfo->nodeID().toBytes(); - m_callbackManager = std::make_shared(m_threadPool, ioService); + m_callbackManager = std::make_shared(m_threadPool, m_ioService); } /** @@ -106,6 +106,23 @@ void FrontImpl::stop() } } +void FrontImpl::asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID, + bcos::bytes&& payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback) +{ + // generate the frontMessage + auto frontMessage = m_messageFactory->build(); + frontMessage->setTraceID(traceID); + frontMessage->setSeq(seq); + frontMessage->setData(std::move(payload)); + + auto routeInfo = m_routerInfoBuilder->build(); + routeInfo->setSrcNode(m_nodeID); + routeInfo->setDstNode(dstNode); + + asyncSendMessageToGateway(true, std::move(frontMessage), RouteType::ROUTE_THROUGH_NODEID, + traceID, routeInfo, -1, errorCallback); +} + /** * @brief async send message * diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h index b1c36b66..b60f1e87 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h @@ -114,20 +114,20 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ * @brief register the nodeInfo to the gateway * @param nodeInfo the nodeInfo */ - void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override + bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override { FRONT_LOG(INFO) << LOG_DESC("registerNodeInfo") << LOG_KV("nodeInfo", printNodeInfo(m_nodeInfo)); - m_gatewayClient->registerNodeInfo(m_nodeInfo); + return m_gatewayClient->registerNodeInfo(m_nodeInfo); } /** * @brief unRegister the nodeInfo to the gateway */ - void unRegisterNodeInfo() override + bcos::Error::Ptr unRegisterNodeInfo() override { FRONT_LOG(INFO) << LOG_DESC("unRegisterNodeInfo"); - m_gatewayClient->unRegisterNodeInfo(bcos::ref(m_nodeID)); + return m_gatewayClient->unRegisterNodeInfo(bcos::ref(m_nodeID)); } /** @@ -135,10 +135,10 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ * * @param topic the topic to register */ - void registerTopic(std::string const& topic) override + bcos::Error::Ptr registerTopic(std::string const& topic) override { FRONT_LOG(INFO) << LOG_DESC("register topic: ") << topic; - m_gatewayClient->registerTopic(bcos::ref(m_nodeID), topic); + return m_gatewayClient->registerTopic(bcos::ref(m_nodeID), topic); } void asyncGetAgencies( @@ -152,10 +152,10 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ * * @param topic the topic to unregister */ - void unRegisterTopic(std::string const& topic) override + bcos::Error::Ptr unRegisterTopic(std::string const& topic) override { FRONT_LOG(INFO) << LOG_DESC("unregister topic: ") << topic; - m_gatewayClient->unRegisterTopic(bcos::ref(m_nodeID), topic); + return m_gatewayClient->unRegisterTopic(bcos::ref(m_nodeID), topic); } ppc::protocol::MessageOptionalHeaderBuilder::Ptr const routerInfoBuilder() const @@ -167,6 +167,9 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ return m_messageFactory; } + void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID, + bcos::bytes&& payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback) override; + private: void asyncSendMessageToGateway(bool responsePacket, ppc::protocol::MessagePayload::Ptr&& frontMessage, ppc::protocol::RouteType routeType, diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index 18163caa..7a997afc 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -190,7 +190,7 @@ void GatewayImpl::onReceiveP2PMessage(MessageFace::Ptr msg, WsSession::Ptr sessi "onReceiveP2PMessage failed to find the node that can dispatch this message") << LOG_KV("msg", printMessage(p2pMessage)); callback(std::make_shared(CommonError::NotFoundFrontServiceDispatchMsg, - "unable to find the ndoe to dispatcher this message, message detail: " + + "unable to find the node to dispatcher this message, message detail: " + printMessage(p2pMessage))); } } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp index b83b07ff..eeb724f8 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp @@ -46,8 +46,8 @@ void SendMessageWithRetry::trySendMessage() { if (m_dstNodeList.empty()) { - GATEWAY_LOG(DEBUG) << LOG_DESC("Gateway::SendMessageWithRetry") - << LOG_DESC("unable to send the message") << printMessage(m_p2pMessage); + GATEWAY_LOG(DEBUG) << LOG_DESC("Gateway::SendMessageWithRetry: unable to send the message") + << printMessage(m_p2pMessage); if (m_respFunc) { m_respFunc(std::make_shared( diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h index 4ed8d833..470d4b3b 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h @@ -50,7 +50,7 @@ class GatewayNodeInfo virtual std::vector> chooseRouterByAgency( bool selectAll) const = 0; virtual std::vector> chooseRouterByTopic( - bool selectAll, std::string const& topic) const = 0; + bool selectAll, bcos::bytes const& fromNode, std::string const& topic) const = 0; virtual void encode(bcos::bytes& data) const = 0; virtual void decode(bcos::bytesConstRef data) = 0; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp index fb14954b..0dda811e 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp @@ -18,6 +18,7 @@ * @date 2024-08-26 */ #include "GatewayNodeInfoImpl.h" +#include "ppc-gateway/Common.h" #include "wedpr-protocol/protobuf/src/Common.h" #include "wedpr-protocol/protobuf/src/NodeInfoImpl.h" #include "wedpr-protocol/tars/Common.h" @@ -152,7 +153,7 @@ std::vector> GatewayNodeInfoImpl::choo } std::vector> GatewayNodeInfoImpl::chooseRouterByTopic( - bool selectAll, std::string const& topic) const + bool selectAll, bcos::bytes const& fromNode, std::string const& topic) const { std::vector> result; // empty topic means broadcast message to all front @@ -174,7 +175,8 @@ std::vector> GatewayNodeInfoImpl::choo { selectedNode = nodeInfo(it.first); } - if (selectedNode != nullptr) + // ignore the fromNode + if (selectedNode != nullptr && selectedNode->nodeID().toBytes() != fromNode) { result.emplace_back(selectedNode->getFront()); } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h index 13f0bf25..fbe3a205 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h @@ -61,7 +61,7 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo std::vector> chooseRouterByAgency( bool selectAll) const override; std::vector> chooseRouterByTopic( - bool selectAll, std::string const& topic) const override; + bool selectAll, bcos::bytes const& fromNode, std::string const& topic) const override; void registerTopic(bcos::bytes const& nodeID, std::string const& topic) override; void unRegisterTopic(bcos::bytes const& nodeID, std::string const& topic) override; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp index 2b99dec6..3a7c035f 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp @@ -103,7 +103,8 @@ std::vector LocalRouter::chooseReceiver( ppc::protocol::Message::Ptr const& msg) { std::vector receivers; - if (msg->header()->optionalField()->dstInst() != m_routerInfo->agency()) + auto const& dstInst = msg->header()->optionalField()->dstInst(); + if (!dstInst.empty() && dstInst != m_routerInfo->agency()) { return receivers; } @@ -123,17 +124,20 @@ std::vector LocalRouter::chooseReceiver( } case (uint16_t)RouteType::ROUTE_THROUGH_COMPONENT: { + // Note: should check the dstInst when route-by-component return m_routerInfo->chooseRouteByComponent( selectAll, msg->header()->optionalField()->componentType()); } case (uint16_t)RouteType::ROUTE_THROUGH_AGENCY: { + // Note: should check the dstInst when route-by-agency return m_routerInfo->chooseRouterByAgency(selectAll); } case (uint16_t)RouteType::ROUTE_THROUGH_TOPIC: { - return m_routerInfo->chooseRouterByTopic( - selectAll, msg->header()->optionalField()->topic()); + // Note: should ignore the srcNode when route-by-topic + return m_routerInfo->chooseRouterByTopic(selectAll, + msg->header()->optionalField()->srcNode(), msg->header()->optionalField()->topic()); } default: BOOST_THROW_EXCEPTION(WeDPRException() << errinfo_comment( diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp index 3e96a31f..dfca0e5c 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp @@ -40,7 +40,8 @@ Service::Service(std::string const& _nodeID, RouterTableFactory::Ptr const& _rou m_routerTable->setNodeID(m_nodeID); m_routerTable->setUnreachableDistance(unreachableDistance); - SERVICE_LOG(INFO) << LOG_DESC("create P2PService") << LOG_KV("module", _moduleName); + SERVICE_LOG(INFO) << LOG_DESC("create P2PService") << LOG_KV("module", _moduleName) + << LOG_KV("nodeID", printP2PIDElegantly(m_nodeID)); WsService::registerConnectHandler( boost::bind(&Service::onP2PConnect, this, boost::placeholders::_1)); WsService::registerDisconnectHandler( @@ -224,10 +225,12 @@ void Service::asyncSendMessageWithForward( auto nextHop = m_routerTable->getNextHop(dstNodeID); if (nextHop.empty()) { + SERVICE_LOG(TRACE) << LOG_DESC("asyncSendMessage directly") << printMessage(p2pMsg); return asyncSendMessage(dstNodeID, msg, options, respFunc); } // with nextHop, send the message to nextHop - SERVICE_LOG(TRACE) << LOG_DESC("asyncSendMessageWithForward") << printMessage(p2pMsg); + SERVICE_LOG(TRACE) << LOG_DESC("asyncSendMessageWithForward to nextHop") + << printMessage(p2pMsg); return asyncSendMessage(nextHop, msg, options, respFunc); } @@ -346,7 +349,14 @@ void Service::sendRespMessageBySession(bcos::boostssl::ws::WsSession::Ptr const& requestMsg->header()->optionalField()->srcNode()); respMessage->header()->optionalField()->setSrcNode( requestMsg->header()->optionalField()->dstNode()); + + respMessage->header()->optionalField()->setDstInst( + requestMsg->header()->optionalField()->srcInst()); + respMessage->header()->optionalField()->setSrcInst( + requestMsg->header()->optionalField()->dstInst()); } + respMessage->header()->setSrcGwNode(requestMsg->header()->dstGwNode()); + respMessage->header()->setDstGwNode(requestMsg->header()->srcGwNode()); respMessage->header()->setTraceID(requestMsg->header()->traceID()); respMessage->header()->setRespPacket(); respMessage->header()->setRouteType(ppc::protocol::RouteType::ROUTE_THROUGH_NODEID);