From 4415b6e981fd446df968c0dcb6110c55b9c39c4b Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Fri, 6 Sep 2024 19:52:20 +0800 Subject: [PATCH] add ut for protocols && fix gatewayNodeInfo encode concurrency bug (#24) * fix initialize problems * add ut for Message * fix gatewayNodeInfo encode concurrency bug --- cpp/ppc-framework/gateway/IGateway.h | 8 +- cpp/ppc-framework/protocol/INodeInfo.h | 1 + cpp/ppc-framework/protocol/Message.h | 5 +- cpp/ppc-framework/protocol/PPCMessageFace.h | 15 +- cpp/test-utils/FakeFront.h | 10 +- cpp/test-utils/FakePPCMessage.h | 17 ++ .../tests/cm2020-psi/TestCM2020Impl.cpp | 2 +- .../ppc-psi/tests/labeled-psi/DataTools.h | 1 - .../tests/labeled-psi/TestLabeledPSIImpl.cpp | 2 +- .../tests/labeled-psi/TestSenderDB.cpp | 2 +- .../tests/ra2018-psi/TestEcdhPSIImpl.cpp | 6 +- .../tests/ra2018-psi/TestRA2018Impl.cpp | 2 +- .../ppc-psi/tests/ra2018-psi/mock/Common.h | 2 +- .../tests/ra2018-psi/mock/EcdhPSIFixture.h | 2 + .../ppc-tools/src/config/PPCConfig.cpp | 11 +- .../ppc-tools/src/config/PPCConfig.h | 16 +- cpp/wedpr-helper/ppc-utilities/Utilities.h | 13 +- cpp/wedpr-initializer/Initializer.cpp | 20 ++- cpp/wedpr-initializer/Initializer.h | 5 +- .../air-node/AirNodeInitializer.cpp | 4 +- cpp/wedpr-main/gateway/GatewayInitializer.cpp | 2 +- .../pro-node/ProNodeInitializer.cpp | 4 +- .../grpc/client/FrontClient.cpp | 4 +- .../grpc/client/GatewayClient.cpp | 8 +- .../grpc/client/GatewayClient.h | 7 +- .../grpc/server/FrontServer.cpp | 2 +- .../grpc/server/GatewayServer.cpp | 5 +- cpp/wedpr-protocol/proto/pb/Service.proto | 1 + cpp/wedpr-protocol/protobuf/CMakeLists.txt | 35 +--- .../protobuf/src/CMakeLists.txt | 27 ++++ .../protobuf/{ => src}/Common.h | 0 .../protobuf/{ => src}/NodeInfoImpl.cpp | 10 +- .../protobuf/{ => src}/NodeInfoImpl.h | 40 ++--- .../protobuf/{ => src}/RequestConverter.h | 4 +- .../protobuf/tests/CMakeLists.txt | 10 ++ .../protobuf/tests/NodeInfoImplTest.cpp | 71 +++++++++ cpp/wedpr-protocol/protobuf/tests/main.cpp | 2 + cpp/wedpr-protocol/protocol/CMakeLists.txt | 2 +- cpp/wedpr-protocol/protocol/src/Common.h | 29 ++++ .../protocol/src/PPCMessage.cpp | 58 +++++++ cpp/wedpr-protocol/protocol/src/PPCMessage.h | 30 ++-- .../protocol/src/v1/MessageHeaderImpl.cpp | 45 +++--- .../protocol/src/v1/MessageHeaderImpl.h | 6 +- .../protocol/src/v1/MessageImpl.cpp | 32 +++- .../protocol/src/v1/MessageImpl.h | 4 +- .../protocol/src/v1/MessagePayloadImpl.cpp | 4 +- .../protocol/tests/MessageTest.cpp | 150 ++++++++++++++++++ .../protocol/tests/PPCMessageTest.cpp | 127 ++++++++++++--- .../protocol/tests/TestTaskImpl.cpp | 2 +- cpp/wedpr-protocol/tars/CMakeLists.txt | 7 - .../ppc-front/ppc-front/Front.cpp | 5 +- .../ppc-front/ppc-front/Front.h | 2 +- .../ppc-front/ppc-front/FrontImpl.cpp | 13 +- .../ppc-front/ppc-front/FrontImpl.h | 4 +- .../test/unittests/PPCChannelTest.cpp | 9 +- .../ppc-gateway/ppc-gateway/Common.h | 2 + .../ppc-gateway/GatewayFactory.cpp | 4 +- .../ppc-gateway/gateway/GatewayImpl.cpp | 11 +- .../ppc-gateway/gateway/GatewayImpl.h | 7 +- .../gateway/router/GatewayNodeInfoImpl.cpp | 59 ++++--- .../gateway/router/GatewayNodeInfoImpl.h | 44 +++-- .../gateway/router/GatewayRouterManager.cpp | 13 +- .../gateway/router/LocalRouter.cpp | 13 ++ .../ppc-gateway/gateway/router/LocalRouter.h | 16 +- .../ppc-gateway/ppc-gateway/p2p/Service.cpp | 97 +++++++---- .../ppc-gateway/ppc-gateway/p2p/Service.h | 2 +- .../ppc-gateway/p2p/router/RouterManager.cpp | 28 ++-- .../p2p/router/RouterTableImpl.cpp | 77 ++++----- .../unittests/GatewayNodeInfoImplTest.cpp | 135 ++++++++++++++++ .../ppc-gateway/test/unittests/MockCache.h | 103 ++++++------ .../ppc-rpc/src/RpcFactory.cpp | 2 +- cpp/wedpr-transport/sdk/TransportBuilder.cpp | 2 +- cpp/wedpr-transport/sdk/TransportImpl.h | 2 +- 73 files changed, 1104 insertions(+), 418 deletions(-) create mode 100644 cpp/wedpr-protocol/protobuf/src/CMakeLists.txt rename cpp/wedpr-protocol/protobuf/{ => src}/Common.h (100%) rename cpp/wedpr-protocol/protobuf/{ => src}/NodeInfoImpl.cpp (79%) rename cpp/wedpr-protocol/protobuf/{ => src}/NodeInfoImpl.h (67%) rename cpp/wedpr-protocol/protobuf/{ => src}/RequestConverter.h (98%) create mode 100644 cpp/wedpr-protocol/protobuf/tests/CMakeLists.txt create mode 100644 cpp/wedpr-protocol/protobuf/tests/NodeInfoImplTest.cpp create mode 100644 cpp/wedpr-protocol/protobuf/tests/main.cpp create mode 100644 cpp/wedpr-protocol/protocol/src/Common.h create mode 100644 cpp/wedpr-protocol/protocol/tests/MessageTest.cpp create mode 100644 cpp/wedpr-transport/ppc-gateway/test/unittests/GatewayNodeInfoImplTest.cpp diff --git a/cpp/ppc-framework/gateway/IGateway.h b/cpp/ppc-framework/gateway/IGateway.h index 28b2fcd8..e4422b4a 100644 --- a/cpp/ppc-framework/gateway/IGateway.h +++ b/cpp/ppc-framework/gateway/IGateway.h @@ -58,11 +58,13 @@ class IGateway * @param callback callback */ virtual void asyncSendMessage(ppc::protocol::RouteType routeType, - ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, - long timeout, ppc::protocol::ReceiveMsgFunc callback) = 0; + ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, + bcos::bytes&& payload, long timeout, ppc::protocol::ReceiveMsgFunc callback) = 0; virtual void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, - ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) = 0; + ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, + bcos::bytes&& payload) = 0; + virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0; virtual bcos::Error::Ptr registerTopic( diff --git a/cpp/ppc-framework/protocol/INodeInfo.h b/cpp/ppc-framework/protocol/INodeInfo.h index 680f0df9..46b29a8d 100644 --- a/cpp/ppc-framework/protocol/INodeInfo.h +++ b/cpp/ppc-framework/protocol/INodeInfo.h @@ -67,6 +67,7 @@ class INodeInfoFactory virtual INodeInfo::Ptr build() = 0; virtual INodeInfo::Ptr build(bcos::bytesConstRef nodeID, std::string const& endPoint) = 0; + virtual INodeInfo::Ptr build(bcos::bytesConstRef data) = 0; }; inline std::string printNodeInfo(INodeInfo::Ptr const& nodeInfo) diff --git a/cpp/ppc-framework/protocol/Message.h b/cpp/ppc-framework/protocol/Message.h index 95f7225e..fb0f39e6 100644 --- a/cpp/ppc-framework/protocol/Message.h +++ b/cpp/ppc-framework/protocol/Message.h @@ -199,8 +199,9 @@ class Message : virtual public bcos::boostssl::MessageFace protected: MessageHeader::Ptr m_header; - std::shared_ptr m_payload; - MessagePayload::Ptr m_frontMessage; + // Note: allocate here in case of wsService nullptr access caused coredump + std::shared_ptr m_payload = std::make_shared(); + MessagePayload::Ptr m_frontMessage = nullptr; }; class MessageHeaderBuilder diff --git a/cpp/ppc-framework/protocol/PPCMessageFace.h b/cpp/ppc-framework/protocol/PPCMessageFace.h index f6de1791..a2486ad2 100644 --- a/cpp/ppc-framework/protocol/PPCMessageFace.h +++ b/cpp/ppc-framework/protocol/PPCMessageFace.h @@ -33,10 +33,6 @@ namespace ppc { namespace front { -enum MessageExtFlag : uint16_t -{ - ResponseFlag = 0x0001, -}; class PPCMessageFace { public: @@ -86,7 +82,16 @@ class PPCMessageFaceFactory public: virtual ~PPCMessageFaceFactory() {} virtual PPCMessageFace::Ptr buildPPCMessage() = 0; - virtual PPCMessageFace::Ptr buildPPCMessage(ppc::protocol::Message::Ptr msg) = 0; + virtual PPCMessageFace::Ptr decodePPCMessage(ppc::protocol::Message::Ptr msg) = 0; + virtual ppc::protocol::Message::Ptr buildMessage( + ppc::protocol::MessageBuilder::Ptr const& msgBuilder, + ppc::protocol::MessagePayloadBuilder::Ptr const& msgPayloadBuilder, + PPCMessageFace::Ptr const& ppcMessage) = 0; + + virtual ppc::protocol::MessagePayload::Ptr buildMessage( + ppc::protocol::MessagePayloadBuilder::Ptr const& msgPayloadBuilder, + PPCMessageFace::Ptr const& ppcMessage) = 0; + virtual PPCMessageFace::Ptr buildPPCMessage(bcos::bytesConstRef _data) = 0; virtual PPCMessageFace::Ptr buildPPCMessage(bcos::bytesPointer _buffer) = 0; virtual PPCMessageFace::Ptr buildPPCMessage(uint8_t _taskType, uint8_t _algorithmType, diff --git a/cpp/test-utils/FakeFront.h b/cpp/test-utils/FakeFront.h index 0b54380e..18e7741b 100644 --- a/cpp/test-utils/FakeFront.h +++ b/cpp/test-utils/FakeFront.h @@ -37,8 +37,9 @@ class FakeFront : public FrontInterface FakeFront() = default; ~FakeFront() override = default; - void start() override {} - void stop() override {} + void registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType, + std::function _handler) override + {} void registerRA2018(std::string const& _agencyID, TaskFrameworkInterface::Ptr _psi) { @@ -73,11 +74,6 @@ class FakeFront : public FrontInterface m_agencyToOTPIR[_agencyID] = _pir; } - void onReceiveMessage(front::PPCMessageFace::Ptr, ErrorCallbackFunc) override - { - throw std::runtime_error("FakeFront: unimplemented interface onReceiveMessage!"); - } - void asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace::Ptr _message, uint32_t _timeout, ErrorCallbackFunc _callback, CallbackFunc _responseCallback) override { diff --git a/cpp/test-utils/FakePPCMessage.h b/cpp/test-utils/FakePPCMessage.h index a9070eae..8248bc9f 100644 --- a/cpp/test-utils/FakePPCMessage.h +++ b/cpp/test-utils/FakePPCMessage.h @@ -111,5 +111,22 @@ class FakePPCMessageFactory : public PPCMessageFaceFactory { throw std::runtime_error("FakePPCMessageFactory: unimplemented interface!"); } + PPCMessageFace::Ptr decodePPCMessage(ppc::protocol::Message::Ptr msg) override + { + throw std::runtime_error("FakePPCMessageFactory: unimplemented interface!"); + } + ppc::protocol::Message::Ptr buildMessage(ppc::protocol::MessageBuilder::Ptr const& msgBuilder, + ppc::protocol::MessagePayloadBuilder::Ptr const& msgPayloadBuilder, + PPCMessageFace::Ptr const& ppcMessage) override + { + throw std::runtime_error("FakePPCMessageFactory: unimplemented interface!"); + } + + ppc::protocol::MessagePayload::Ptr buildMessage( + ppc::protocol::MessagePayloadBuilder::Ptr const& msgPayloadBuilder, + PPCMessageFace::Ptr const& ppcMessage) override + { + throw std::runtime_error("FakePPCMessageFactory: unimplemented interface!"); + } }; } // namespace ppc::test \ No newline at end of file diff --git a/cpp/wedpr-computing/ppc-psi/tests/cm2020-psi/TestCM2020Impl.cpp b/cpp/wedpr-computing/ppc-psi/tests/cm2020-psi/TestCM2020Impl.cpp index 866bcf54..95ff82e3 100644 --- a/cpp/wedpr-computing/ppc-psi/tests/cm2020-psi/TestCM2020Impl.cpp +++ b/cpp/wedpr-computing/ppc-psi/tests/cm2020-psi/TestCM2020Impl.cpp @@ -19,8 +19,8 @@ */ #include "FakeCM2020PSIFactory.h" -#include "ppc-protocol/src/JsonTaskImpl.h" #include "ppc-psi/src/cm2020-psi/CM2020PSIImpl.h" +#include "protocol/src/JsonTaskImpl.h" #include "test-utils/FakeFront.h" #include "test-utils/FileTool.h" #include "test-utils/TaskMock.h" diff --git a/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/DataTools.h b/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/DataTools.h index dbeabf52..03075d6f 100644 --- a/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/DataTools.h +++ b/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/DataTools.h @@ -25,7 +25,6 @@ namespace ppc::psi { - inline void genItemsLabels( ppc::io::DataBatch::Ptr _items, ppc::io::DataBatch::Ptr _labels, uint32_t _size) { diff --git a/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/TestLabeledPSIImpl.cpp b/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/TestLabeledPSIImpl.cpp index c7ed5e01..77b1a7ac 100644 --- a/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/TestLabeledPSIImpl.cpp +++ b/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/TestLabeledPSIImpl.cpp @@ -19,9 +19,9 @@ */ #include "FakeLabeledPSIFactory.h" -#include "ppc-protocol/src/JsonTaskImpl.h" #include "ppc-psi/src/labeled-psi/LabeledPSIImpl.h" #include "ppc-psi/src/labeled-psi/protocol/LabeledPSIResult.h" +#include "protocol/src/JsonTaskImpl.h" #include "test-utils/FileTool.h" #include "test-utils/TaskMock.h" #include diff --git a/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/TestSenderDB.cpp b/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/TestSenderDB.cpp index d6c50960..2bf6c8ea 100644 --- a/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/TestSenderDB.cpp +++ b/cpp/wedpr-computing/ppc-psi/tests/labeled-psi/TestSenderDB.cpp @@ -22,9 +22,9 @@ #include "ppc-crypto-core/src/hash/Sha256Hash.h" #include "ppc-crypto/src/ecc/Ed25519EccCrypto.h" #include "ppc-crypto/src/oprf/EcdhOprf.h" -#include "ppc-protocol/src/JsonTaskImpl.h" #include "ppc-psi/src/labeled-psi/core/LabeledPSIParams.h" #include "ppc-psi/src/labeled-psi/core/SenderDB.h" +#include "protocol/src/JsonTaskImpl.h" #include "test-utils/TaskMock.h" #include #include diff --git a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestEcdhPSIImpl.cpp b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestEcdhPSIImpl.cpp index 862d57f9..766aa7df 100644 --- a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestEcdhPSIImpl.cpp +++ b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestEcdhPSIImpl.cpp @@ -20,7 +20,7 @@ #include "mock/Common.h" #include "mock/EcdhPSIFixture.h" // Note: it's better not to depends on the task-impl -#include "ppc-protocol/src/JsonTaskImpl.h" +#include "protocol/src/JsonTaskImpl.h" #include "test-utils/TaskMock.h" #include #include @@ -61,8 +61,8 @@ void testEcdhImplFunc(int64_t _dataBatchSize, std::string const& _serverPSIDataS auto clientPSI = factory->createEcdhPSI(clientAgencyName, clientConfig); std::vector agencyList = {serverAgencyName, clientAgencyName}; - serverPSI->psiConfig()->updateAgenyList(agencyList); - clientPSI->psiConfig()->updateAgenyList(agencyList); + // serverPSI->psiConfig()->updateAgenyList(agencyList); + // clientPSI->psiConfig()->updateAgenyList(agencyList); // register the server-psi into the front factory->front()->registerEcdhPSI(serverAgencyName, serverPSI); diff --git a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestRA2018Impl.cpp b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestRA2018Impl.cpp index 30ba8ff2..f25504ae 100644 --- a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestRA2018Impl.cpp +++ b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestRA2018Impl.cpp @@ -20,7 +20,7 @@ #include "mock/Common.h" #include "mock/RA2018PSIFixture.h" // Note: it's better not to depends on the task-impl -#include "ppc-protocol/src/JsonTaskImpl.h" +#include "protocol/src/JsonTaskImpl.h" #include "test-utils/TaskMock.h" #include #include diff --git a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/mock/Common.h b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/mock/Common.h index 9786e518..ed8e3b8f 100644 --- a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/mock/Common.h +++ b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/mock/Common.h @@ -18,7 +18,7 @@ * @date 2022-12-29 */ #pragma once -#include "ppc-protocol/src/JsonTaskImpl.h" +#include "protocol/src/JsonTaskImpl.h" #include "test-utils/TaskMock.h" #include diff --git a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/mock/EcdhPSIFixture.h b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/mock/EcdhPSIFixture.h index 6b155285..2bbbd065 100644 --- a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/mock/EcdhPSIFixture.h +++ b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/mock/EcdhPSIFixture.h @@ -27,6 +27,8 @@ #include "test-utils/FakeFront.h" #include "test-utils/FakePPCMessage.h" #include +#include + using namespace bcos; using namespace ppc::protocol; diff --git a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp index 127675fe..c086ab3e 100644 --- a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp +++ b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp @@ -106,14 +106,13 @@ void PPCConfig::loadEndpointConfig(EndPoint& endPoint, bool requireHostIp, endPoint.setPort(listenPort); } -void PPCConfig::loadFrontConfig( +void PPCConfig::loadFrontConfig(bool requireTransport, FrontConfigBuilder::Ptr const& frontConfigBuilder, boost::property_tree::ptree const& pt) { if (m_frontConfig == nullptr) { m_frontConfig = frontConfigBuilder->build(); } - loadEndpointConfig(m_frontConfig->mutableSelfEndPoint(), true, "transport", pt); // the thread_count auto threadCount = pt.get("transport.thread_count", 4); if (threadCount == 0) @@ -128,8 +127,14 @@ void PPCConfig::loadFrontConfig( InvalidConfig() << errinfo_comment("Must specify the transport.nodeid!")); } m_frontConfig->setNodeID(nodeID); - m_frontConfig->setThreadPoolSize(threadCount); + + if (!requireTransport) + { + return; + } + + loadEndpointConfig(m_frontConfig->mutableSelfEndPoint(), true, "transport", pt); // the gateway targets auto gatewayTargets = pt.get("transport.service.gateway_target", ""); if (gatewayTargets.empty()) diff --git a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h index 56b5249f..df42f7c3 100644 --- a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h +++ b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h @@ -125,7 +125,8 @@ class PPCConfig PPCConfig() = default; virtual ~PPCConfig() = default; // load the nodeConfig - void loadNodeConfig(ppc::front::FrontConfigBuilder::Ptr const& frontConfigBuilder, + void loadNodeConfig(bool requireTransport, + ppc::front::FrontConfigBuilder::Ptr const& frontConfigBuilder, std::string const& _configPath) { PPCConfig_LOG(INFO) << LOG_DESC("loadNodeConfig") << LOG_KV("path", _configPath); @@ -133,7 +134,7 @@ class PPCConfig boost::property_tree::read_ini(_configPath, iniConfig); // Note: must load common-config firstly since some ra-configs depends on the common-config loadCommonNodeConfig(iniConfig); - loadFrontConfig(frontConfigBuilder, _configPath); + loadFrontConfig(requireTransport, frontConfigBuilder, _configPath); loadRA2018Config(iniConfig); loadStorageConfig(iniConfig); loadEcdhPSIConfig(iniConfig); @@ -158,13 +159,14 @@ class PPCConfig loadGatewayConfig(iniConfig); } - void loadFrontConfig(ppc::front::FrontConfigBuilder::Ptr const& frontConfigBuilder, + void loadFrontConfig(bool requireTransport, + ppc::front::FrontConfigBuilder::Ptr const& frontConfigBuilder, std::string const& _configPath) { PPCConfig_LOG(INFO) << LOG_DESC("loadFrontConfig") << LOG_KV("path", _configPath); boost::property_tree::ptree iniConfig; boost::property_tree::read_ini(_configPath, iniConfig); - loadFrontConfig(frontConfigBuilder, iniConfig); + loadFrontConfig(requireTransport, frontConfigBuilder, iniConfig); // load the grpcConfig m_grpcConfig = loadGrpcConfig("transport", iniConfig); m_frontConfig->setGrpcConfig(m_grpcConfig); @@ -235,11 +237,15 @@ class PPCConfig // used by cem module virtual void loadCEMConfig(boost::property_tree::ptree const& _pt); + // for ut + void setAgencyID(std::string const& agencyID) { m_agencyID = agencyID; } + private: virtual void loadEndpointConfig(ppc::protocol::EndPoint& endPoint, bool requireHostIp, std::string const& sectionName, boost::property_tree::ptree const& pt); // load the front config - virtual void loadFrontConfig(ppc::front::FrontConfigBuilder::Ptr const& frontConfigBuilder, + virtual void loadFrontConfig(bool requireTransport, + ppc::front::FrontConfigBuilder::Ptr const& frontConfigBuilder, boost::property_tree::ptree const& pt); // load the grpc config ppc::protocol::GrpcConfig::Ptr loadGrpcConfig( diff --git a/cpp/wedpr-helper/ppc-utilities/Utilities.h b/cpp/wedpr-helper/ppc-utilities/Utilities.h index 13739719..7600e09d 100644 --- a/cpp/wedpr-helper/ppc-utilities/Utilities.h +++ b/cpp/wedpr-helper/ppc-utilities/Utilities.h @@ -27,17 +27,22 @@ namespace ppc { +template inline uint64_t decodeNetworkBuffer( - bcos::bytes& _result, bcos::byte const* buffer, unsigned int bufferLen, uint64_t const offset) + T& _result, bcos::byte const* buffer, unsigned int bufferLen, uint64_t const offset) { uint64_t curOffset = offset; CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset, bufferLen); + // Notice: operator* is higher priority than operator+, the () is essential auto dataLen = - boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)buffer + curOffset)); + boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)(buffer + curOffset))); curOffset += 2; + if (dataLen == 0) + { + return curOffset; + } CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset, bufferLen); - _result.insert( - _result.end(), (bcos::byte*)buffer + curOffset, (bcos::byte*)buffer + curOffset + dataLen); + _result.assign((bcos::byte*)buffer + curOffset, (bcos::byte*)buffer + curOffset + dataLen); curOffset += dataLen; return curOffset; } diff --git a/cpp/wedpr-initializer/Initializer.cpp b/cpp/wedpr-initializer/Initializer.cpp index ad169481..b5848406 100644 --- a/cpp/wedpr-initializer/Initializer.cpp +++ b/cpp/wedpr-initializer/Initializer.cpp @@ -51,17 +51,25 @@ using namespace ppc::tools; using namespace ppc::crypto; using namespace ppc::sdk; -Initializer::Initializer(std::string const& _configPath) : m_configPath(_configPath) +Initializer::Initializer(ppc::protocol::NodeArch _arch, std::string const& _configPath) + : m_arch(_arch), m_configPath(_configPath) { m_transportBuilder = std::make_shared(); // load the config m_config = std::make_shared(); - m_config->loadNodeConfig(m_transportBuilder->frontConfigBuilder(), _configPath); + if (m_arch == ppc::protocol::NodeArch::PRO) + { + m_config->loadNodeConfig(true, m_transportBuilder->frontConfigBuilder(), _configPath); + } + else + { + m_config->loadNodeConfig(false, m_transportBuilder->frontConfigBuilder(), _configPath); + } } -void Initializer::init(ppc::protocol::NodeArch _arch, ppc::gateway::IGateway::Ptr const& gateway) +void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway) { - INIT_LOG(INFO) << LOG_DESC("init the wedpr-component") << LOG_KV("arch", _arch); + INIT_LOG(INFO) << LOG_DESC("init the wedpr-component") << LOG_KV("arch", m_arch); // load the protocol m_protocolInitializer = std::make_shared(); m_protocolInitializer->init(m_config); @@ -73,7 +81,7 @@ void Initializer::init(ppc::protocol::NodeArch _arch, ppc::gateway::IGateway::Pt // Note: must set the m_holdingMessageMinutes before init the node TransportBuilder transportBuilder; - if (_arch == ppc::protocol::NodeArch::AIR) + if (m_arch == ppc::protocol::NodeArch::AIR) { m_transport = transportBuilder.build(SDKMode::AIR, m_config->frontConfig(), gateway); } @@ -85,7 +93,7 @@ void Initializer::init(ppc::protocol::NodeArch _arch, ppc::gateway::IGateway::Pt INIT_LOG(INFO) << LOG_DESC("init the frontService success") << LOG_KV("frontDetail", printFrontDesc(m_config->frontConfig())) - << LOG_KV("arch", _arch); + << LOG_KV("arch", m_arch); auto cryptoBox = m_protocolInitializer->cryptoBox(); SQLStorage::Ptr sqlStorage = nullptr; diff --git a/cpp/wedpr-initializer/Initializer.h b/cpp/wedpr-initializer/Initializer.h index d1ad5d01..90e8ae69 100644 --- a/cpp/wedpr-initializer/Initializer.h +++ b/cpp/wedpr-initializer/Initializer.h @@ -59,11 +59,11 @@ class Initializer : public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - Initializer(std::string const& _configPath); + Initializer(ppc::protocol::NodeArch _arch, std::string const& _configPath); virtual ~Initializer() { stop(); } // init the service - virtual void init(ppc::protocol::NodeArch _arch, ppc::gateway::IGateway::Ptr const& gateway); + virtual void init(ppc::gateway::IGateway::Ptr const& gateway); virtual void stop(); virtual void start(); @@ -82,6 +82,7 @@ class Initializer : public std::enable_shared_from_this private: + ppc::protocol::NodeArch m_arch; std::string m_configPath; std::shared_ptr m_config; ProtocolInitializer::Ptr m_protocolInitializer; diff --git a/cpp/wedpr-main/air-node/AirNodeInitializer.cpp b/cpp/wedpr-main/air-node/AirNodeInitializer.cpp index a05788e3..ffcf605a 100644 --- a/cpp/wedpr-main/air-node/AirNodeInitializer.cpp +++ b/cpp/wedpr-main/air-node/AirNodeInitializer.cpp @@ -47,12 +47,12 @@ void AirNodeInitializer::init(std::string const& _configPath) INIT_LOG(INFO) << LOG_DESC("initLog success"); // init the node - m_nodeInitializer = std::make_shared(_configPath); + m_nodeInitializer = std::make_shared(ppc::protocol::NodeArch::AIR, _configPath); // init the gateway initGateway(_configPath); // init the node - m_nodeInitializer->init(ppc::protocol::NodeArch::AIR, m_gateway); + m_nodeInitializer->init(m_gateway); // set the created front to the builder m_frontBuilder->setFront(m_nodeInitializer->transport()->getFront()); // register the NodeInfo diff --git a/cpp/wedpr-main/gateway/GatewayInitializer.cpp b/cpp/wedpr-main/gateway/GatewayInitializer.cpp index b2657958..0173bfb9 100644 --- a/cpp/wedpr-main/gateway/GatewayInitializer.cpp +++ b/cpp/wedpr-main/gateway/GatewayInitializer.cpp @@ -22,7 +22,7 @@ #include "grpc/server/GrpcServer.h" #include "ppc-gateway/GatewayFactory.h" #include "ppc-tools/src/config/PPCConfig.h" -#include "protobuf/NodeInfoImpl.h" +#include "protobuf/src/NodeInfoImpl.h" #include "wedpr-protocol/grpc/client/RemoteFrontBuilder.h" #include "wedpr-protocol/protocol/src/v1/MessageHeaderImpl.h" diff --git a/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp b/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp index 3e31a4d0..2ced6616 100644 --- a/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp +++ b/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp @@ -43,10 +43,10 @@ void ProNodeInitializer::init(std::string const& _configPath) INIT_LOG(INFO) << LOG_DESC("initLog success"); // init the node - m_nodeInitializer = std::make_shared(_configPath); + m_nodeInitializer = std::make_shared(ppc::protocol::NodeArch::PRO, _configPath); // init the node(no need to set the gateway) - m_nodeInitializer->init(ppc::protocol::NodeArch::PRO, nullptr); + m_nodeInitializer->init(nullptr); INIT_LOG(INFO) << LOG_DESC("init the rpc"); diff --git a/cpp/wedpr-protocol/grpc/client/FrontClient.cpp b/cpp/wedpr-protocol/grpc/client/FrontClient.cpp index 7fe01a6f..a2037613 100644 --- a/cpp/wedpr-protocol/grpc/client/FrontClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/FrontClient.cpp @@ -18,8 +18,8 @@ * @date 2024-09-02 */ #include "FrontClient.h" -#include "protobuf/RequestConverter.h" -#include "wedpr-protocol/protobuf/Common.h" +#include "protobuf/src/RequestConverter.h" +#include "wedpr-protocol/protobuf/src/Common.h" using namespace ppc::protocol; using namespace ppc::proto; diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp index eef8c9be..ad7c7276 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp @@ -20,7 +20,7 @@ #include "GatewayClient.h" #include "Common.h" #include "Service.grpc.pb.h" -#include "protobuf/RequestConverter.h" +#include "protobuf/src/RequestConverter.h" using namespace ppc; using namespace ppc::proto; @@ -29,10 +29,10 @@ using namespace ppc::gateway; using namespace ppc::protocol; void GatewayClient::asyncSendMessage(RouteType routeType, - MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, long timeout, - ReceiveMsgFunc callback) + MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, bcos::bytes&& payload, + long timeout, ReceiveMsgFunc callback) { - auto request = generateRequest(routeType, routeInfo, std::move(payload), timeout); + auto request = generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout); ClientContext context; auto response = std::make_shared(); m_stub->async()->asyncSendMessage(&context, request.get(), response.get(), diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.h b/cpp/wedpr-protocol/grpc/client/GatewayClient.h index ea964acd..69513248 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.h +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.h @@ -52,11 +52,12 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient * @param callback callback */ void asyncSendMessage(ppc::protocol::RouteType routeType, - ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, - long timeout, ppc::protocol::ReceiveMsgFunc callback) override; + ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, + bcos::bytes&& payload, long timeout, ppc::protocol::ReceiveMsgFunc callback) override; void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, - ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) override + ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, + bcos::bytes&& payload) override {} bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override; bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) override; diff --git a/cpp/wedpr-protocol/grpc/server/FrontServer.cpp b/cpp/wedpr-protocol/grpc/server/FrontServer.cpp index ee040e6f..8b1182e5 100644 --- a/cpp/wedpr-protocol/grpc/server/FrontServer.cpp +++ b/cpp/wedpr-protocol/grpc/server/FrontServer.cpp @@ -19,7 +19,7 @@ */ #include "FrontServer.h" #include "Common.h" -#include "protobuf/RequestConverter.h" +#include "protobuf/src/RequestConverter.h" #include using namespace ppc::proto; diff --git a/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp index 91f690c4..3b5caf69 100644 --- a/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp +++ b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp @@ -19,7 +19,7 @@ */ #include "GatewayServer.h" #include "Common.h" -#include "protobuf/RequestConverter.h" +#include "protobuf/src/RequestConverter.h" using namespace ppc::protocol; using namespace grpc; @@ -33,7 +33,8 @@ ServerUnaryReactor* GatewayServer::asyncSendMessage(CallbackServerContext* conte bcos::bytes payloadData(sendedMsg->payload().begin(), sendedMsg->payload().end()); auto routeInfo = generateRouteInfo(m_routeInfoBuilder, sendedMsg->routeinfo()); m_gateway->asyncSendMessage((ppc::protocol::RouteType)sendedMsg->routetype(), routeInfo, - std::move(payloadData), sendedMsg->timeout(), [reactor, reply](bcos::Error::Ptr error) { + sendedMsg->traceid(), std::move(payloadData), sendedMsg->timeout(), + [reactor, reply](bcos::Error::Ptr error) { toSerializedError(reply, error); reactor->Finish(Status::OK); }); diff --git a/cpp/wedpr-protocol/proto/pb/Service.proto b/cpp/wedpr-protocol/proto/pb/Service.proto index eb241af8..e46d91b0 100644 --- a/cpp/wedpr-protocol/proto/pb/Service.proto +++ b/cpp/wedpr-protocol/proto/pb/Service.proto @@ -28,6 +28,7 @@ message SendedMessageRequest{ RouteInfo routeInfo = 2; bytes payload = 3; int64 timeout = 4; + string traceID = 5; }; service Front { rpc onReceiveMessage (ReceivedMessage) returns (Error) {} diff --git a/cpp/wedpr-protocol/protobuf/CMakeLists.txt b/cpp/wedpr-protocol/protobuf/CMakeLists.txt index 6dd7bc81..615270ec 100644 --- a/cpp/wedpr-protocol/protobuf/CMakeLists.txt +++ b/cpp/wedpr-protocol/protobuf/CMakeLists.txt @@ -1,27 +1,8 @@ -# proto generation -set(PROTO_INPUT_PATH ${CMAKE_SOURCE_DIR}/wedpr-protocol/proto/pb) - -file(GLOB_RECURSE MESSAGES_PROTOS "${PROTO_INPUT_PATH}/*.proto") - -find_program(PROTOC_BINARY protoc REQUIRED) - -# create PROTO_OUTPUT_PATH -file(MAKE_DIRECTORY ${PROTO_OUTPUT_PATH}) -foreach(proto_file ${MESSAGES_PROTOS}) - get_filename_component(basename ${proto_file} NAME_WE) - set(generated_file ${PROTO_OUTPUT_PATH}/${basename}.pb.cc) - - list(APPEND MESSAGES_SRCS ${generated_file}) - - message("Command: protoc --cpp_out ${PROTO_OUTPUT_PATH} -I ${PROTO_INPUT_PATH} ${proto_file}") - add_custom_command( - OUTPUT ${generated_file} - COMMAND ${PROTOC_BINARY} --cpp_out ${PROTO_OUTPUT_PATH} -I ${PROTO_INPUT_PATH} ${proto_file} - COMMENT "Generating ${generated_file} from ${proto_file}" - VERBATIM - ) -endforeach() - -file(GLOB_RECURSE SRCS *.cpp) -add_library(${PB_PROTOCOL_TARGET} ${SRCS} ${MESSAGES_SRCS}) -target_link_libraries(${PB_PROTOCOL_TARGET} PUBLIC ${BCOS_UTILITIES_TARGET}) \ No newline at end of file +project(ppc-protocol VERSION ${VERSION}) + +add_subdirectory(src) +if (TESTS) + enable_testing() + set(CTEST_OUTPUT_ON_FAILURE TRUE) + add_subdirectory(tests) +endif () diff --git a/cpp/wedpr-protocol/protobuf/src/CMakeLists.txt b/cpp/wedpr-protocol/protobuf/src/CMakeLists.txt new file mode 100644 index 00000000..7aaa0e0d --- /dev/null +++ b/cpp/wedpr-protocol/protobuf/src/CMakeLists.txt @@ -0,0 +1,27 @@ +# proto generation +set(PROTO_INPUT_PATH ${CMAKE_SOURCE_DIR}/wedpr-protocol/proto/pb) + +file(GLOB_RECURSE MESSAGES_PROTOS "${PROTO_INPUT_PATH}/*.proto") + +find_program(PROTOC_BINARY protoc REQUIRED) + +# create PROTO_OUTPUT_PATH +file(MAKE_DIRECTORY ${PROTO_OUTPUT_PATH}) +foreach(proto_file ${MESSAGES_PROTOS}) + get_filename_component(basename ${proto_file} NAME_WE) + set(generated_file ${PROTO_OUTPUT_PATH}/${basename}.pb.cc) + + list(APPEND MESSAGES_SRCS ${generated_file}) + + message("Command: protoc --cpp_out ${PROTO_OUTPUT_PATH} -I ${PROTO_INPUT_PATH} ${proto_file}") + add_custom_command( + OUTPUT ${generated_file} + COMMAND ${PROTOC_BINARY} --cpp_out ${PROTO_OUTPUT_PATH} -I ${PROTO_INPUT_PATH} ${proto_file} + COMMENT "Generating ${generated_file} from ${proto_file}" + VERBATIM + ) +endforeach() + +file(GLOB_RECURSE SRCS *.cpp) +add_library(${PB_PROTOCOL_TARGET} ${SRCS} ${MESSAGES_SRCS}) +target_link_libraries(${PB_PROTOCOL_TARGET} PUBLIC ${BCOS_UTILITIES_TARGET} protobuf::libprotobuf ${CPU_FEATURES_LIB}) \ No newline at end of file diff --git a/cpp/wedpr-protocol/protobuf/Common.h b/cpp/wedpr-protocol/protobuf/src/Common.h similarity index 100% rename from cpp/wedpr-protocol/protobuf/Common.h rename to cpp/wedpr-protocol/protobuf/src/Common.h diff --git a/cpp/wedpr-protocol/protobuf/NodeInfoImpl.cpp b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp similarity index 79% rename from cpp/wedpr-protocol/protobuf/NodeInfoImpl.cpp rename to cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp index 742d53a8..93f0f66a 100644 --- a/cpp/wedpr-protocol/protobuf/NodeInfoImpl.cpp +++ b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp @@ -28,13 +28,13 @@ void NodeInfoImpl::encode(bcos::bytes& data) const // set the components for (auto const& component : m_components) { - m_inner()->add_components(component); + m_rawNodeInfo->add_components(component); } - encodePBObject(data, m_inner()); + encodePBObject(data, m_rawNodeInfo); } void NodeInfoImpl::decode(bcos::bytesConstRef data) { - decodePBObject(m_inner(), data); - m_components = - std::set(m_inner()->components().begin(), m_inner()->components().end()); + decodePBObject(m_rawNodeInfo, data); + m_components = std::set( + m_rawNodeInfo->components().begin(), m_rawNodeInfo->components().end()); } \ No newline at end of file diff --git a/cpp/wedpr-protocol/protobuf/NodeInfoImpl.h b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h similarity index 67% rename from cpp/wedpr-protocol/protobuf/NodeInfoImpl.h rename to cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h index a81ad5c0..ed8f6a68 100644 --- a/cpp/wedpr-protocol/protobuf/NodeInfoImpl.h +++ b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h @@ -29,27 +29,28 @@ class NodeInfoImpl : public INodeInfo { public: using Ptr = std::shared_ptr; - explicit NodeInfoImpl(std::function inner) : m_inner(std::move(inner)) + NodeInfoImpl() { m_rawNodeInfo = std::make_shared(); } + explicit NodeInfoImpl(std::shared_ptr rawNodeInfo) + : m_rawNodeInfo(rawNodeInfo) {} - NodeInfoImpl() : m_inner([inner = ppc::proto::NodeInfo()]() mutable { return &inner; }) {} + NodeInfoImpl(bcos::bytesConstRef const& data) : NodeInfoImpl() { decode(data); } - NodeInfoImpl(bcos::bytesConstRef const& nodeID) : NodeInfoImpl() + NodeInfoImpl(bcos::bytesConstRef const& nodeID, std::string const& endPoint) : NodeInfoImpl() { - m_inner()->set_nodeid(nodeID.data(), nodeID.size()); + m_rawNodeInfo->set_nodeid(nodeID.data(), nodeID.size()); + m_rawNodeInfo->set_endpoint(endPoint); } - NodeInfoImpl(bcos::bytesConstRef const& nodeID, std::string const& endPoint) - : NodeInfoImpl(nodeID) - { - m_inner()->set_endpoint(endPoint); - } - ~NodeInfoImpl() override = default; + ~NodeInfoImpl() override {} void setNodeID(bcos::bytesConstRef nodeID) override { - m_inner()->set_nodeid(nodeID.data(), nodeID.size()); + m_rawNodeInfo->set_nodeid(nodeID.data(), nodeID.size()); + } + void setEndPoint(std::string const& endPoint) override + { + m_rawNodeInfo->set_endpoint(endPoint); } - void setEndPoint(std::string const& endPoint) override { m_inner()->set_endpoint(endPoint); } void setComponents(std::set const& components) override { @@ -57,17 +58,17 @@ class NodeInfoImpl : public INodeInfo } std::set const& components() const override { return m_components; } - std::string const& endPoint() const override { return m_inner()->endpoint(); } + std::string const& endPoint() const override { return m_rawNodeInfo->endpoint(); } bcos::bytesConstRef nodeID() const override { - return {reinterpret_cast(m_inner()->nodeid().data()), - m_inner()->nodeid().size()}; + return {reinterpret_cast(m_rawNodeInfo->nodeid().data()), + m_rawNodeInfo->nodeid().size()}; } void encode(bcos::bytes& data) const override; void decode(bcos::bytesConstRef data) override; - std::function innerFunc() { return m_inner; } + std::shared_ptr rawNodeInfo() { return m_rawNodeInfo; } void setFront(std::shared_ptr&& front) override { @@ -78,7 +79,7 @@ class NodeInfoImpl : public INodeInfo private: std::shared_ptr m_front; std::set m_components; - std::function m_inner; + std::shared_ptr m_rawNodeInfo; }; class NodeInfoFactory : public INodeInfoFactory @@ -90,7 +91,10 @@ class NodeInfoFactory : public INodeInfoFactory INodeInfo::Ptr build() override { return std::make_shared(); } - + INodeInfo::Ptr build(bcos::bytesConstRef data) override + { + return std::make_shared(data); + } INodeInfo::Ptr build(bcos::bytesConstRef nodeID, std::string const& endPoint) override { return std::make_shared(nodeID, endPoint); diff --git a/cpp/wedpr-protocol/protobuf/RequestConverter.h b/cpp/wedpr-protocol/protobuf/src/RequestConverter.h similarity index 98% rename from cpp/wedpr-protocol/protobuf/RequestConverter.h rename to cpp/wedpr-protocol/protobuf/src/RequestConverter.h index faefc75d..4f1544bb 100644 --- a/cpp/wedpr-protocol/protobuf/RequestConverter.h +++ b/cpp/wedpr-protocol/protobuf/src/RequestConverter.h @@ -42,10 +42,12 @@ inline MessageOptionalHeader::Ptr generateRouteInfo( return routeInfo; } -inline std::shared_ptr generateRequest(RouteType routeType, +inline std::shared_ptr generateRequest(std::string const& traceID, + RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, long timeout) { auto request = std::make_shared(); + request->set_traceid(traceID); request->set_routetype(uint16_t(routeType)); // set the route information request->mutable_routeinfo()->set_topic(routeInfo->topic()); diff --git a/cpp/wedpr-protocol/protobuf/tests/CMakeLists.txt b/cpp/wedpr-protocol/protobuf/tests/CMakeLists.txt new file mode 100644 index 00000000..c7310f2f --- /dev/null +++ b/cpp/wedpr-protocol/protobuf/tests/CMakeLists.txt @@ -0,0 +1,10 @@ +file(GLOB_RECURSE SOURCES "*.cpp" "*.h") + +# cmake settings +set(TEST_BINARY_NAME test-protobuf) + +add_executable(${TEST_BINARY_NAME} ${SOURCES}) +target_include_directories(${TEST_BINARY_NAME} PRIVATE .) + +target_link_libraries(${TEST_BINARY_NAME} ${PB_PROTOCOL_TARGET} TBB::tbb ${BOOST_UNIT_TEST}) +add_test(NAME test-protobuf WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND ${TEST_BINARY_NAME}) \ No newline at end of file diff --git a/cpp/wedpr-protocol/protobuf/tests/NodeInfoImplTest.cpp b/cpp/wedpr-protocol/protobuf/tests/NodeInfoImplTest.cpp new file mode 100644 index 00000000..a234a8e1 --- /dev/null +++ b/cpp/wedpr-protocol/protobuf/tests/NodeInfoImplTest.cpp @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2022 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file NodeInfoImplTest.cpp + * @author: yujiechen + * @date 2024-09-06 + */ + +#include "protobuf/src/NodeInfoImpl.h" +#include +#include + +using namespace ppc; +using namespace ppc::protocol; +using namespace bcos::test; + +BOOST_FIXTURE_TEST_SUITE(NodeInfoTest, TestPromptFixture) + +INodeInfo::Ptr fakeNodeInfo(INodeInfoFactory::Ptr factory, std::string const& nodeID, + std::string const& endPoint, std::set const& components) +{ + auto nodeInfo = factory->build(); + nodeInfo->setNodeID(bcos::bytesConstRef((bcos::byte*)nodeID.data(), nodeID.size())); + nodeInfo->setEndPoint(endPoint); + nodeInfo->setComponents(components); + return nodeInfo; +} + + +void testNodeInfoEncodeDecode(INodeInfoFactory::Ptr factory, INodeInfo::Ptr nodeInfo) +{ + bcos::bytes encodedData; + nodeInfo->encode(encodedData); + + auto decodedNodeInfo = factory->build(bcos::ref(encodedData)); + BOOST_CHECK(nodeInfo->nodeID().toBytes() == decodedNodeInfo->nodeID().toBytes()); + BOOST_CHECK(nodeInfo->endPoint() == decodedNodeInfo->endPoint()); + auto const& components = nodeInfo->components(); + for (auto const& decodedComp : decodedNodeInfo->components()) + { + BOOST_CHECK(components.count(decodedComp)); + } +} + +BOOST_AUTO_TEST_CASE(testNodeInfo) +{ + auto nodeInfoFactory = std::make_shared(); + std::string nodeID = "testn+NodeID"; + std::string endPoint = "testEndpoint"; + std::set components; + for (int i = 0; i < 100; i++) + { + components.insert("component_" + std::to_string(i)); + } + auto nodeInfo = fakeNodeInfo(nodeInfoFactory, nodeID, endPoint, components); + testNodeInfoEncodeDecode(nodeInfoFactory, nodeInfo); +} + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file diff --git a/cpp/wedpr-protocol/protobuf/tests/main.cpp b/cpp/wedpr-protocol/protobuf/tests/main.cpp new file mode 100644 index 00000000..2c06f9b5 --- /dev/null +++ b/cpp/wedpr-protocol/protobuf/tests/main.cpp @@ -0,0 +1,2 @@ +#define BOOST_TEST_MAIN +#include \ No newline at end of file diff --git a/cpp/wedpr-protocol/protocol/CMakeLists.txt b/cpp/wedpr-protocol/protocol/CMakeLists.txt index 615270ec..cda79d6d 100644 --- a/cpp/wedpr-protocol/protocol/CMakeLists.txt +++ b/cpp/wedpr-protocol/protocol/CMakeLists.txt @@ -1,4 +1,4 @@ -project(ppc-protocol VERSION ${VERSION}) +project(ppc-protobuf VERSION ${VERSION}) add_subdirectory(src) if (TESTS) diff --git a/cpp/wedpr-protocol/protocol/src/Common.h b/cpp/wedpr-protocol/protocol/src/Common.h new file mode 100644 index 00000000..9875b303 --- /dev/null +++ b/cpp/wedpr-protocol/protocol/src/Common.h @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2022 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file Common.h + * @author: yujiechen + * @date 2024-9-6 + */ + +#pragma once + +#include "ppc-framework/Common.h" +#include + +namespace ppc::gateway +{ +#define PROTOCOL_LOG(LEVEL) BCOS_LOG(LEVEL) << "[PROTOCOL]" +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/wedpr-protocol/protocol/src/PPCMessage.cpp b/cpp/wedpr-protocol/protocol/src/PPCMessage.cpp index 6c2a0fa0..a316a773 100644 --- a/cpp/wedpr-protocol/protocol/src/PPCMessage.cpp +++ b/cpp/wedpr-protocol/protocol/src/PPCMessage.cpp @@ -24,6 +24,7 @@ using namespace bcos; using namespace ppc::front; +using namespace ppc::protocol; void PPCMessage::encode(bytes& _buffer) { @@ -137,4 +138,61 @@ std::map PPCMessage::decodeMap(const std::string& _enc } return maps; +} + +// Note: this interface is used after the MessagePayload(frontMessage) has been decoded; this +// interface passed some meta information to the ppcMessage +PPCMessageFace::Ptr PPCMessageFactory::decodePPCMessage(Message::Ptr msg) +{ + auto ppcMsg = buildPPCMessage(); + auto frontMsg = msg->frontMessage(); + // Note: this field is been setted when onReceiveMessage + if (frontMsg) + { + ppcMsg->setSeq(frontMsg->seq()); + ppcMsg->setUuid(frontMsg->traceID()); + if (frontMsg->isRespPacket()) + { + ppcMsg->setResponse(); + } + ppcMsg->decode(bcos::ref(frontMsg->data())); + } + if (msg->header() && msg->header()->optionalField()) + { + auto const& routeInfo = msg->header()->optionalField(); + ppcMsg->setTaskID(routeInfo->topic()); + ppcMsg->setSender(routeInfo->srcInst()); + } + return ppcMsg; +} + +Message::Ptr PPCMessageFactory::buildMessage(MessageBuilder::Ptr const& msgBuilder, + MessagePayloadBuilder::Ptr const& msgPayloadBuilder, PPCMessageFace::Ptr const& ppcMessage) +{ + auto msg = msgBuilder->build(); + msg->header()->optionalField()->setTopic(ppcMessage->taskID()); + msg->header()->optionalField()->setSrcInst(ppcMessage->sender()); + + auto payload = buildMessage(msgPayloadBuilder, ppcMessage); + auto payloadData = std::make_shared(); + payload->encode(*payloadData); + msg->setPayload(std::move(payloadData)); + return msg; +} + +MessagePayload::Ptr PPCMessageFactory::buildMessage( + MessagePayloadBuilder::Ptr const& msgPayloadBuilder, PPCMessageFace::Ptr const& ppcMessage) +{ + auto payload = msgPayloadBuilder->build(); + payload->setSeq(ppcMessage->seq()); + if (ppcMessage->response()) + { + payload->setRespPacket(); + } + payload->setTraceID(ppcMessage->uuid()); + + bcos::bytes ppcMsgData; + ppcMessage->encode(ppcMsgData); + payload->setData(std::move(ppcMsgData)); + return payload; } \ No newline at end of file diff --git a/cpp/wedpr-protocol/protocol/src/PPCMessage.h b/cpp/wedpr-protocol/protocol/src/PPCMessage.h index fc6983f6..148e4fd3 100644 --- a/cpp/wedpr-protocol/protocol/src/PPCMessage.h +++ b/cpp/wedpr-protocol/protocol/src/PPCMessage.h @@ -148,27 +148,15 @@ class PPCMessageFactory : public PPCMessageFaceFactory return msg; } - PPCMessageFace::Ptr buildPPCMessage(ppc::protocol::Message::Ptr msg) override - { - auto ppcMsg = buildPPCMessage(); - auto frontMsg = msg->frontMessage(); - if (frontMsg) - { - ppcMsg->setSeq(frontMsg->seq()); - ppcMsg->setUuid(frontMsg->traceID()); - if (frontMsg->isRespPacket()) - { - ppcMsg->setResponse(); - } - } - if (msg->header() && msg->header()->optionalField()) - { - auto const& routeInfo = msg->header()->optionalField(); - ppcMsg->setTaskID(routeInfo->topic()); - ppcMsg->setSender(routeInfo->srcInst()); - } - return ppcMsg; - } + PPCMessageFace::Ptr decodePPCMessage(ppc::protocol::Message::Ptr msg) override; + + ppc::protocol::Message::Ptr buildMessage(ppc::protocol::MessageBuilder::Ptr const& msgBuilder, + ppc::protocol::MessagePayloadBuilder::Ptr const& msgPayloadBuilder, + PPCMessageFace::Ptr const& ppcMessage) override; + + ppc::protocol::MessagePayload::Ptr buildMessage( + ppc::protocol::MessagePayloadBuilder::Ptr const& msgPayloadBuilder, + PPCMessageFace::Ptr const& ppcMessage) override; }; } // namespace front diff --git a/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp b/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp index 117f304c..cc03557d 100644 --- a/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp +++ b/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp @@ -50,6 +50,10 @@ void MessageOptionalHeaderImpl::encode(bcos::bytes& buffer) const uint16_t dstInstLen = boost::asio::detail::socket_ops::host_to_network_short(m_dstInst.size()); buffer.insert(buffer.end(), (byte*)&dstInstLen, (byte*)&dstInstLen + 2); buffer.insert(buffer.end(), m_dstInst.begin(), m_dstInst.end()); + // the topic + uint16_t topicLen = boost::asio::detail::socket_ops::host_to_network_short(m_topic.size()); + buffer.insert(buffer.end(), (byte*)&topicLen, (byte*)&topicLen + 2); + buffer.insert(buffer.end(), m_topic.begin(), m_topic.end()); } @@ -66,15 +70,13 @@ int64_t MessageOptionalHeaderImpl::decode(bcos::bytesConstRef data, uint64_t con // srcNode offset = decodeNetworkBuffer(m_srcNode, data.data(), data.size(), offset); // source inst - bcos::bytes sourceInst; - offset = decodeNetworkBuffer(sourceInst, data.data(), data.size(), offset); - m_srcInst = std::string(sourceInst.begin(), sourceInst.end()); + offset = decodeNetworkBuffer(m_srcInst, data.data(), data.size(), offset); // dstNode offset = decodeNetworkBuffer(m_dstNode, data.data(), data.size(), offset); - // dstInst, TODO: optimize here - bcos::bytes dstInstData; - offset = decodeNetworkBuffer(dstInstData, data.data(), data.size(), offset); - m_dstInst = std::string(dstInstData.begin(), dstInstData.end()); + // dstInst + offset = decodeNetworkBuffer(m_dstInst, data.data(), data.size(), offset); + // topic + offset = decodeNetworkBuffer(m_topic, data.data(), data.size(), offset); return offset; } @@ -96,19 +98,29 @@ void MessageHeaderImpl::encode(bcos::bytes& buffer) const // the traceID, 2+Bytes uint16_t traceIDLen = boost::asio::detail::socket_ops::host_to_network_short(m_traceID.size()); buffer.insert(buffer.end(), (byte*)&traceIDLen, (byte*)&traceIDLen + 2); - buffer.insert(buffer.end(), m_traceID.begin(), m_traceID.end()); + if (m_traceID.size() > 0) + { + buffer.insert(buffer.end(), m_traceID.begin(), m_traceID.end()); + } // srcGwNode, 2+Bytes uint16_t srcGwNodeLen = boost::asio::detail::socket_ops::host_to_network_short(m_srcGwNode.size()); buffer.insert(buffer.end(), (byte*)&srcGwNodeLen, (byte*)&srcGwNodeLen + 2); - buffer.insert(buffer.end(), m_srcGwNode.begin(), m_srcGwNode.end()); + if (m_srcGwNode.size() > 0) + { + buffer.insert(buffer.end(), m_srcGwNode.begin(), m_srcGwNode.end()); + } // dstGwNode, 2+Bytes uint16_t dstGwNodeLen = boost::asio::detail::socket_ops::host_to_network_short(m_dstGwNode.size()); buffer.insert(buffer.end(), (byte*)&dstGwNodeLen, (byte*)&dstGwNodeLen + 2); - buffer.insert(buffer.end(), m_dstGwNode.begin(), m_dstGwNode.end()); + if (m_dstGwNode.size() > 0) + { + buffer.insert(buffer.end(), m_dstGwNode.begin(), m_dstGwNode.end()); + } if (!hasOptionalField()) { + m_length = buffer.size(); return; } // encode the optionalField @@ -137,18 +149,11 @@ int64_t MessageHeaderImpl::decode(bcos::bytesConstRef data) m_ext = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); pointer += 2; // the traceID - bcos::bytes traceIDData; - auto offset = - decodeNetworkBuffer(traceIDData, data.data(), data.size(), (pointer - data.data())); - m_traceID = std::string(traceIDData.begin(), traceIDData.end()); + auto offset = decodeNetworkBuffer(m_traceID, data.data(), data.size(), (pointer - data.data())); // srcGwNode - bcos::bytes srcGWNodeData; - offset = decodeNetworkBuffer(srcGWNodeData, data.data(), data.size(), offset); - m_srcGwNode = std::string(srcGWNodeData.begin(), srcGWNodeData.end()); + offset = decodeNetworkBuffer(m_srcGwNode, data.data(), data.size(), offset); // dstGwNode - bcos::bytes dstGWNodeData; - offset = decodeNetworkBuffer(dstGWNodeData, data.data(), data.size(), offset); - m_dstGwNode = std::string(dstGWNodeData.begin(), dstGWNodeData.end()); + offset = decodeNetworkBuffer(m_dstGwNode, data.data(), data.size(), offset); // optionalField if (hasOptionalField()) { diff --git a/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.h b/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.h index b48caec7..0212c20b 100644 --- a/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.h +++ b/cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.h @@ -56,7 +56,11 @@ class MessageHeaderImpl : public MessageHeader public: using Ptr = std::shared_ptr; MessageHeaderImpl() { m_optionalField = std::make_shared(); } - MessageHeaderImpl(bcos::bytesConstRef data) { decode(data); } + MessageHeaderImpl(bcos::bytesConstRef data) + { + m_optionalField = std::make_shared(); + decode(data); + } ~MessageHeaderImpl() override {} void encode(bcos::bytes& buffer) const override; diff --git a/cpp/wedpr-protocol/protocol/src/v1/MessageImpl.cpp b/cpp/wedpr-protocol/protocol/src/v1/MessageImpl.cpp index 09f4183a..361029e6 100644 --- a/cpp/wedpr-protocol/protocol/src/v1/MessageImpl.cpp +++ b/cpp/wedpr-protocol/protocol/src/v1/MessageImpl.cpp @@ -19,6 +19,7 @@ */ #include "MessageImpl.h" +#include "../Common.h" using namespace bcos; using namespace ppc::protocol; @@ -26,21 +27,30 @@ using namespace ppc::protocol; bool MessageImpl::encode(bcos::bytes& _buffer) { // encode the header - bcos::bytes headerData; - m_header->encode(headerData); + m_header->encode(_buffer); // encode the payload - if (m_payload) + if (m_payload && m_payload->size() > 0) { - headerData.insert(headerData.end(), m_payload->begin(), m_payload->end()); + _buffer.insert(_buffer.end(), m_payload->begin(), m_payload->end()); } } bool MessageImpl::encode(bcos::boostssl::EncodedMsg& encodedMsg) { - // header - m_header->encode(encodedMsg.header); - // assign the payload back - encodedMsg.payload = m_payload; + try + { + // header + m_header->encode(encodedMsg.header); + // assign the payload back + encodedMsg.payload = m_payload; + return true; + } + catch (std::exception const& e) + { + PROTOCOL_LOG(WARNING) << LOG_DESC("encode message failed") + << LOG_KV("error", boost::diagnostic_information(e)); + return false; + } } int64_t MessageImpl::decode(bytesConstRef buffer) @@ -53,6 +63,11 @@ int64_t MessageImpl::decode(bytesConstRef buffer) } // decode the header m_header = m_headerBuilder->build(buffer); + // no payload case + if (buffer.size() <= m_header->length()) + { + return buffer.size(); + } // decode the payload if (!m_payload) { @@ -60,4 +75,5 @@ int64_t MessageImpl::decode(bytesConstRef buffer) } m_payload->clear(); m_payload->insert(m_payload->end(), buffer.data() + m_header->length(), buffer.end()); + return buffer.size(); } \ No newline at end of file diff --git a/cpp/wedpr-protocol/protocol/src/v1/MessageImpl.h b/cpp/wedpr-protocol/protocol/src/v1/MessageImpl.h index e261a7a2..8fb053fe 100644 --- a/cpp/wedpr-protocol/protocol/src/v1/MessageImpl.h +++ b/cpp/wedpr-protocol/protocol/src/v1/MessageImpl.h @@ -33,7 +33,9 @@ class MessageImpl : public Message using Ptr = std::shared_ptr; MessageImpl(MessageHeaderBuilder::Ptr headerBuilder, size_t maxMessageLen) : m_headerBuilder(std::move(headerBuilder)), m_maxMessageLen(maxMessageLen) - {} + { + m_header = m_headerBuilder->build(); + } MessageImpl( MessageHeaderBuilder::Ptr headerBuilder, size_t maxMessageLen, bcos::bytesConstRef buffer) : MessageImpl(headerBuilder, maxMessageLen) diff --git a/cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp b/cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp index 7f1f676a..7e2f37b8 100644 --- a/cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp +++ b/cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp @@ -71,10 +71,8 @@ int64_t MessagePayloadImpl::decode(bcos::bytesConstRef buffer) m_ext = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); pointer += 2; // the traceID - bcos::bytes traceID; auto offset = - decodeNetworkBuffer(traceID, buffer.data(), buffer.size(), (pointer - buffer.data())); - m_traceID = std::string(traceID.begin(), traceID.end()); + decodeNetworkBuffer(m_traceID, buffer.data(), buffer.size(), (pointer - buffer.data())); // data return decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), offset); } \ No newline at end of file diff --git a/cpp/wedpr-protocol/protocol/tests/MessageTest.cpp b/cpp/wedpr-protocol/protocol/tests/MessageTest.cpp new file mode 100644 index 00000000..53977bbe --- /dev/null +++ b/cpp/wedpr-protocol/protocol/tests/MessageTest.cpp @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2022 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessageTest.cpp + * @author: shawnhe + * @date 2022-10-28 + */ + +#include "protocol/src/v1/MessageHeaderImpl.h" +#include "protocol/src/v1/MessageImpl.h" +#include +#include + +using namespace ppc; +using namespace ppc::protocol; +using namespace bcos::test; + +BOOST_FIXTURE_TEST_SUITE(MessageTest, TestPromptFixture) + +void checkMsg(Message::Ptr msg, Message::Ptr decodedMsg) +{ + if (msg->header()) + { + BOOST_CHECK(msg->header()->version() == decodedMsg->header()->version()); + BOOST_CHECK(msg->header()->traceID() == decodedMsg->header()->traceID()); + BOOST_CHECK(msg->header()->srcGwNode() == decodedMsg->header()->srcGwNode()); + BOOST_CHECK(msg->header()->dstGwNode() == decodedMsg->header()->dstGwNode()); + BOOST_CHECK(msg->header()->packetType() == decodedMsg->header()->packetType()); + BOOST_CHECK(msg->header()->ttl() == decodedMsg->header()->ttl()); + BOOST_CHECK(msg->header()->ext() == decodedMsg->header()->ext()); + } + auto routeInfo = msg->header()->optionalField(); + auto decodedRouteInfo = decodedMsg->header()->optionalField(); + if (routeInfo) + { + BOOST_CHECK(routeInfo->topic() == decodedRouteInfo->topic()); + BOOST_CHECK(routeInfo->componentType() == decodedRouteInfo->componentType()); + BOOST_CHECK(routeInfo->srcNode() == decodedRouteInfo->srcNode()); + BOOST_CHECK(routeInfo->srcInst() == decodedRouteInfo->srcInst()); + BOOST_CHECK(routeInfo->dstNode() == decodedRouteInfo->dstNode()); + BOOST_CHECK(routeInfo->dstInst() == decodedRouteInfo->dstInst()); + } + if (msg->payload()) + { + BOOST_CHECK(*(msg->payload()) == *(decodedMsg->payload())); + } +} + +Message::Ptr fakeMsg(MessageBuilder::Ptr msgBuilder, int version, std::string const& traceID, + std::string const& srcGwNode, std::string const& dstGwNode, int packetType, int ttl, int ext, + std::string topic, std::string const& componentType, bcos::bytes const& srcNode, + std::string const& srcInst, bcos::bytes const& dstNode, std::string const& dstInst, + std::shared_ptr payload) +{ + auto msg = msgBuilder->build(); + msg->header()->setVersion(version); + msg->header()->setTraceID(traceID); + msg->header()->setSrcGwNode(srcGwNode); + msg->header()->setDstGwNode(dstGwNode); + msg->header()->setPacketType(packetType); + msg->header()->setTTL(ttl); + msg->header()->setExt(ext); + msg->header()->optionalField()->setTopic(topic); + msg->header()->optionalField()->setComponentType(componentType); + msg->header()->optionalField()->setSrcNode(srcNode); + msg->header()->optionalField()->setSrcInst(srcInst); + msg->header()->optionalField()->setDstNode(dstNode); + msg->header()->optionalField()->setDstInst(dstInst); + msg->setPayload(payload); + return msg; +} + +void checkEncodeDecode(MessageBuilder::Ptr msgBuilder, Message::Ptr const& msg) +{ + // encode + bcos::bytes encodedData; + msg->encode(encodedData); + + // decode + auto decodedMsg = msgBuilder->build(bcos::ref(encodedData)); + checkMsg(msg, decodedMsg); +} + +BOOST_AUTO_TEST_CASE(testMessage) +{ + auto msgBuilder = + std::make_shared(std::make_shared()); + int version = 1000; + int packetType = 2344; + int ttl = 123; + int ext = 1000; + std::string traceID = ""; + std::string srcGwNode = ""; + std::string dstGwNode = ""; + std::string topic = ""; + std::string componentType = ""; + bcos::bytes srcNode; + bcos::bytes dstNode; + std::string srcInst; + std::string dstInst; + std::shared_ptr payload; + + auto msg = fakeMsg(msgBuilder, version, traceID, srcGwNode, dstGwNode, packetType, ttl, ext, + topic, componentType, srcNode, srcInst, dstNode, dstInst, payload); + checkEncodeDecode(msgBuilder, msg); + // with payload + payload = std::make_shared(); + std::string payloadData = "payloadf@#$@#$sdfs234"; + *payload = bcos::bytes(payloadData.begin(), payloadData.end()); + msg = fakeMsg(msgBuilder, version, traceID, srcGwNode, dstGwNode, packetType, ttl, ext, topic, + componentType, srcNode, srcInst, dstNode, dstInst, payload); + checkEncodeDecode(msgBuilder, msg); + + // with header router + traceID = "1233"; + srcGwNode = "srcGwNode"; + dstGwNode = "dstGwNode"; + msg = fakeMsg(msgBuilder, version, traceID, srcGwNode, dstGwNode, packetType, ttl, ext, topic, + componentType, srcNode, srcInst, dstNode, dstInst, payload); + checkEncodeDecode(msgBuilder, msg); + + // with optional field + std::string srcNodeData = "sdwerwer"; + srcNode = bcos::bytes(srcNodeData.begin(), srcNodeData.end()); + std::string dstNodeData = "dstswerwer"; + dstNode = bcos::bytes(dstNodeData.begin(), dstNodeData.end()); + dstInst = "dstInst"; + srcInst = "srcInst"; + componentType = "compp,ad"; + topic = "topcisdf"; + + msg = fakeMsg(msgBuilder, version, traceID, srcGwNode, dstGwNode, packetType, ttl, ext, topic, + componentType, srcNode, srcInst, dstNode, dstInst, payload); + msg->setPacketType((uint16_t)ppc::gateway::GatewayPacketType::P2PMessage); + checkEncodeDecode(msgBuilder, msg); +} + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file diff --git a/cpp/wedpr-protocol/protocol/tests/PPCMessageTest.cpp b/cpp/wedpr-protocol/protocol/tests/PPCMessageTest.cpp index 316abdcf..1a8fea15 100644 --- a/cpp/wedpr-protocol/protocol/tests/PPCMessageTest.cpp +++ b/cpp/wedpr-protocol/protocol/tests/PPCMessageTest.cpp @@ -18,8 +18,11 @@ * @date 2022-10-28 */ -#include "ppc-protocol/src/PPCMessage.h" +#include "protocol/src/PPCMessage.h" #include "ppc-framework/protocol/Protocol.h" +#include "protocol/src/v1/MessageHeaderImpl.h" +#include "protocol/src/v1/MessageImpl.h" +#include "protocol/src/v1/MessagePayloadImpl.h" #include #include #include @@ -28,17 +31,12 @@ using namespace ppc; using namespace ppc::front; using namespace bcos::test; +using namespace ppc::protocol; BOOST_FIXTURE_TEST_SUITE(PPCMessageTest, TestPromptFixture) -void testEncodeAndDecode(PPCMessageFace::Ptr _message) +void checkPPCMessage(PPCMessageFace::Ptr _message, PPCMessageFace::Ptr newMsg) { - auto payload = std::make_shared(); - _message->encode(*payload); - - auto messageFactory = std::make_shared(); - auto newMsg = messageFactory->buildPPCMessage(payload); - BOOST_CHECK(newMsg->version() == _message->version()); BOOST_CHECK(newMsg->taskType() == _message->taskType()); BOOST_CHECK(newMsg->algorithmType() == _message->algorithmType()); @@ -46,7 +44,6 @@ void testEncodeAndDecode(PPCMessageFace::Ptr _message) BOOST_CHECK(newMsg->seq() == _message->seq()); BOOST_CHECK(newMsg->taskID() == _message->taskID()); BOOST_CHECK(newMsg->sender() == _message->sender()); - BOOST_CHECK(newMsg->ext() == _message->ext()); BOOST_CHECK(newMsg->uuid() == _message->uuid()); BOOST_CHECK(newMsg->data()->size() == _message->data()->size()); auto newMsgHeader = newMsg->header(); @@ -58,24 +55,108 @@ void testEncodeAndDecode(PPCMessageFace::Ptr _message) BOOST_CHECK(messageHeader["x-http-request"] == "2222222"); } -BOOST_AUTO_TEST_CASE(test_ppcMesage) +void checkPayloadMsg(MessagePayload::Ptr payloadMsg, MessagePayload::Ptr decodedPayload) +{ + BOOST_CHECK(payloadMsg->version() == decodedPayload->version()); + BOOST_CHECK(payloadMsg->seq() == decodedPayload->seq()); + BOOST_CHECK(payloadMsg->traceID() == decodedPayload->traceID()); + BOOST_CHECK(payloadMsg->data() == decodedPayload->data()); + BOOST_CHECK(payloadMsg->ext() == decodedPayload->ext()); +} + +void checkMsg(Message::Ptr msg, Message::Ptr decodedMsg) +{ + if (msg->header()) + { + BOOST_CHECK(msg->header()->version() == decodedMsg->header()->version()); + BOOST_CHECK(msg->header()->traceID() == decodedMsg->header()->traceID()); + BOOST_CHECK(msg->header()->srcGwNode() == decodedMsg->header()->srcGwNode()); + BOOST_CHECK(msg->header()->dstGwNode() == decodedMsg->header()->dstGwNode()); + BOOST_CHECK(msg->header()->packetType() == decodedMsg->header()->packetType()); + BOOST_CHECK(msg->header()->ttl() == decodedMsg->header()->ttl()); + BOOST_CHECK(msg->header()->ext() == decodedMsg->header()->ext()); + } + auto routeInfo = msg->header()->optionalField(); + auto decodedRouteInfo = decodedMsg->header()->optionalField(); + if (routeInfo) + { + BOOST_CHECK(routeInfo->topic() == decodedRouteInfo->topic()); + BOOST_CHECK(routeInfo->componentType() == decodedRouteInfo->componentType()); + BOOST_CHECK(routeInfo->srcNode() == decodedRouteInfo->srcNode()); + BOOST_CHECK(routeInfo->srcInst() == decodedRouteInfo->srcInst()); + BOOST_CHECK(routeInfo->dstNode() == decodedRouteInfo->dstNode()); + BOOST_CHECK(routeInfo->dstInst() == decodedRouteInfo->dstInst()); + } +} + +PPCMessageFace::Ptr fakePPCMessage(PPCMessageFactory::Ptr messageFactory, int version, int taskType, + int algorithmType, int messageType, std::string const& taskID, int seq, std::string const& uuid, + std::string const& srcInst, std::shared_ptr data, + std::map const& header) { - auto messageFactory = std::make_shared(); auto message = messageFactory->buildPPCMessage(); - message->setVersion(1); - message->setTaskType(uint8_t(protocol::TaskType::PSI)); - message->setAlgorithmType(uint8_t(protocol::PSIAlgorithmType::CM_PSI_2PC)); - message->setMessageType(4); - message->setSeq(5); - message->setTaskID("12345678"); - message->setSender("1001"); - message->setExt(10); - message->setUuid("uuid1245"); - message->setData(std::make_shared(10, 'a')); + message->setVersion(version); + message->setTaskType(taskType); + message->setAlgorithmType(algorithmType); + message->setMessageType(messageType); + message->setTaskID(taskID); + message->setSeq(seq); + message->setUuid(uuid); + message->setSender(srcInst); + message->setData(data); + message->setHeader(header); + return message; +} + + +BOOST_AUTO_TEST_CASE(test_ppcMesage) +{ + int version = 1; + int taskType = uint8_t(protocol::TaskType::PSI); + int algorithmType = uint8_t(protocol::PSIAlgorithmType::CM_PSI_2PC); + int messageType = 4; + std::string taskID = "12345678"; + int seq = 5; + std::string uuid = "uuid1245"; + std::string srcInst = "from"; + std::string dstInst = "dst"; + auto data = std::make_shared(10, 'a'); std::map head = { {"x-http-session", "111111"}, {"x-http-request", "2222222"}}; - message->setHeader(head); - testEncodeAndDecode(message); + + auto messageFactory = std::make_shared(); + auto ppcMessage = fakePPCMessage(messageFactory, version, taskType, algorithmType, messageType, + taskID, seq, uuid, srcInst, data, head); + + auto payloadBuilder = std::make_shared(); + auto msgBuilder = + std::make_shared(std::make_shared()); + + auto msg = messageFactory->buildMessage(msgBuilder, payloadBuilder, ppcMessage); + auto payloadMsg = messageFactory->buildMessage(payloadBuilder, ppcMessage); + bcos::bytes payloadData; + payloadMsg->encode(payloadData); + + auto decodedPayload = payloadBuilder->build(bcos::ref(payloadData)); + checkPayloadMsg(payloadMsg, decodedPayload); + + auto decodedPayload2 = payloadBuilder->build(bcos::ref(*(msg->payload()))); + checkPayloadMsg(payloadMsg, decodedPayload); + + bcos::bytes msgData; + msg->encode(msgData); + + auto decodedMsg = msgBuilder->build(bcos::ref(msgData)); + checkMsg(msg, decodedMsg); + + decodedMsg->setFrontMessage(decodedPayload2); + auto decodedPPCMsg = messageFactory->decodePPCMessage(decodedMsg); + checkPPCMessage(ppcMessage, decodedPPCMsg); + + // invalid case + std::string invalidStr = "sdfsinvalidsfwre"; + bcos::bytes invalidData(invalidStr.begin(), invalidStr.end()); + BOOST_CHECK_THROW(msgBuilder->build(bcos::ref(invalidData)), std::exception); } BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file diff --git a/cpp/wedpr-protocol/protocol/tests/TestTaskImpl.cpp b/cpp/wedpr-protocol/protocol/tests/TestTaskImpl.cpp index 33fc5a4c..dbe5c8ce 100644 --- a/cpp/wedpr-protocol/protocol/tests/TestTaskImpl.cpp +++ b/cpp/wedpr-protocol/protocol/tests/TestTaskImpl.cpp @@ -18,7 +18,7 @@ * @date 2022-10-19 */ -#include "ppc-protocol/src/JsonTaskImpl.h" +#include "protocol/src/JsonTaskImpl.h" #include "test-utils/TaskMock.h" #include #include diff --git a/cpp/wedpr-protocol/tars/CMakeLists.txt b/cpp/wedpr-protocol/tars/CMakeLists.txt index 7d0e2f8e..7ae5a697 100644 --- a/cpp/wedpr-protocol/tars/CMakeLists.txt +++ b/cpp/wedpr-protocol/tars/CMakeLists.txt @@ -39,10 +39,3 @@ target_include_directories(${TARS_PROTOCOL_TARGET} PUBLIC $ $) target_link_libraries(${TARS_PROTOCOL_TARGET} PUBLIC ${BCOS_UTILITIES_TARGET} tarscpp::tarsutil) - -# ut -if (TESTS) - enable_testing() - set(CTEST_OUTPUT_ON_FAILURE TRUE) - add_subdirectory(test) -endif () diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp index 8888b1fa..01a1f179 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp @@ -42,7 +42,8 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace bcos::bytes data; _message->encode(data); auto self = weak_from_this(); - m_front->asyncSendMessage(RouteType::ROUTE_THROUGH_AGENCY, routeInfo, std::move(data), + // ROUTE_THROUGH_TOPIC will hold the topic + m_front->asyncSendMessage(RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data), _message->seq(), _timeout, _callback, [self, _agencyID, _respCallback]( Error::Ptr error, Message::Ptr msg, SendResponseFunction resFunc) { @@ -66,7 +67,7 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace } // get the agencyID _respCallback(error, msg->header()->optionalField()->srcInst(), - front->m_messageFactory->buildPPCMessage(msg), responseCallback); + front->m_messageFactory->decodePPCMessage(msg), responseCallback); }); } diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h index b3a10370..30cc315a 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h @@ -74,7 +74,7 @@ class Front : public FrontInterface, public std::enable_shared_from_this _handler(nullptr); return; } - _handler(front->m_messageFactory->buildPPCMessage(msg)); + _handler(front->m_messageFactory->decodePPCMessage(msg)); }); } diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp index 467e4bf6..470462a2 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp @@ -134,8 +134,8 @@ void FrontImpl::asyncSendMessage(RouteType routeType, MessageOptionalHeader::Ptr m_callbackManager->addCallback(traceID, timeout, callback); auto self = weak_from_this(); // send the message to the gateway - asyncSendMessageToGateway(false, std::move(frontMessage), routeType, routeInfo, timeout, - [self, traceID, routeInfo, errorCallback](bcos::Error::Ptr error) { + asyncSendMessageToGateway(false, std::move(frontMessage), routeType, traceID, routeInfo, + timeout, [self, traceID, routeInfo, errorCallback](bcos::Error::Ptr error) { auto front = self.lock(); if (!front) { @@ -181,7 +181,7 @@ void FrontImpl::handleCallback( // set the srcNodeID routerInfo->setSrcNode(message->header()->optionalField()->dstNode()); front->asyncSendMessageToGateway(true, std::move(frontMessage), - RouteType::ROUTE_THROUGH_NODEID, routerInfo, 0, + RouteType::ROUTE_THROUGH_NODEID, message->header()->traceID(), routerInfo, 0, [routerInfo](bcos::Error::Ptr error) { if (!error || error->errorCode() == 0) { @@ -196,8 +196,8 @@ void FrontImpl::handleCallback( } void FrontImpl::asyncSendMessageToGateway(bool responsePacket, MessagePayload::Ptr&& frontMessage, - RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, long timeout, - ReceiveMsgFunc callback) + RouteType routeType, std::string const& traceID, MessageOptionalHeader::Ptr const& routeInfo, + long timeout, ReceiveMsgFunc callback) { if (responsePacket) { @@ -206,7 +206,8 @@ void FrontImpl::asyncSendMessageToGateway(bool responsePacket, MessagePayload::P routeInfo->setSrcNode(m_nodeID); auto payload = std::make_shared(); frontMessage->encode(*payload); - m_gatewayClient->asyncSendMessage(routeType, routeInfo, std::move(*payload), timeout, callback); + m_gatewayClient->asyncSendMessage( + routeType, routeInfo, traceID, std::move(*payload), timeout, callback); } diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h index d89529d6..32700b7b 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h @@ -158,8 +158,8 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ private: void asyncSendMessageToGateway(bool responsePacket, ppc::protocol::MessagePayload::Ptr&& frontMessage, ppc::protocol::RouteType routeType, - ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, long timeout, - ppc::protocol::ReceiveMsgFunc callback); + std::string const& traceID, ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, + long timeout, ppc::protocol::ReceiveMsgFunc callback); void handleCallback(bcos::Error::Ptr const& error, std::string const& traceID, ppc::protocol::Message::Ptr message); diff --git a/cpp/wedpr-transport/ppc-front/test/unittests/PPCChannelTest.cpp b/cpp/wedpr-transport/ppc-front/test/unittests/PPCChannelTest.cpp index d0d2d6cf..e5f1ad9e 100644 --- a/cpp/wedpr-transport/ppc-front/test/unittests/PPCChannelTest.cpp +++ b/cpp/wedpr-transport/ppc-front/test/unittests/PPCChannelTest.cpp @@ -17,6 +17,7 @@ * @author: shawnhe * @date 2022-10-29 */ +#if 0 #include "ppc-front/ppc-front/PPCChannel.h" #include "ppc-front/ppc-front/PPCChannelManager.h" @@ -42,7 +43,7 @@ BOOST_AUTO_TEST_CASE(test_ppcChannel) auto channelManager = std::make_shared(ioService, front); // Note: must start here, otherwise the ioservice will not work - front->start(); + //front->start(); // register message handler channelManager->registerMsgHandlerForChannel( @@ -112,7 +113,7 @@ BOOST_AUTO_TEST_CASE(test_ppcChannel) std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - front->stop(); + //front->stop(); } - -BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file +BOOST_AUTO_TEST_SUITE_END() +#endif \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/Common.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/Common.h index c6dc211c..5a81cc8c 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/Common.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/Common.h @@ -31,6 +31,8 @@ namespace ppc::gateway { #define GATEWAY_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY]" +#define SERVICE_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][SERVICE]" +#define SERVICE_ROUTER_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][SERVICE][ROUTER]" // HTTP HEADER DEFINE #define HEAD_TASK_ID "x-ptp-session-id" diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/GatewayFactory.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/GatewayFactory.cpp index 1349593c..a31a184d 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/GatewayFactory.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/GatewayFactory.cpp @@ -19,6 +19,7 @@ */ #include "GatewayFactory.h" #include "Common.h" +#include "bcos-boostssl/utility/NewTimer.h" #include "bcos-boostssl/websocket/WsInitializer.h" #include "ppc-gateway/p2p/Service.h" #include "ppc-gateway/p2p/router/RouterTableImpl.h" @@ -59,7 +60,8 @@ Service::Ptr GatewayFactory::buildService() const wsInitializer->setConfig(wsConfig); auto p2pService = std::make_shared(m_contextConfig->nodeID(), std::make_shared(), m_config->gatewayConfig().unreachableDistance, - "Service"); + "Gateway-Service"); + p2pService->setTimerFactory(std::make_shared()); p2pService->setNodeEndpoints(m_gatewayConfig->nodeIPEndpointSet()); wsInitializer->initWsService(p2pService); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index 4abad7a4..49c28a70 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -84,13 +84,14 @@ void GatewayImpl::stop() } void GatewayImpl::asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, - MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) + MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, bcos::bytes&& payload) { // dispatcher to all the local front routeInfo->setDstNode(bcos::bytes()); routeInfo->setSrcInst(m_agency); auto p2pMessage = m_msgBuilder->build(routeType, routeInfo, std::move(payload)); + p2pMessage->setSeq(traceID); p2pMessage->setPacketType((uint16_t)GatewayPacketType::BroadcastMessage); m_localRouter->dispatcherMessage(p2pMessage, nullptr); @@ -100,14 +101,16 @@ void GatewayImpl::asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, void GatewayImpl::asyncSendMessage(ppc::protocol::RouteType routeType, - ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, long timeout, - ReceiveMsgFunc callback) + ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, + bcos::bytes&& payload, long timeout, ReceiveMsgFunc callback) { routeInfo->setSrcInst(m_agency); // check the localRouter auto p2pMessage = m_msgBuilder->build(routeType, routeInfo, std::move(payload)); - + p2pMessage->setSeq(traceID); p2pMessage->setPacketType((uint16_t)GatewayPacketType::P2PMessage); + GATEWAY_LOG(INFO) << LOG_DESC("##### asyncSendMessage") + << LOG_KV("msg", printMessage(p2pMessage)); auto nodeList = m_localRouter->chooseReceiver(p2pMessage); // case send to the same agency if (!nodeList.empty()) diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h index 66d692bf..12a43d49 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h @@ -54,11 +54,12 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_thisp2pnodeid(); + return m_rawGatewayInfo->p2pnodeid(); } // the agency std::string const& GatewayNodeInfoImpl::agency() const { - return m_inner()->agency(); + return m_rawGatewayInfo->agency(); } uint32_t GatewayNodeInfoImpl::statusSeq() const { - return m_inner()->statusseq(); + return m_rawGatewayInfo->statusseq(); } void GatewayNodeInfoImpl::setStatusSeq(uint32_t statusSeq) { - m_inner()->set_statusseq(statusSeq); + m_rawGatewayInfo->set_statusseq(statusSeq); } // get the node information by nodeID @@ -58,18 +58,35 @@ INodeInfo::Ptr GatewayNodeInfoImpl::nodeInfo(bcos::bytes const& nodeID) const return nullptr; } +void GatewayNodeInfoImpl::updateNodeList() +{ + // Note: can't use clear_nodelist here, for clear_nodelist will destroy the allocated nodelist, + // and cause double release coredump + releaseWithoutDestory(); + // re-encode nodeList + for (auto const& it : m_nodeList) + { + auto nodeInfo = std::dynamic_pointer_cast(it.second); + m_rawGatewayInfo->mutable_nodelist()->UnsafeArenaAddAllocated( + nodeInfo->rawNodeInfo().get()); + } +} + bool GatewayNodeInfoImpl::tryAddNodeInfo(INodeInfo::Ptr const& info) { auto nodeID = info->nodeID().toBytes(); auto existedNodeInfo = nodeInfo(nodeID); - // update the info - if (existedNodeInfo == nullptr || !existedNodeInfo->equal(info)) + // the node info has not been updated + if (existedNodeInfo != nullptr && existedNodeInfo->equal(info)) + { + return false; + } { bcos::WriteGuard l(x_nodeList); m_nodeList[nodeID] = info; - return true; + updateNodeList(); } - return false; + return true; } void GatewayNodeInfoImpl::removeNodeInfo(bcos::bytes const& nodeID) @@ -84,6 +101,7 @@ void GatewayNodeInfoImpl::removeNodeInfo(bcos::bytes const& nodeID) } bcos::UpgradeGuard ul(l); m_nodeList.erase(it); + updateNodeList(); } // remove the topic info { @@ -184,30 +202,21 @@ void GatewayNodeInfoImpl::unRegisterTopic(bcos::bytes const& nodeID, std::string void GatewayNodeInfoImpl::encode(bcos::bytes& data) const { - m_inner()->clear_nodelist(); - { - bcos::ReadGuard l(x_nodeList); - // encode nodeList - for (auto const& it : m_nodeList) - { - auto nodeInfo = std::dynamic_pointer_cast(it.second); - m_inner()->mutable_nodelist()->UnsafeArenaAddAllocated(nodeInfo->innerFunc()()); - } - } - encodePBObject(data, m_inner()); + encodePBObject(data, m_rawGatewayInfo); } void GatewayNodeInfoImpl::decode(bcos::bytesConstRef data) { - decodePBObject(m_inner(), data); + decodePBObject(m_rawGatewayInfo, data); { bcos::WriteGuard l(x_nodeList); // decode into m_nodeList m_nodeList.clear(); - for (int i = 0; i < m_inner()->nodelist_size(); i++) + for (int i = 0; i < m_rawGatewayInfo->nodelist_size(); i++) { - auto nodeInfoPtr = std::make_shared( - [m_entry = m_inner()->nodelist(i)]() mutable { return &m_entry; }); + std::shared_ptr rawNodeInfo( + m_rawGatewayInfo->mutable_nodelist(i)); + auto nodeInfoPtr = std::make_shared(rawNodeInfo); m_nodeList.insert(std::make_pair(nodeInfoPtr->nodeID().toBytes(), nodeInfoPtr)); } } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h index 4e3ba366..92a74f17 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h @@ -29,21 +29,18 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo { public: using Ptr = std::shared_ptr; + GatewayNodeInfoImpl() : m_rawGatewayInfo(std::make_shared()) {} GatewayNodeInfoImpl(std::string const& p2pNodeID, std::string const& agency) - : m_inner([inner = ppc::proto::GatewayNodeInfo()]() mutable { return &inner; }) + : GatewayNodeInfoImpl() { - m_inner()->set_p2pnodeid(p2pNodeID); - m_inner()->set_agency(agency); - } - ~GatewayNodeInfoImpl() override - { - auto allocatedNodeListSize = m_inner()->nodelist_size(); - for (int i = 0; i < allocatedNodeListSize; i++) - { - m_inner()->mutable_nodelist()->UnsafeArenaReleaseLast(); - } + m_rawGatewayInfo->set_p2pnodeid(p2pNodeID); + m_rawGatewayInfo->set_agency(agency); } + GatewayNodeInfoImpl(bcos::bytesConstRef data) : GatewayNodeInfoImpl() { decode(data); } + + ~GatewayNodeInfoImpl() override { releaseWithoutDestory(); } + // the gateway nodeID std::string const& p2pNodeID() const override; // the agency @@ -71,21 +68,38 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo std::map nodeList() const override { - bcos::WriteGuard l(x_nodeList); + bcos::ReadGuard l(x_nodeList); return m_nodeList; } uint32_t statusSeq() const override; void setStatusSeq(uint32_t statusSeq) override; - virtual uint16_t nodeSize() const override { return m_nodeList.size(); } + virtual uint16_t nodeSize() const override + { + bcos::ReadGuard l(x_nodeList); + return m_nodeList.size(); + } + +private: + void updateNodeList(); + + void releaseWithoutDestory() + { + // return back the ownership to nodeList to shared_ptr + auto allocatedNodeListSize = m_rawGatewayInfo->nodelist_size(); + for (int i = 0; i < allocatedNodeListSize; i++) + { + m_rawGatewayInfo->mutable_nodelist()->UnsafeArenaReleaseLast(); + } + } private: - std::function m_inner; + std::shared_ptr m_rawGatewayInfo; // NodeID => nodeInfo std::map m_nodeList; mutable bcos::SharedMutex x_nodeList; - // NodeID=>topics + // NodeID=>topics(Note serialized) using Topics = std::set; std::map m_topicInfo; mutable bcos::SharedMutex x_topicInfo; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp index a26844e4..6106e6bc 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp @@ -93,7 +93,8 @@ void GatewayRouterManager::onReceiveNodeSeqMessage(MessageFace::Ptr msg, WsSessi return; } // status changed, request for the nodeStatus - GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveNodeSeqMessage") << LOG_KV("from", from) + GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveNodeSeqMessage") + << LOG_KV("from", printP2PIDElegantly(from)) << LOG_KV("statusSeq", statusSeq); m_service->asyncSendMessageByP2PNodeID( (uint16_t)GatewayPacketType::RequestNodeStatus, from, std::make_shared()); @@ -138,11 +139,11 @@ void GatewayRouterManager::onReceiveRequestNodeStatusMsg( if (!nodeStatusData) { GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveRequestNodeStatusMsg: generate nodeInfo error") - << LOG_KV("from", from); + << LOG_KV("from", printP2PIDElegantly(from)); return; } GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveRequestNodeStatusMsg: response the latest nodeStatus") - << LOG_KV("from", from); + << LOG_KV("from", printP2PIDElegantly(from)); m_service->asyncSendMessageByP2PNodeID( (uint16_t)GatewayPacketType::ResponseNodeStatus, from, nodeStatusData); } @@ -157,7 +158,8 @@ void GatewayRouterManager::onRecvResponseNodeStatusMsg(MessageFace::Ptr msg, WsS p2pMessage->header()->srcGwNode() : session->nodeId(); - GATEWAY_LOG(INFO) << LOG_DESC("onRecvResponseNodeStatusMsg") << LOG_KV("from", from) + GATEWAY_LOG(INFO) << LOG_DESC("onRecvResponseNodeStatusMsg") + << LOG_KV("from", printP2PIDElegantly(from)) << LOG_KV("statusSeq", nodeStatus->statusSeq()) << LOG_KV("agency", nodeStatus->agency()); updatePeerNodeStatus(from, nodeStatus); @@ -176,7 +178,8 @@ void GatewayRouterManager::updatePeerNodeStatus( UpgradeGuard ul(l); m_p2pID2Seq[p2pID] = statusSeq; } - GATEWAY_LOG(INFO) << LOG_DESC("updatePeerNodeStatus") << LOG_KV("from", p2pID) + GATEWAY_LOG(INFO) << LOG_DESC("updatePeerNodeStatus") + << LOG_KV("from", printP2PIDElegantly(p2pID)) << LOG_KV("statusSeq", status->statusSeq()) << LOG_KV("agency", status->agency()); m_peerRouter->updateGatewayInfo(status); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp index 2db66e76..55b3ea67 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp @@ -20,14 +20,27 @@ #include "LocalRouter.h" #include "ppc-framework/Common.h" #include "ppc-framework/gateway/GatewayProtocol.h" +#include "ppc-gateway/Common.h" using namespace bcos; using namespace ppc::protocol; using namespace ppc::gateway; +bool LocalRouter::registerNodeInfo(ppc::protocol::INodeInfo::Ptr nodeInfo) +{ + GATEWAY_LOG(INFO) << LOG_DESC("registerNodeInfo") << printNodeInfo(nodeInfo); + nodeInfo->setFront(m_frontBuilder->buildClient(nodeInfo->endPoint())); + auto ret = m_routerInfo->tryAddNodeInfo(nodeInfo); + if (ret) + { + increaseSeq(); + } + return ret; +} // Note: the change of the topic will not trigger router-update void LocalRouter::registerTopic(bcos::bytesConstRef _nodeID, std::string const& topic) { + GATEWAY_LOG(INFO) << LOG_DESC("registerTopic") << LOG_KV("topic", topic); m_routerInfo->registerTopic(_nodeID.toBytes(), topic); // try to dispatch the cacheInfo if (!m_cache) diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h index 2bd18e7f..5cf5a7ef 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h @@ -30,26 +30,16 @@ class LocalRouter { public: using Ptr = std::shared_ptr; - LocalRouter(GatewayNodeInfoFactory::Ptr nodeInfoFactory, + LocalRouter(GatewayNodeInfoFactory::Ptr gatewayNodeInfoFactory, ppc::front::IFrontBuilder::Ptr frontBuilder, MessageCache::Ptr msgCache) - : m_routerInfo(std::move(nodeInfoFactory->build())), + : m_routerInfo(gatewayNodeInfoFactory->build()), m_frontBuilder(std::move(frontBuilder)), m_cache(std::move(msgCache)) {} virtual ~LocalRouter() = default; - virtual bool registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) - { - nodeInfo->setFront(m_frontBuilder->buildClient(nodeInfo->endPoint())); - auto ret = m_routerInfo->tryAddNodeInfo(nodeInfo); - if (ret) - { - increaseSeq(); - } - return ret; - } - + virtual bool registerNodeInfo(ppc::protocol::INodeInfo::Ptr nodeInfo); virtual void unRegisterNode(bcos::bytes const& nodeID) { m_routerInfo->removeNodeInfo(nodeID); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp index fd36e94e..c60fa00d 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp @@ -40,7 +40,7 @@ Service::Service(std::string const& _nodeID, RouterTableFactory::Ptr const& _rou m_routerTable->setNodeID(m_nodeID); m_routerTable->setUnreachableDistance(unreachableDistance); - GATEWAY_LOG(INFO) << LOG_DESC("create P2PService") << LOG_KV("module", _moduleName); + SERVICE_LOG(INFO) << LOG_DESC("create P2PService") << LOG_KV("module", _moduleName); WsService::registerConnectHandler( boost::bind(&Service::onP2PConnect, this, boost::placeholders::_1)); WsService::registerDisconnectHandler( @@ -50,7 +50,8 @@ Service::Service(std::string const& _nodeID, RouterTableFactory::Ptr const& _rou void Service::onP2PConnect(WsSession::Ptr _session) { - GATEWAY_LOG(INFO) << LOG_DESC("onP2PConnect") << LOG_KV("p2pid", _session->nodeId()) + SERVICE_LOG(INFO) << LOG_DESC("Receive new p2p connection") + << LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId())) << LOG_KV("endpoint", _session->endPoint()); @@ -59,8 +60,8 @@ void Service::onP2PConnect(WsSession::Ptr _session) // the session already connected if (it != m_nodeID2Session.end() && it->second->isConnected()) { - GATEWAY_LOG(INFO) << LOG_DESC("onP2PConnect, drop the duplicated connection") - << LOG_KV("nodeID", _session->nodeId()) + SERVICE_LOG(INFO) << LOG_DESC("onP2PConnect, drop the duplicated connection") + << LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId())) << LOG_KV("endpoint", _session->endPoint()); _session->drop(WsError::UserDisconnect); updateNodeIDInfo(_session); @@ -70,28 +71,54 @@ void Service::onP2PConnect(WsSession::Ptr _session) if (_session->nodeId() == m_nodeID) { updateNodeIDInfo(_session); - GATEWAY_LOG(INFO) << LOG_DESC("onP2PConnect, drop the node-self connection") - << LOG_KV("nodeID", _session->nodeId()) + SERVICE_LOG(INFO) << LOG_DESC("onP2PConnect, drop the node-self connection") + << LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId())) << LOG_KV("endpoint", _session->endPoint()); _session->drop(WsError::UserDisconnect); return; } - // the new session - updateNodeIDInfo(_session); + + + ///// Note: here allow all new session, even the ip not configured(support dynamic access) + bool updated = updateNodeIDInfo(_session); + // hit the m_nodeID2Session if (it != m_nodeID2Session.end()) { + // the old session has already been connected, and the new session endPoint is not + // configured + if (it->second->isConnected() && !updated) + { + SERVICE_LOG(INFO) << LOG_DESC( + "onP2PConnect, drop the new not-configurated session, remain " + "the old session") + << LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId())) + << LOG_KV("endPoint", _session->endPoint()) + << LOG_KV("oldEndPoint", it->second->endPoint()); + _session->drop(WsError::UserDisconnect); + return; + } + SERVICE_LOG(INFO) << LOG_DESC( + "onP2PConnect, drop the old not-configurated session, replace " + "with the new session") + << LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId())) + << LOG_KV("endPoint", _session->endPoint()) + << LOG_KV("oldEndPoint", it->second->endPoint()); + if (it->second->isConnected()) + { + it->second->drop(WsError::UserDisconnect); + } it->second = _session; + return; } - else - { - m_nodeID2Session.insert(std::make_pair(_session->nodeId(), _session)); - } - GATEWAY_LOG(INFO) << LOG_DESC("onP2PConnect established") << LOG_KV("p2pid", _session->nodeId()) + // the new session + m_nodeID2Session.insert(std::make_pair(_session->nodeId(), _session)); + SERVICE_LOG(INFO) << LOG_DESC("onP2PConnect established new session") + << LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId())) << LOG_KV("endpoint", _session->endPoint()); } -void Service::updateNodeIDInfo(WsSession::Ptr const& _session) +bool Service::updateNodeIDInfo(WsSession::Ptr const& _session) { bcos::WriteGuard l(x_configuredNode2ID); std::string p2pNodeID = _session->nodeId(); @@ -99,16 +126,16 @@ void Service::updateNodeIDInfo(WsSession::Ptr const& _session) if (it != m_configuredNode2ID.end()) { it->second = p2pNodeID; - GATEWAY_LOG(INFO) << LOG_DESC("updateNodeIDInfo: update the nodeID") - << LOG_KV("nodeid", p2pNodeID) - << LOG_KV("endpoint", _session->endPoint()); - } - else - { - GATEWAY_LOG(INFO) << LOG_DESC("updateNodeIDInfo can't find endpoint") + SERVICE_LOG(INFO) << LOG_DESC("updateNodeIDInfo: update the nodeID") << LOG_KV("nodeid", p2pNodeID) << LOG_KV("endpoint", _session->endPoint()); + return true; } + + SERVICE_LOG(INFO) << LOG_DESC("updateNodeIDInfo can't find endpoint") + << LOG_KV("nodeid", printP2PIDElegantly(p2pNodeID)) + << LOG_KV("endpoint", _session->endPoint()); + return false; } void Service::removeSessionInfo(WsSession::Ptr const& _session) @@ -117,8 +144,8 @@ void Service::removeSessionInfo(WsSession::Ptr const& _session) auto it = m_nodeID2Session.find(_session->nodeId()); if (it != m_nodeID2Session.end()) { - GATEWAY_LOG(INFO) << "onP2PDisconnectand remove from m_nodeID2Session" - << LOG_KV("p2pid", _session->nodeId()) + SERVICE_LOG(INFO) << "onP2PDisconnectand remove from m_nodeID2Session" + << LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId())) << LOG_KV("endpoint", _session->endPoint()); m_nodeID2Session.erase(it); @@ -132,7 +159,8 @@ void Service::onP2PDisconnect(WsSession::Ptr _session) UpgradableGuard l(x_configuredNode2ID); for (auto& it : m_configuredNode2ID) { - if (it.second == _session->nodeId()) + // reset the nodeID of the dropped session(except the node-self) to empty + if (m_nodeID != _session->nodeId() && it.second == _session->nodeId()) { UpgradeGuard ul(l); it.second.clear(); @@ -158,6 +186,9 @@ void Service::reconnect() continue; } unconnectedPeers->insert(it.first); + SERVICE_LOG(DEBUG) << LOG_DESC("ready to reconnect") + << LOG_KV("endpoint", + it.first.address() + ":" + std::to_string(it.first.port())); } } setReconnectedPeers(unconnectedPeers); @@ -201,7 +232,7 @@ void Service::asyncSendMessageWithForward( return asyncSendMessage(dstNodeID, msg, options, respFunc); } // with nextHop, send the message to nextHop - GATEWAY_LOG(TRACE) << LOG_DESC("asyncSendMessageByNodeID") << printMessage(p2pMsg); + SERVICE_LOG(TRACE) << LOG_DESC("asyncSendMessageByNodeID") << printMessage(p2pMsg); return asyncSendMessage(nextHop, msg, options, respFunc); } @@ -216,6 +247,10 @@ void Service::asyncSendMessage( { return; } + if (msg->seq().empty()) + { + msg->setSeq(m_messageFactory->newSeq()); + } auto session = getSessionByNodeID(dstNodeID); if (session) { @@ -231,13 +266,13 @@ void Service::asyncSendMessage( " failed for no network established, msg: " + printWsMessage(msg)); respFunc(std::move(error), nullptr, nullptr); } - GATEWAY_LOG(WARNING) + SERVICE_LOG(WARNING) << LOG_DESC("asyncSendMessageByNodeID failed for no network established, msg detail:") << printWsMessage(msg); } catch (std::exception const& e) { - GATEWAY_LOG(ERROR) << "asyncSendMessageByNodeID" << LOG_KV("dstNode", dstNodeID) + SERVICE_LOG(ERROR) << "asyncSendMessageByNodeID" << LOG_KV("dstNode", dstNodeID) << LOG_KV("what", boost::diagnostic_information(e)); if (respFunc) { @@ -254,7 +289,7 @@ void Service::onRecvMessage(MessageFace::Ptr _msg, std::shared_ptr _s // find the dstNode if (p2pMsg->header()->dstGwNode().empty() || p2pMsg->header()->dstGwNode() == m_nodeID) { - GATEWAY_LOG(TRACE) << LOG_DESC("onRecvMessage, dispatch for find the dst node") + SERVICE_LOG(TRACE) << LOG_DESC("onRecvMessage, dispatch for find the dst node") << printMessage(p2pMsg); WsService::onRecvMessage(_msg, _session); return; @@ -262,7 +297,7 @@ void Service::onRecvMessage(MessageFace::Ptr _msg, std::shared_ptr _s // forward the message if (p2pMsg->header()->ttl() >= m_routerTable->unreachableDistance()) { - GATEWAY_LOG(WARNING) << LOG_DESC("onRecvMessage: ttl expired") << printMessage(p2pMsg); + SERVICE_LOG(WARNING) << LOG_DESC("onRecvMessage: ttl expired") << printMessage(p2pMsg); return; } p2pMsg->header()->setTTL(p2pMsg->header()->ttl() + 1); @@ -283,7 +318,7 @@ void Service::asyncBroadcastMessage(bcos::boostssl::MessageFace::Ptr msg, Option } catch (std::exception& e) { - GATEWAY_LOG(WARNING) << LOG_BADGE("asyncBroadcastMessage exception") + SERVICE_LOG(WARNING) << LOG_BADGE("asyncBroadcastMessage exception") << LOG_KV("msg", printWsMessage(msg)) << LOG_KV("error", boost::diagnostic_information(e)); } @@ -318,6 +353,6 @@ void Service::sendRespMessageBySession(bcos::boostssl::ws::WsSession::Ptr const& WsSessions sessions; sessions.emplace_back(session); WsService::asyncSendMessage(sessions, respMessage); - GATEWAY_LOG(TRACE) << "sendRespMessageBySession" << LOG_KV("resp", printMessage(respMessage)) + SERVICE_LOG(TRACE) << "sendRespMessageBySession" << LOG_KV("resp", printMessage(respMessage)) << LOG_KV("payload size", payload->size()); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h index 3be13906..c41e0c1f 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h @@ -79,7 +79,7 @@ class Service : public bcos::boostssl::ws::WsService void reconnect() override; - void updateNodeIDInfo(bcos::boostssl::ws::WsSession::Ptr const& _session); + bool updateNodeIDInfo(bcos::boostssl::ws::WsSession::Ptr const& _session); void removeSessionInfo(bcos::boostssl::ws::WsSession::Ptr const& _session); bcos::boostssl::ws::WsSession::Ptr getSessionByNodeID(std::string const& _nodeID); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp index 00f3e7c7..77a765fb 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp @@ -70,9 +70,10 @@ void RouterManager::onReceiveRouterSeq(MessageFace::Ptr msg, WsSession::Ptr sess { return; } - GATEWAY_LOG(INFO) << LOG_BADGE("onReceiveRouterSeq") - << LOG_DESC("receive router seq and request router table") - << LOG_KV("peer", session->nodeId()) << LOG_KV("seq", statusSeq); + SERVICE_ROUTER_LOG(INFO) << LOG_BADGE("onReceiveRouterSeq") + << LOG_DESC("receive router seq and request router table") + << LOG_KV("peer", printP2PIDElegantly(session->nodeId())) + << LOG_KV("seq", statusSeq); // request router table to peer auto p2pMsg = std::dynamic_pointer_cast(msg); auto dstP2PNodeID = (!p2pMsg->header()->srcGwNode().empty()) ? p2pMsg->header()->srcGwNode() : @@ -99,17 +100,19 @@ void RouterManager::onReceivePeersRouterTable(MessageFace::Ptr msg, WsSession::P { auto routerTable = m_service->routerTableFactory()->createRouterTable(ref(*(msg->payload()))); - GATEWAY_LOG(INFO) << LOG_BADGE("onReceivePeersRouterTable") << LOG_KV("peer", session->nodeId()) - << LOG_KV("entrySize", routerTable->routerEntries().size()); + SERVICE_ROUTER_LOG(INFO) << LOG_BADGE("onReceivePeersRouterTable") + << LOG_KV("peer", printP2PIDElegantly(session->nodeId())) + << LOG_KV("entrySize", routerTable->routerEntries().size()); joinRouterTable(session->nodeId(), routerTable); } // receive routerTable request from peer void RouterManager::onReceiveRouterTableRequest(MessageFace::Ptr msg, WsSession::Ptr session) { - GATEWAY_LOG(INFO) << LOG_BADGE("onReceiveRouterTableRequest") - << LOG_KV("peer", session->nodeId()) - << LOG_KV("entrySize", m_service->routerTable()->routerEntries().size()); + SERVICE_ROUTER_LOG(INFO) << LOG_BADGE("onReceiveRouterTableRequest") + << LOG_KV("peer", printP2PIDElegantly(session->nodeId())) + << LOG_KV( + "entrySize", m_service->routerTable()->routerEntries().size()); auto routerTableData = std::make_shared(); m_service->routerTable()->encode(*routerTableData); @@ -135,8 +138,8 @@ void RouterManager::joinRouterTable( } } - GATEWAY_LOG(INFO) << LOG_BADGE("joinRouterTable") << LOG_DESC("create router entry") - << LOG_KV("dst", _generatedFrom); + SERVICE_ROUTER_LOG(INFO) << LOG_BADGE("joinRouterTable") << LOG_DESC("create router entry") + << LOG_KV("dst", printP2PIDElegantly(_generatedFrom)); auto entry = m_service->routerTableFactory()->createRouterEntry(); entry->setDstNode(_generatedFrom); @@ -147,8 +150,9 @@ void RouterManager::joinRouterTable( } if (!updated) { - GATEWAY_LOG(DEBUG) << LOG_BADGE("joinRouterTable") << LOG_DESC("router table not updated") - << LOG_KV("dst", _generatedFrom); + SERVICE_ROUTER_LOG(DEBUG) << LOG_BADGE("joinRouterTable") + << LOG_DESC("router table not updated") + << LOG_KV("dst", printP2PIDElegantly(_generatedFrom)); return; } onP2PNodesUnreachable(unreachableNodes); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp index f8a2a22a..f3cc3071 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp @@ -69,9 +69,10 @@ bool RouterTable::erase(std::set& _unreachableNodes, std::string co it->second->clearNextHop(); _unreachableNodes.insert(it->second->dstNode()); - GATEWAY_LOG(INFO) << LOG_BADGE("erase") << LOG_DESC("make the router unreachable") - << LOG_KV("dst", _p2pNodeID) << LOG_KV("distance", it->second->distance()) - << LOG_KV("size", m_routerEntries.size()); + SERVICE_ROUTER_LOG(INFO) << LOG_BADGE("erase") << LOG_DESC("make the router unreachable") + << LOG_KV("dst", printP2PIDElegantly(_p2pNodeID)) + << LOG_KV("distance", it->second->distance()) + << LOG_KV("size", m_routerEntries.size()); updated = true; } // update the router-entry with nextHop equal to _p2pNodeID to be unreachable @@ -94,13 +95,13 @@ void RouterTable::updateDistanceForAllRouterEntries( entry->clearNextHop(); _unreachableNodes.insert(entry->dstNode()); } - GATEWAY_LOG(INFO) << LOG_BADGE("updateDistanceForAllRouterEntries") - << LOG_DESC( - "update entry since the nextHop distance has been updated") - << LOG_KV("dst", entry->dstNode()) << LOG_KV("nextHop", _nextHop) - << LOG_KV("distance", entry->distance()) - << LOG_KV("oldDistance", oldDistance) - << LOG_KV("size", m_routerEntries.size()); + SERVICE_ROUTER_LOG(INFO) + << LOG_BADGE("updateDistanceForAllRouterEntries") + << LOG_DESC("update entry since the nextHop distance has been updated") + << LOG_KV("dst", printP2PIDElegantly(entry->dstNode())) + << LOG_KV("nextHop", printP2PIDElegantly(_nextHop)) + << LOG_KV("distance", entry->distance()) << LOG_KV("oldDistance", oldDistance) + << LOG_KV("size", m_routerEntries.size()); } } } @@ -111,10 +112,10 @@ bool RouterTable::update(std::set& _unreachableNodes, if (c_fileLogLevel <= TRACE) [[unlikely]] { - GATEWAY_LOG(TRACE) << LOG_BADGE("update") << LOG_DESC("receive entry") - << LOG_KV("dst", printP2PIDElegantly(_entry->dstNode())) - << LOG_KV("distance", _entry->distance()) - << LOG_KV("from", _generatedFrom); + SERVICE_ROUTER_LOG(TRACE) << LOG_BADGE("update") << LOG_DESC("receive entry") + << LOG_KV("dst", printP2PIDElegantly(_entry->dstNode())) + << LOG_KV("distance", _entry->distance()) + << LOG_KV("from", printP2PIDElegantly(_generatedFrom)); } auto ret = updateDstNodeEntry(_generatedFrom, _entry); // the dst entry has not been updated @@ -168,12 +169,12 @@ bool RouterTable::updateDstNodeEntry( _entry->setNextHop(_generatedFrom); } m_routerEntries.insert(std::make_pair(_entry->dstNode(), _entry)); - GATEWAY_LOG(INFO) << LOG_BADGE("updateDstNodeEntry") - << LOG_DESC("insert new entry into the routerTable") - << LOG_KV("distance", _entry->distance()) - << LOG_KV("dst", _entry->dstNode()) - << LOG_KV("nextHop", _entry->nextHop()) - << LOG_KV("size", m_routerEntries.size()); + SERVICE_ROUTER_LOG(INFO) << LOG_BADGE("updateDstNodeEntry") + << LOG_DESC("insert new entry into the routerTable") + << LOG_KV("distance", _entry->distance()) + << LOG_KV("dst", printP2PIDElegantly(_entry->dstNode())) + << LOG_KV("nextHop", printP2PIDElegantly(_entry->nextHop())) + << LOG_KV("size", m_routerEntries.size()); return true; } @@ -189,13 +190,13 @@ bool RouterTable::updateDstNodeEntry( currentEntry->setNextHop(_generatedFrom); } currentEntry->setDistance(distance); - GATEWAY_LOG(INFO) << LOG_BADGE("updateDstNodeEntry") - << LOG_DESC("discover smaller distance, update entry") - << LOG_KV("distance", currentEntry->distance()) - << LOG_KV("oldDistance", currentDistance) - << LOG_KV("dst", _entry->dstNode()) - << LOG_KV("nextHop", _entry->nextHop()) - << LOG_KV("size", m_routerEntries.size()); + SERVICE_ROUTER_LOG(INFO) << LOG_BADGE("updateDstNodeEntry") + << LOG_DESC("discover smaller distance, update entry") + << LOG_KV("distance", currentEntry->distance()) + << LOG_KV("oldDistance", currentDistance) + << LOG_KV("dst", printP2PIDElegantly(_entry->dstNode())) + << LOG_KV("nextHop", printP2PIDElegantly(_entry->nextHop())) + << LOG_KV("size", m_routerEntries.size()); return true; } // the distance information for the nextHop changed @@ -216,14 +217,14 @@ bool RouterTable::updateDstNodeEntry( { currentEntry->clearNextHop(); } - GATEWAY_LOG(INFO) << LOG_BADGE("updateDstNodeEntry") - << LOG_DESC( - "distance of the nextHop entry " - "updated, update the current entry") - << LOG_KV("dst", currentEntry->dstNode()) - << LOG_KV("nextHop", currentEntry->nextHop()) - << LOG_KV("distance", currentEntry->distance()) - << LOG_KV("size", m_routerEntries.size()); + SERVICE_ROUTER_LOG(INFO) << LOG_BADGE("updateDstNodeEntry") + << LOG_DESC( + "distance of the nextHop entry " + "updated, update the current entry") + << LOG_KV("dst", printP2PIDElegantly(currentEntry->dstNode())) + << LOG_KV("nextHop", printP2PIDElegantly(currentEntry->nextHop())) + << LOG_KV("distance", currentEntry->distance()) + << LOG_KV("size", m_routerEntries.size()); return true; } return false; @@ -264,9 +265,9 @@ std::set RouterTable::getAllReachableNode() std::stringstream nodes; std::for_each(reachableNodes.begin(), reachableNodes.end(), [&](const auto& item) { nodes << printP2PIDElegantly(item) << ","; }); - GATEWAY_LOG(TRACE) << LOG_BADGE("getAllReachableNode") - << LOG_KV("nodes size", reachableNodes.size()) - << LOG_KV("nodes", nodes.str()); + SERVICE_ROUTER_LOG(TRACE) + << LOG_BADGE("getAllReachableNode") << LOG_KV("nodes size", reachableNodes.size()) + << LOG_KV("nodes", nodes.str()); } return reachableNodes; diff --git a/cpp/wedpr-transport/ppc-gateway/test/unittests/GatewayNodeInfoImplTest.cpp b/cpp/wedpr-transport/ppc-gateway/test/unittests/GatewayNodeInfoImplTest.cpp new file mode 100644 index 00000000..0a2f80e4 --- /dev/null +++ b/cpp/wedpr-transport/ppc-gateway/test/unittests/GatewayNodeInfoImplTest.cpp @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2022 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayNodeInfoImplTest.cpp + * @author: yujiechen + * @date 2024-09-06 + */ +#include "ppc-gateway/gateway/router/GatewayNodeInfoImpl.h" +#include "ppc-gateway/gateway/router/GatewayNodeInfo.h" +#include "protobuf/src/NodeInfoImpl.h" +#include +#include +#include + +using namespace ppc; +using namespace ppc::protocol; +using namespace ppc::gateway; +using namespace bcos::test; + +BOOST_FIXTURE_TEST_SUITE(NodeInfoTest, TestPromptFixture) + +INodeInfo::Ptr fakeNodeInfo(INodeInfoFactory::Ptr factory, std::string const& nodeID, + std::string const& endPoint, std::set const& components) +{ + auto nodeInfo = + factory->build(bcos::bytesConstRef((bcos::byte*)nodeID.data(), nodeID.size()), endPoint); + // nodeInfo->setNodeID(bcos::bytesConstRef((bcos::byte*)nodeID.data(), nodeID.size())); + // nodeInfo->setEndPoint(endPoint); + nodeInfo->setComponents(components); + return nodeInfo; +} + +void checkNodeInfo(INodeInfo::Ptr nodeInfo, INodeInfo::Ptr decodedNodeInfo) +{ + BOOST_CHECK(nodeInfo->nodeID().toBytes() == decodedNodeInfo->nodeID().toBytes()); + BOOST_CHECK(nodeInfo->endPoint() == decodedNodeInfo->endPoint()); + auto const& components = nodeInfo->components(); + for (auto const& decodedComp : decodedNodeInfo->components()) + { + BOOST_CHECK(components.count(decodedComp)); + } +} + +void testNodeInfoEncodeDecode(INodeInfoFactory::Ptr factory, INodeInfo::Ptr nodeInfo) +{ + bcos::bytes encodedData; + nodeInfo->encode(encodedData); + + auto decodedNodeInfo = factory->build(bcos::ref(encodedData)); + checkNodeInfo(nodeInfo, decodedNodeInfo); +} + +void registerNode(GatewayNodeInfoImpl::Ptr gatewayNodeInfo, int nodeSize) +{ + auto nodeInfoFactory = std::make_shared(); + std::string nodeID = "testn+NodeID"; + std::string endPoint = "testEndpoint"; + for (int i = 0; i < nodeSize; i++) + { + std::set components; + for (int j = 0; j < 100; j++) + { + components.insert("component_" + std::to_string(i) + "_" + std::to_string(j)); + } + auto populatedNodeID = nodeID + std::to_string(i); + auto nodeInfo = fakeNodeInfo(nodeInfoFactory, populatedNodeID, endPoint, components); + testNodeInfoEncodeDecode(nodeInfoFactory, nodeInfo); + gatewayNodeInfo->tryAddNodeInfo(nodeInfo); + } +} + +GatewayNodeInfo::Ptr fakeGatewayNodeInfo(GatewayNodeInfoFactory::Ptr factory, uint32_t statusSeq) +{ + auto gatewayNodeInfo = factory->build(); + gatewayNodeInfo->setStatusSeq(statusSeq); + return gatewayNodeInfo; +} + +void checkGatewayNodeInfo(GatewayNodeInfoImpl::Ptr gatewayNodeInfo, int expectedNodeSize, + int expectedStatusSeq, std::string const& expectedAgency, std::string const& expectedP2pNode) +{ + BOOST_CHECK(gatewayNodeInfo->nodeSize() == expectedNodeSize); + BOOST_CHECK(gatewayNodeInfo->statusSeq() == expectedStatusSeq); + BOOST_CHECK(gatewayNodeInfo->agency() == expectedAgency); + BOOST_CHECK(gatewayNodeInfo->p2pNodeID() == expectedP2pNode); + + // check concurrency + tbb::parallel_for(tbb::blocked_range(0U, 5), [&](auto const& range) { + bcos::bytes dataTemp; + gatewayNodeInfo->encode(dataTemp); + }); + bcos::bytes data; + gatewayNodeInfo->encode(data); + + auto decodedNodeInfo = std::make_shared(bcos::ref(data)); + BOOST_CHECK(gatewayNodeInfo->nodeSize() == decodedNodeInfo->nodeSize()); + BOOST_CHECK(gatewayNodeInfo->statusSeq() == decodedNodeInfo->statusSeq()); + BOOST_CHECK(gatewayNodeInfo->agency() == decodedNodeInfo->agency()); + BOOST_CHECK(gatewayNodeInfo->p2pNodeID() == decodedNodeInfo->p2pNodeID()); + + auto nodeList = decodedNodeInfo->nodeList(); + auto originNodeList = gatewayNodeInfo->nodeList(); + for (auto const& it : nodeList) + { + auto decodedNodeInfo = it.second; + auto originNodeInfo = originNodeList.at(it.first); + checkNodeInfo(originNodeInfo, decodedNodeInfo); + } +} + +BOOST_AUTO_TEST_CASE(testGatewayNodeInfo) +{ + std::string agency = "agency"; + std::string p2pNode = "p2p23"; + int statusSeq = 123343234234; + auto factory = std::make_shared(p2pNode, agency); + auto gatewayNodeInfo = + std::dynamic_pointer_cast(fakeGatewayNodeInfo(factory, statusSeq)); + registerNode(gatewayNodeInfo, 10); + checkGatewayNodeInfo(gatewayNodeInfo, 10, statusSeq, agency, p2pNode); +} + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/test/unittests/MockCache.h b/cpp/wedpr-transport/ppc-gateway/test/unittests/MockCache.h index a974faed..73c7e0f0 100644 --- a/cpp/wedpr-transport/ppc-gateway/test/unittests/MockCache.h +++ b/cpp/wedpr-transport/ppc-gateway/test/unittests/MockCache.h @@ -21,66 +21,65 @@ #include "ppc-framework/storage/CacheStorage.h" #include -namespace ppc::mock { -class MockCache : public storage::CacheStorage { +namespace ppc::mock +{ +class MockCache : public storage::CacheStorage +{ public: - using Ptr = std::shared_ptr; - MockCache() = default; - ~MockCache() override {} + using Ptr = std::shared_ptr; + MockCache() = default; + ~MockCache() override {} - /// Note: all these interfaces throws exception when error happened - /** - * @brief: check whether the key exists - * @param _key: key - * @return whether the key exists - */ - bool exists(const std::string &_key) override { - return m_kv.find(_key) != m_kv.end(); - } + /// Note: all these interfaces throws exception when error happened + /** + * @brief: check whether the key exists + * @param _key: key + * @return whether the key exists + */ + bool exists(const std::string& _key) override { return m_kv.find(_key) != m_kv.end(); } - /** - * @brief: set key value - * @param _expirationTime: timeout of key, seconds - */ - void setValue(const std::string &_key, const std::string &_value, - int32_t _expirationSeconds = -1) override { - m_kv.emplace(_key, _value); - } - - /** - * @brief: get value by key - * @param _key: key - * @return value - */ - std::optional getValue(const std::string &_key) override { - auto it = m_kv.find(_key); - if (it == m_kv.end()) { - return std::nullopt; + /** + * @brief: set key value + * @param _expirationTime: timeout of key, seconds + */ + void setValue(const std::string& _key, const std::string& _value, + int32_t _expirationSeconds = -1) override + { + m_kv.emplace(_key, _value); } - return it->second; - } + /** + * @brief: get value by key + * @param _key: key + * @return value + */ + std::optional getValue(const std::string& _key) override + { + auto it = m_kv.find(_key); + if (it == m_kv.end()) + { + return std::nullopt; + } + + return it->second; + } - /** - * @brief: set a timeout on key - * @param _expirationTime: timeout of key, ms - * @return whether setting is successful - */ - bool expireKey(const std::string &_key, uint32_t _expirationTime) override { - return true; - } + /** + * @brief: set a timeout on key + * @param _expirationTime: timeout of key, ms + * @return whether setting is successful + */ + bool expireKey(const std::string& _key, uint32_t _expirationTime) override { return true; } - /** - * @brief: delete key - * @param _key: key - * @return the number of key deleted - */ - uint64_t deleteKey(const std::string &_key) override { - return m_kv.erase(_key); - } + /** + * @brief: delete key + * @param _key: key + * @return the number of key deleted + */ + uint64_t deleteKey(const std::string& _key) override { return m_kv.erase(_key); } private: - std::unordered_map> m_kv; + std::unordered_map> m_kv; }; -} // namespace ppc::mock \ No newline at end of file +} // namespace ppc::mock \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp index d665afe1..a1db0739 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp +++ b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp @@ -31,7 +31,7 @@ Rpc::Ptr RpcFactory::buildRpc(ppc::tools::PPCConfig::ConstPtr _config) { auto wsConfig = initConfig(_config); // create the wsConfig - auto wsService = std::make_shared(); + auto wsService = std::make_shared("WeDPR-RPC"); wsService->setTimerFactory(std::make_shared()); auto initializer = std::make_shared(); diff --git a/cpp/wedpr-transport/sdk/TransportBuilder.cpp b/cpp/wedpr-transport/sdk/TransportBuilder.cpp index 6550efb6..cec1a6aa 100644 --- a/cpp/wedpr-transport/sdk/TransportBuilder.cpp +++ b/cpp/wedpr-transport/sdk/TransportBuilder.cpp @@ -23,7 +23,7 @@ #include "Transport.h" #include "TransportImpl.h" #include "ppc-front/FrontConfigImpl.h" -#include "protobuf/NodeInfoImpl.h" +#include "protobuf/src/NodeInfoImpl.h" #include using namespace ppc::sdk; diff --git a/cpp/wedpr-transport/sdk/TransportImpl.h b/cpp/wedpr-transport/sdk/TransportImpl.h index c32e20d3..a400b416 100644 --- a/cpp/wedpr-transport/sdk/TransportImpl.h +++ b/cpp/wedpr-transport/sdk/TransportImpl.h @@ -21,7 +21,7 @@ #include "Transport.h" #include "ppc-framework/gateway/IGateway.h" #include "ppc-front/FrontFactory.h" -#include "protobuf/NodeInfoImpl.h" +#include "protobuf/src/NodeInfoImpl.h" #include "protocol/src/v1/MessageHeaderImpl.h" #include "protocol/src/v1/MessagePayloadImpl.h"