From 3c0b9fbbe87e87fc049a5631f6664242f57338dc Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Mon, 9 Sep 2024 12:56:03 +0800 Subject: [PATCH 1/4] add asyncGetPeers and asyncGetAgencies --- cpp/ppc-framework/Common.h | 1 + cpp/ppc-framework/front/FrontInterface.h | 4 ++ cpp/ppc-framework/front/IFront.h | 2 + cpp/ppc-framework/gateway/IGateway.h | 4 ++ cpp/ppc-framework/protocol/INodeInfo.h | 3 + cpp/test-utils/FakeFront.h | 19 ++++++ cpp/wedpr-computing/ppc-psi/src/PSIConfig.h | 18 +----- .../tests/ra2018-psi/TestEcdhPSIImpl.cpp | 7 ++- cpp/wedpr-helper/ppc-utilities/Utilities.h | 13 ++++- cpp/wedpr-initializer/Initializer.cpp | 8 +-- .../grpc/client/GatewayClient.cpp | 27 +++++++++ .../grpc/client/GatewayClient.h | 4 ++ .../grpc/server/GatewayServer.cpp | 53 +++++++++++++++++ .../grpc/server/GatewayServer.h | 6 ++ cpp/wedpr-protocol/proto/pb/Service.proto | 16 +++++ .../protobuf/src/CMakeLists.txt | 2 +- .../protobuf/src/NodeInfoImpl.cpp | 13 +++++ .../protobuf/src/NodeInfoImpl.h | 2 + .../ppc-front/ppc-front/Front.cpp | 58 +++++++++++++++++++ .../ppc-front/ppc-front/Front.h | 22 ++++++- .../ppc-front/ppc-front/FrontImpl.h | 6 ++ .../ppc-gateway/gateway/GatewayImpl.cpp | 54 +++++++++++++++++ .../ppc-gateway/gateway/GatewayImpl.h | 4 ++ .../gateway/router/GatewayNodeInfo.h | 2 + .../gateway/router/GatewayNodeInfoImpl.cpp | 16 +++++ .../gateway/router/GatewayNodeInfoImpl.h | 2 + .../gateway/router/PeerRouterTable.cpp | 11 ++++ .../gateway/router/PeerRouterTable.h | 8 +++ .../ppc-gateway/ppc-gateway/p2p/Service.cpp | 1 - 29 files changed, 359 insertions(+), 27 deletions(-) diff --git a/cpp/ppc-framework/Common.h b/cpp/ppc-framework/Common.h index 077a1969..ec4bfb94 100644 --- a/cpp/ppc-framework/Common.h +++ b/cpp/ppc-framework/Common.h @@ -77,6 +77,7 @@ inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept return p2pId.substr(RSA_PUBLIC_KEY_PREFIX, RSA_PUBLIC_KEY_TRUNC); } + template inline std::string printNodeID(T const& nodeID) { diff --git a/cpp/ppc-framework/front/FrontInterface.h b/cpp/ppc-framework/front/FrontInterface.h index ff019e22..16691928 100644 --- a/cpp/ppc-framework/front/FrontInterface.h +++ b/cpp/ppc-framework/front/FrontInterface.h @@ -73,6 +73,10 @@ class FrontInterface virtual std::string const& selfEndPoint() const { return m_selfEndPoint; } + virtual std::vector agencies() const = 0; + virtual void start() = 0; + virtual void stop() = 0; + protected: // the selfEndPoint for the air-mode-node can be localhost std::string m_selfEndPoint = "localhost"; diff --git a/cpp/ppc-framework/front/IFront.h b/cpp/ppc-framework/front/IFront.h index 561723dd..f3a9b647 100644 --- a/cpp/ppc-framework/front/IFront.h +++ b/cpp/ppc-framework/front/IFront.h @@ -99,6 +99,8 @@ class IFront : virtual public IFrontClient virtual ppc::protocol::Message::Ptr pop(std::string const& topic, long timeoutMs) = 0; virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0; + virtual void asyncGetAgencies( + std::function)> callback) = 0; /** * @brief register the nodeInfo to the gateway diff --git a/cpp/ppc-framework/gateway/IGateway.h b/cpp/ppc-framework/gateway/IGateway.h index e4422b4a..d2489217 100644 --- a/cpp/ppc-framework/gateway/IGateway.h +++ b/cpp/ppc-framework/gateway/IGateway.h @@ -65,6 +65,10 @@ class IGateway ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, bcos::bytes&& payload) = 0; + virtual void asyncGetPeers(std::function callback) = 0; + virtual void asyncGetAgencies( + std::function)> callback) = 0; + virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0; virtual bcos::Error::Ptr registerTopic( diff --git a/cpp/ppc-framework/protocol/INodeInfo.h b/cpp/ppc-framework/protocol/INodeInfo.h index c8789773..1c843a27 100644 --- a/cpp/ppc-framework/protocol/INodeInfo.h +++ b/cpp/ppc-framework/protocol/INodeInfo.h @@ -20,6 +20,7 @@ #pragma once #include "ppc-framework/Common.h" #include +#include #include #include #include @@ -58,6 +59,8 @@ class INodeInfo { return (nodeID() == info->nodeID()) && (components() == info->components()); } + + virtual void toJson(Json::Value& jsonObject) const = 0; }; class INodeInfoFactory { diff --git a/cpp/test-utils/FakeFront.h b/cpp/test-utils/FakeFront.h index 18e7741b..44b249ef 100644 --- a/cpp/test-utils/FakeFront.h +++ b/cpp/test-utils/FakeFront.h @@ -198,6 +198,21 @@ class FakeFront : public FrontInterface } } + // for ut + void setAgencyList(std::vector const& agencyList) + { + bcos::WriteGuard l(x_agencyList); + m_agencyList = agencyList; + } + + std::vector agencies() const override + { + bcos::ReadGuard l(x_agencyList); + return m_agencyList; + } + + void start() override {} + void stop() override {} private: // the uuid to _callback @@ -231,5 +246,9 @@ class FakeFront : public FrontInterface std::map m_uuidToCallback; bcos::Mutex m_mutex; std::atomic m_uuid = 0; + + // the agency list, for task-sync + std::vector m_agencyList; + mutable bcos::SharedMutex x_agencyList; }; } // namespace ppc::test \ No newline at end of file diff --git a/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h b/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h index 72ee8bbe..898c172d 100644 --- a/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h +++ b/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h @@ -114,18 +114,7 @@ class PSIConfig int taskExpireTime() const { return m_taskExpireTime; } void setTaskExpireTime(int _taskExpireTime) { m_taskExpireTime = _taskExpireTime; } - std::vector agencyList() const - { - bcos::ReadGuard l(x_agencyList); - return m_agencyList; - } - - // for ut - void setAgencyList(std::vector const& agencyList) - { - bcos::WriteGuard l(x_agencyList); - m_agencyList = agencyList; - } + std::vector agencyList() const { return m_front->agencies(); } protected: ppc::front::PPCMessageFace::Ptr generatePPCMsg( @@ -160,10 +149,5 @@ class PSIConfig // the task-expire time int m_taskExpireTime = 10000; - - // the agency list, for task-sync - // TODO: fetch from the gateway - std::vector m_agencyList; - mutable bcos::SharedMutex x_agencyList; }; } // namespace ppc::psi \ No newline at end of file diff --git a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestEcdhPSIImpl.cpp b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestEcdhPSIImpl.cpp index a7df31a2..5c973bed 100644 --- a/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestEcdhPSIImpl.cpp +++ b/cpp/wedpr-computing/ppc-psi/tests/ra2018-psi/TestEcdhPSIImpl.cpp @@ -61,8 +61,11 @@ void testEcdhImplFunc(int64_t _dataBatchSize, std::string const& _serverPSIDataS auto clientPSI = factory->createEcdhPSI(clientAgencyName, clientConfig); std::vector agencyList = {serverAgencyName, clientAgencyName}; - serverPSI->psiConfig()->setAgencyList(agencyList); - clientPSI->psiConfig()->setAgencyList(agencyList); + auto serverFront = std::dynamic_pointer_cast(serverPSI->psiConfig()->front()); + serverFront->setAgencyList(agencyList); + + auto clientFront = std::dynamic_pointer_cast(clientPSI->psiConfig()->front()); + clientFront->setAgencyList(agencyList); // register the server-psi into the front factory->front()->registerEcdhPSI(serverAgencyName, serverPSI); diff --git a/cpp/wedpr-helper/ppc-utilities/Utilities.h b/cpp/wedpr-helper/ppc-utilities/Utilities.h index 7600e09d..7b989243 100644 --- a/cpp/wedpr-helper/ppc-utilities/Utilities.h +++ b/cpp/wedpr-helper/ppc-utilities/Utilities.h @@ -18,12 +18,13 @@ * @date 2024-08-23 */ #pragma once - #include "ppc-framework/Common.h" #include #include #include #include +#include +#include namespace ppc { @@ -52,4 +53,14 @@ inline std::string generateUUID() static thread_local auto uuid_gen = boost::uuids::basic_random_generator(); return boost::uuids::to_string(uuid_gen()); } +template +inline std::string printVector(std::vector const& list) +{ + std::stringstream oss; + for (auto const& it : list) + { + oss << it << ","; + } + return oss.str(); +} } // namespace ppc \ No newline at end of file diff --git a/cpp/wedpr-initializer/Initializer.cpp b/cpp/wedpr-initializer/Initializer.cpp index b5848406..e25a63b4 100644 --- a/cpp/wedpr-initializer/Initializer.cpp +++ b/cpp/wedpr-initializer/Initializer.cpp @@ -413,9 +413,9 @@ void Initializer::registerRpcHandler(ppc::rpc::RpcInterface::Ptr const& _rpc) void Initializer::start() { - if (m_transport) + if (m_ppcFront) { - m_transport->start(); + m_ppcFront->start(); } /*if (m_ecdhConnPSI) { @@ -456,9 +456,9 @@ void Initializer::start() void Initializer::stop() { // stop the network firstly - if (m_transport) + if (m_ppcFront) { - m_transport->stop(); + m_ppcFront->stop(); } /*if (m_ecdhConnPSI) { diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp index 92580244..7b55c1a3 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp @@ -39,6 +39,33 @@ void GatewayClient::asyncSendMessage(RouteType routeType, [callback, response](Status status) { callback(toError(status, std::move(*response))); }); } +void GatewayClient::asyncGetPeers(std::function callback) +{ + auto response = std::make_shared(); + ClientContext context; + auto request = std::make_shared(); + m_stub->async()->asyncGetPeers( + &context, request.get(), response.get(), [callback, response](Status status) { + callback(toError(status, std::move(*response->mutable_error())), response->peersinfo()); + }); +} + +void GatewayClient::asyncGetAgencies( + std::function)> callback) +{ + auto response = std::make_shared(); + ClientContext context; + auto request = std::make_shared(); + m_stub->async()->asyncGetAgencies( + &context, request.get(), response.get(), [callback, response](Status status) { + std::vector agencies; + for (int i = 0; i < response->agencies_size(); i++) + { + agencies.emplace_back(response->agencies(i)); + } + callback(toError(status, std::move(*response->mutable_error())), agencies); + }); +} bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo) { diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.h b/cpp/wedpr-protocol/grpc/client/GatewayClient.h index 69513248..58c79cec 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.h +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.h @@ -55,6 +55,10 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, bcos::bytes&& payload, long timeout, ppc::protocol::ReceiveMsgFunc callback) override; + void asyncGetPeers(std::function callback) override; + void asyncGetAgencies( + std::function)> callback) override; + void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, bcos::bytes&& payload) override diff --git a/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp index 3b5caf69..8b770a06 100644 --- a/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp +++ b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp @@ -51,6 +51,59 @@ ServerUnaryReactor* GatewayServer::asyncSendMessage(CallbackServerContext* conte return reactor.get(); } +grpc::ServerUnaryReactor* GatewayServer::asyncGetPeers( + grpc::CallbackServerContext* context, const ppc::proto::Empty*, ppc::proto::PeersInfo* reply) +{ + std::shared_ptr reactor(context->DefaultReactor()); + try + { + m_gateway->asyncGetPeers([reactor, reply](bcos::Error::Ptr error, std::string peersInfo) { + toSerializedError(reply->mutable_error(), error); + reply->set_peersinfo(std::move(peersInfo)); + reactor->Finish(Status::OK); + }); + } + catch (std::exception const& e) + { + GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("asyncGetPeers exception") + << LOG_KV("error", boost::diagnostic_information(e)); + toSerializedError(reply->mutable_error(), + std::make_shared( + -1, "asyncGetPeers failed for : " + std::string(boost::diagnostic_information(e)))); + reactor->Finish(Status::OK); + } + return reactor.get(); +} + +grpc::ServerUnaryReactor* GatewayServer::asyncGetAgencies( + grpc::CallbackServerContext* context, const ppc::proto::Empty*, ppc::proto::AgenciesInfo* reply) +{ + std::shared_ptr reactor(context->DefaultReactor()); + try + { + m_gateway->asyncGetAgencies( + [reactor, reply](bcos::Error::Ptr error, std::vector agencies) { + toSerializedError(reply->mutable_error(), error); + for (auto const& it : agencies) + { + reply->add_agencies(it); + } + reactor->Finish(Status::OK); + }); + } + catch (std::exception const& e) + { + GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("asyncGetAgencies exception") + << LOG_KV("error", boost::diagnostic_information(e)); + toSerializedError(reply->mutable_error(), + std::make_shared(-1, + "asyncGetAgencies failed for : " + std::string(boost::diagnostic_information(e)))); + reactor->Finish(Status::OK); + } + return reactor.get(); +} + + ServerUnaryReactor* GatewayServer::registerNodeInfo(CallbackServerContext* context, const ppc::proto::NodeInfo* serializedNodeInfo, ppc::proto::Error* reply) { diff --git a/cpp/wedpr-protocol/grpc/server/GatewayServer.h b/cpp/wedpr-protocol/grpc/server/GatewayServer.h index d287a6c8..7c62048d 100644 --- a/cpp/wedpr-protocol/grpc/server/GatewayServer.h +++ b/cpp/wedpr-protocol/grpc/server/GatewayServer.h @@ -39,6 +39,12 @@ class GatewayServer : public ppc::proto::Gateway::CallbackService grpc::ServerUnaryReactor* asyncSendMessage(grpc::CallbackServerContext* context, const ppc::proto::SendedMessageRequest* sendedMsg, ppc::proto::Error* reply) override; + grpc::ServerUnaryReactor* asyncGetPeers(grpc::CallbackServerContext* context, + const ppc::proto::Empty* request, ppc::proto::PeersInfo* reply) override; + grpc::ServerUnaryReactor* asyncGetAgencies(grpc::CallbackServerContext* context, + const ppc::proto::Empty* request, ppc::proto::AgenciesInfo* reply) override; + + grpc::ServerUnaryReactor* registerNodeInfo(grpc::CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override; diff --git a/cpp/wedpr-protocol/proto/pb/Service.proto b/cpp/wedpr-protocol/proto/pb/Service.proto index e46d91b0..f531f08f 100644 --- a/cpp/wedpr-protocol/proto/pb/Service.proto +++ b/cpp/wedpr-protocol/proto/pb/Service.proto @@ -30,11 +30,27 @@ message SendedMessageRequest{ int64 timeout = 4; string traceID = 5; }; + +message AgenciesInfo{ + Error error = 1; + repeated string agencies = 2; +}; + +message PeersInfo{ + Error error = 1; + string peersInfo = 2; +}; +message Empty{ + +}; + service Front { rpc onReceiveMessage (ReceivedMessage) returns (Error) {} } service Gateway{ rpc asyncSendMessage(SendedMessageRequest) returns(Error){} + rpc asyncGetPeers(Empty)returns(PeersInfo){} + rpc asyncGetAgencies(Empty)returns(AgenciesInfo){} rpc registerNodeInfo(NodeInfo) returns(Error){} rpc unRegisterNodeInfo(NodeInfo)returns(Error){} rpc registerTopic(NodeInfo) returns(Error){} diff --git a/cpp/wedpr-protocol/protobuf/src/CMakeLists.txt b/cpp/wedpr-protocol/protobuf/src/CMakeLists.txt index 7aaa0e0d..8b23ec29 100644 --- a/cpp/wedpr-protocol/protobuf/src/CMakeLists.txt +++ b/cpp/wedpr-protocol/protobuf/src/CMakeLists.txt @@ -24,4 +24,4 @@ endforeach() file(GLOB_RECURSE SRCS *.cpp) add_library(${PB_PROTOCOL_TARGET} ${SRCS} ${MESSAGES_SRCS}) -target_link_libraries(${PB_PROTOCOL_TARGET} PUBLIC ${BCOS_UTILITIES_TARGET} protobuf::libprotobuf ${CPU_FEATURES_LIB}) \ No newline at end of file +target_link_libraries(${PB_PROTOCOL_TARGET} PUBLIC ${BCOS_UTILITIES_TARGET} jsoncpp_static protobuf::libprotobuf ${CPU_FEATURES_LIB}) \ No newline at end of file diff --git a/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp index 93f0f66a..59cf149e 100644 --- a/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp +++ b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp @@ -37,4 +37,17 @@ void NodeInfoImpl::decode(bcos::bytesConstRef data) decodePBObject(m_rawNodeInfo, data); m_components = std::set( m_rawNodeInfo->components().begin(), m_rawNodeInfo->components().end()); +} + +void NodeInfoImpl::toJson(Json::Value& jsonObject) const +{ + jsonObject["nodeID"] = std::string(nodeID().begin(), nodeID().end()); + jsonObject["endPoint"] = endPoint(); + Json::Value componentsInfo(Json::arrayValue); + auto componentsList = components(); + for (auto const& it : componentsList) + { + componentsInfo.append(it); + } + jsonObject["components"] = componentsInfo; } \ No newline at end of file diff --git a/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h index ed8f6a68..02f8b31d 100644 --- a/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h +++ b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h @@ -76,6 +76,8 @@ class NodeInfoImpl : public INodeInfo } std::shared_ptr const& getFront() const override { return m_front; } + void toJson(Json::Value& jsonObject) const override; + private: std::shared_ptr m_front; std::set m_components; diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp index 01a1f179..6b4e0577 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp @@ -19,11 +19,69 @@ */ #include "Front.h" #include "FrontImpl.h" +#include "ppc-utilities/Utilities.h" using namespace ppc; using namespace bcos; using namespace ppc::protocol; using namespace ppc::front; + +Front::Front(IFront::Ptr front) : m_front(std::move(front)) +{ + m_fetcher = std::make_shared(60 * 1000, "metaFetcher"); + m_fetcher->registerTimeoutHandler([this]() { + try + { + fetchGatewayMetaInfo(); + } + catch (std::exception const& e) + { + FRONT_LOG(WARNING) << LOG_DESC("fetch the gateway information failed") + << LOG_KV("error", boost::diagnostic_information(e)); + } + }); +} + +void Front::start() +{ + m_front->start(); + m_fetcher->start(); +} + +void Front::stop() +{ + m_fetcher->stop(); + m_front->stop(); +} + +void Front::fetchGatewayMetaInfo() +{ + auto self = weak_from_this(); + m_front->asyncGetAgencies([self](bcos::Error::Ptr error, std::vector agencies) { + auto front = self.lock(); + if (!front) + { + return; + } + if (error && error->errorCode() != 0) + { + FRONT_LOG(WARNING) << LOG_DESC("asyncGetAgencies failed") + << LOG_KV("code", error->errorCode()) + << LOG_KV("msg", error->errorMessage()); + return; + } + bcos::UpgradableGuard l(front->x_agencyList); + if (front->m_agencyList == agencies) + { + return; + } + front->m_agencyList = agencies; + FRONT_LOG(INFO) << LOG_DESC("Update agencies information") + << LOG_KV("agencies", printVector(agencies)); + }); + m_fetcher->restart(); +} + /** * @brief: send message to other party by gateway * @param _agencyID: agency ID of receiver diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h index 30cc315a..cdd5f8ab 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h @@ -19,6 +19,7 @@ */ #pragma once +#include "bcos-utilities/Timer.h" #include "ppc-framework/front/FrontInterface.h" #include "ppc-framework/front/IFront.h" #include "ppc-framework/protocol/PPCMessageFace.h" @@ -29,9 +30,12 @@ class Front : public FrontInterface, public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - Front(IFront::Ptr front) : m_front(std::move(front)) {} + Front(IFront::Ptr front); ~Front() override {} + void start() override; + void stop() override; + /** * @brief: send message to other party by gateway * @param _agencyID: agency ID of receiver @@ -78,8 +82,24 @@ class Front : public FrontInterface, public std::enable_shared_from_this }); } + std::vector agencies() const override + { + bcos::ReadGuard l(x_agencyList); + return m_agencyList; + } + +protected: + void fetchGatewayMetaInfo(); + private: IFront::Ptr m_front; + + // the agency list + std::vector m_agencyList; + mutable bcos::SharedMutex x_agencyList; + + std::shared_ptr m_fetcher; + ppc::front::PPCMessageFaceFactory::Ptr m_messageFactory; }; } // namespace ppc::front \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h index 32700b7b..552c5ee6 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h @@ -135,6 +135,12 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ m_gatewayClient->registerTopic(bcos::ref(m_nodeID), topic); } + void asyncGetAgencies( + std::function)> callback) override + { + m_gatewayClient->asyncGetAgencies(callback); + } + /** * @brief unRegister the topic * diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index d80079d0..18afb78d 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -239,4 +239,58 @@ bcos::Error::Ptr GatewayImpl::unRegisterTopic(bcos::bytesConstRef nodeID, std::s { m_localRouter->unRegisterTopic(nodeID, topic); return nullptr; +} + +void GatewayImpl::asyncGetPeers(std::function callback) +{ + if (!callback) + { + return; + } + try + { + auto infos = m_peerRouter->gatewayInfos(); + Json::Value peers; + peers["agency"] = m_agency; + peers["nodeID"] = m_service->nodeID(); + peers["peers"] = Json::Value(Json::arrayValue); + for (auto const& it : infos) + { + auto gatewayInfoList = it.second; + Json::Value agencyGatewayInfo; + agencyGatewayInfo["agency"] = it.first; + Json::Value peersInfo(Json::arrayValue); + for (auto const& gatewayInfo : gatewayInfoList) + { + Json::Value gatewayJson; + gatewayInfo->toJson(gatewayJson); + peersInfo.append(gatewayJson); + } + agencyGatewayInfo["gateway"] = peersInfo; + peers["peers"].append(agencyGatewayInfo); + } + Json::FastWriter fastWriter; + std::string statusStr = fastWriter.write(peers); + callback(nullptr, statusStr); + } + catch (std::exception const& e) + { + GATEWAY_LOG(WARNING) << LOG_DESC("asyncGetPeers exception") + << LOG_KV("error", boost::diagnostic_information(e)); + callback( + std::make_shared( + -1, "asyncGetPeers exception for " + std::string(boost::diagnostic_information(e))), + ""); + } +} + +void GatewayImpl::asyncGetAgencies( + std::function)> callback) +{ + if (!callback) + { + return; + } + auto agencies = m_peerRouter->agencies(); + callback(nullptr, agencies); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h index 12a43d49..4cc4af78 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h @@ -67,6 +67,10 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_this callback) override; + void asyncGetAgencies( + std::function)> callback) override; + protected: virtual void onReceiveP2PMessage( bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h index be8d8eaf..b7e3e0f2 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h @@ -19,6 +19,7 @@ */ #pragma once #include "ppc-framework/protocol/INodeInfo.h" +#include "ppc-utilities/Utilities.h" #include #include #include @@ -58,6 +59,7 @@ class GatewayNodeInfo virtual std::map nodeList() const = 0; virtual uint16_t nodeSize() const = 0; + virtual void toJson(Json::Value& jsonObject) const = 0; }; class GatewayNodeInfoFactory diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp index c6c24422..2f3bb34a 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp @@ -220,4 +220,20 @@ void GatewayNodeInfoImpl::decode(bcos::bytesConstRef data) m_nodeList.insert(std::make_pair(nodeInfoPtr->nodeID().toBytes(), nodeInfoPtr)); } } +} + +void GatewayNodeInfoImpl::toJson(Json::Value& jsonObject) const +{ + jsonObject["gatewayNodeID"] = p2pNodeID(); + jsonObject["agency"] = agency(); + + auto agencyNodeList = nodeList(); + Json::Value frontList(Json::arrayValue); + for (auto const& it : agencyNodeList) + { + Json::Value nodeInfo; + it.second->toJson(nodeInfo); + frontList.append(nodeInfo); + } + jsonObject["frontList"] = frontList; } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h index 92a74f17..13f0bf25 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h @@ -80,6 +80,8 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo return m_nodeList.size(); } + void toJson(Json::Value& jsonObject) const override; + private: void updateNodeList(); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp index 3da0fbe9..7d7889c3 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp @@ -67,6 +67,17 @@ void PeerRouterTable::updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo) m_agency2GatewayInfos[gatewayInfo->agency()].insert(gatewayInfo); } +std::vector PeerRouterTable::agencies() const +{ + std::vector agencies; + bcos::ReadGuard l(x_mutex); + for (auto const& it : m_agency2GatewayInfos) + { + agencies.emplace_back(it.first); + } + return agencies; +} + GatewayNodeInfos PeerRouterTable::selectRouter( RouteType const& routeType, Message::Ptr const& msg) const { diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h index 7a433303..7a286443 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h @@ -40,6 +40,14 @@ class PeerRouterTable virtual void asyncBroadcastMessage(ppc::protocol::Message::Ptr const& msg) const; + std::vector agencies() const; + + std::map gatewayInfos() const + { + bcos::ReadGuard l(x_mutex); + return m_agency2GatewayInfos; + } + private: virtual GatewayNodeInfos selectRouterByNodeID(ppc::protocol::Message::Ptr const& msg) const; virtual GatewayNodeInfos selectRouterByComponent(ppc::protocol::Message::Ptr const& msg) const; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp index c60fa00d..68ecea83 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp @@ -17,7 +17,6 @@ * @author: yujiechen * @date 2024-08-26 */ - #include "Service.h" #include "bcos-boostssl/websocket/WsError.h" #include "ppc-framework/Common.h" From b1660b7ecf5293c9fd959cfda542a65fc809f240 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Mon, 9 Sep 2024 15:42:03 +0800 Subject: [PATCH 2/4] add getPeers rpc implementation --- cpp/cmake/BuildInfo.cmake | 2 +- cpp/ppc-framework/rpc/RpcTypeDef.h | 1 + .../air-node/AirNodeInitializer.cpp | 2 +- cpp/wedpr-main/cem-node/CEMInitializer.cpp | 2 +- cpp/wedpr-main/common/NodeStarter.h | 4 +-- cpp/wedpr-main/mpc-node/MPCInitializer.cpp | 2 +- .../pro-node/ProNodeInitializer.cpp | 3 +- .../ppc-front/ppc-front/Front.cpp | 17 ++++++---- .../ppc-gateway/gateway/GatewayImpl.cpp | 5 +++ .../gateway/router/GatewayNodeInfoImpl.cpp | 1 + .../ppc-gateway/gateway/router/LocalRouter.h | 2 ++ cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp | 2 +- cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp | 34 +++++++++++++++++-- cpp/wedpr-transport/ppc-rpc/src/Rpc.h | 7 +++- .../ppc-rpc/src/RpcFactory.cpp | 5 +-- cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h | 5 +-- cpp/wedpr-transport/sdk/ProTransportImpl.h | 1 - cpp/wedpr-transport/sdk/Transport.h | 6 +++- cpp/wedpr-transport/sdk/TransportImpl.h | 1 + 19 files changed, 78 insertions(+), 24 deletions(-) diff --git a/cpp/cmake/BuildInfo.cmake b/cpp/cmake/BuildInfo.cmake index 628b7c23..7ef9778c 100644 --- a/cpp/cmake/BuildInfo.cmake +++ b/cpp/cmake/BuildInfo.cmake @@ -27,7 +27,7 @@ function(create_build_info) # Generate header file containing useful build information add_custom_target(BuildInfo.h ALL WORKING_DIRECTORY ${PROJECT_SOURCE_DIR} - COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}" + COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}/.." -DPPC_BUILDINFO_IN="${CMAKE_CURRENT_SOURCE_DIR}/cmake/templates/BuildInfo.h.in" -DPPC_DST_DIR="${PROJECT_BINARY_DIR}/include" -DPPC_CMAKE_DIR="${CMAKE_CURRENT_SOURCE_DIR}/cmake" diff --git a/cpp/ppc-framework/rpc/RpcTypeDef.h b/cpp/ppc-framework/rpc/RpcTypeDef.h index 5a0fb493..208fee40 100644 --- a/cpp/ppc-framework/rpc/RpcTypeDef.h +++ b/cpp/ppc-framework/rpc/RpcTypeDef.h @@ -36,6 +36,7 @@ enum class RpcError : int32_t std::string const RUN_TASK_METHOD = "runTask"; std::string const ASYNC_RUN_TASK_METHOD = "asyncRunTask"; std::string const GET_TASK_STATUS = "getTaskStatus"; +std::string const GET_PEERS = "getPeers"; std::string const ASYNC_RUN_BS_MODE_TASK = "asyncRunBsModeTask"; std::string const FETCH_CIPHER = "fetchCipher"; diff --git a/cpp/wedpr-main/air-node/AirNodeInitializer.cpp b/cpp/wedpr-main/air-node/AirNodeInitializer.cpp index ffcf605a..af88181d 100644 --- a/cpp/wedpr-main/air-node/AirNodeInitializer.cpp +++ b/cpp/wedpr-main/air-node/AirNodeInitializer.cpp @@ -67,7 +67,7 @@ void AirNodeInitializer::init(std::string const& _configPath) auto rpcFactory = std::make_shared(m_nodeInitializer->config()->agencyID()); - m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config()); + m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config(), m_gateway); m_rpc->setRpcStorage(rpcStatusInterface); m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi()); m_nodeInitializer->registerRpcHandler(m_rpc); diff --git a/cpp/wedpr-main/cem-node/CEMInitializer.cpp b/cpp/wedpr-main/cem-node/CEMInitializer.cpp index 69c2efbe..e228fa5a 100644 --- a/cpp/wedpr-main/cem-node/CEMInitializer.cpp +++ b/cpp/wedpr-main/cem-node/CEMInitializer.cpp @@ -46,7 +46,7 @@ void CEMInitializer::init(std::string const& _configPath) auto storageConfig = ppcConfig->storageConfig(); auto cemConfig = ppcConfig->cemConfig(); auto rpcFactory = std::make_shared(ppcConfig->agencyID()); - m_rpc = rpcFactory->buildRpc(ppcConfig); + m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr); auto cemService = std::make_shared(); cemService->setCEMConfig(cemConfig); cemService->setStorageConfig(storageConfig); diff --git a/cpp/wedpr-main/common/NodeStarter.h b/cpp/wedpr-main/common/NodeStarter.h index 95d8d671..e33ad7ef 100644 --- a/cpp/wedpr-main/common/NodeStarter.h +++ b/cpp/wedpr-main/common/NodeStarter.h @@ -68,13 +68,13 @@ int startProgram( } printVersion(); std::cout << "[" << bcos::getCurrentDateTime() << "] "; - std::cout << "The " + binaryName + "is running..." << std::endl; + std::cout << "The " + binaryName + " is running..." << std::endl; while (!exitHandler.shouldExit()) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } starter.reset(); std::cout << "[" << bcos::getCurrentDateTime() << "] "; - std::cout << "The" + binaryName + " program exit normally." << std::endl; + std::cout << "The " + binaryName + " program exit normally." << std::endl; } } // namespace ppc::node \ No newline at end of file diff --git a/cpp/wedpr-main/mpc-node/MPCInitializer.cpp b/cpp/wedpr-main/mpc-node/MPCInitializer.cpp index de8b8f1f..6998876a 100644 --- a/cpp/wedpr-main/mpc-node/MPCInitializer.cpp +++ b/cpp/wedpr-main/mpc-node/MPCInitializer.cpp @@ -47,7 +47,7 @@ void MPCInitializer::init(std::string const& _configPath) auto storageConfig = ppcConfig->storageConfig(); auto mpcConfig = ppcConfig->mpcConfig(); auto rpcFactory = std::make_shared(ppcConfig->agencyID()); - m_rpc = rpcFactory->buildRpc(ppcConfig); + m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr); auto mpcService = std::make_shared(); mpcService->setMPCConfig(mpcConfig); mpcService->setStorageConfig(storageConfig); diff --git a/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp b/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp index 2ced6616..ec0ce5cc 100644 --- a/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp +++ b/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp @@ -58,7 +58,8 @@ void ProNodeInitializer::init(std::string const& _configPath) auto rpcFactory = std::make_shared(m_nodeInitializer->config()->agencyID()); - m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config()); + m_rpc = rpcFactory->buildRpc( + m_nodeInitializer->config(), m_nodeInitializer->transport()->gateway()); m_rpc->setRpcStorage(rpcStatusInterface); m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi()); m_nodeInitializer->registerRpcHandler(m_rpc); diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp index 6b4e0577..f0fe2002 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp @@ -75,6 +75,7 @@ void Front::fetchGatewayMetaInfo() { return; } + bcos::UpgradeGuard ul(l); front->m_agencyList = agencies; FRONT_LOG(INFO) << LOG_DESC("Update agencies information") << LOG_KV("agencies", printVector(agencies)); @@ -100,11 +101,11 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace bcos::bytes data; _message->encode(data); auto self = weak_from_this(); - // ROUTE_THROUGH_TOPIC will hold the topic - m_front->asyncSendMessage(RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data), - _message->seq(), _timeout, _callback, - [self, _agencyID, _respCallback]( - Error::Ptr error, Message::Ptr msg, SendResponseFunction resFunc) { + ppc::protocol::MessageCallback msgCallback = nullptr; + if (_respCallback) + { + msgCallback = [self, _agencyID, _respCallback]( + Error::Ptr error, Message::Ptr msg, SendResponseFunction resFunc) { auto front = self.lock(); if (!front) { @@ -126,7 +127,11 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace // get the agencyID _respCallback(error, msg->header()->optionalField()->srcInst(), front->m_messageFactory->decodePPCMessage(msg), responseCallback); - }); + }; + } + // ROUTE_THROUGH_TOPIC will hold the topic + m_front->asyncSendMessage(RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data), + _message->seq(), _timeout, _callback, msgCallback); } // send response when receiving message from given agencyID diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index 18afb78d..fb8200d5 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -253,6 +253,10 @@ void GatewayImpl::asyncGetPeers(std::function cal Json::Value peers; peers["agency"] = m_agency; peers["nodeID"] = m_service->nodeID(); + // add the local gatewayInfo + Json::Value localGatewayInfo; + m_localRouter->routerInfo()->toJson(localGatewayInfo); + peers["gateway"] = localGatewayInfo; peers["peers"] = Json::Value(Json::arrayValue); for (auto const& it : infos) { @@ -292,5 +296,6 @@ void GatewayImpl::asyncGetAgencies( return; } auto agencies = m_peerRouter->agencies(); + agencies.emplace_back(m_agency); callback(nullptr, agencies); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp index 2f3bb34a..bb289294 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp @@ -224,6 +224,7 @@ void GatewayNodeInfoImpl::decode(bcos::bytesConstRef data) void GatewayNodeInfoImpl::toJson(Json::Value& jsonObject) const { + bcos::ReadGuard l(x_nodeList); jsonObject["gatewayNodeID"] = p2pNodeID(); jsonObject["agency"] = agency(); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h index 30b5f244..a3b33f2f 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h @@ -66,6 +66,8 @@ class LocalRouter } uint32_t statusSeq() { return m_statusSeq; } + GatewayNodeInfo::Ptr const& routerInfo() const { return m_routerInfo; } + private: uint32_t increaseSeq() { diff --git a/cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp b/cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp index 849502c2..252e3ba4 100644 --- a/cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp +++ b/cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp @@ -50,7 +50,7 @@ int main(int argc, const char* argv[]) // not specify the certPath in air-mode ppcConfig->loadRpcConfig(param.configFilePath); auto rpcFactory = std::make_shared("selfParty"); - auto rpc = rpcFactory->buildRpc(ppcConfig); + auto rpc = rpcFactory->buildRpc(ppcConfig, nullptr); registerEchoHandler(rpc); // start the rpc rpc->start(); diff --git a/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp b/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp index 5b21374e..f2a78a09 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp +++ b/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp @@ -31,10 +31,11 @@ using namespace ppc::rpc; using namespace ppc::tools; using namespace ppc::protocol; -Rpc::Rpc(std::shared_ptr _wsService, std::string const& _selfPartyID, - std::string const& _token, std::string const& _prePath) +Rpc::Rpc(std::shared_ptr _wsService, ppc::gateway::IGateway::Ptr gateway, + std::string const& _selfPartyID, std::string const& _token, std::string const& _prePath) : m_prePath(_prePath), m_wsService(std::move(_wsService)), + m_gateway(std::move(gateway)), m_taskFactory(std::make_shared(_selfPartyID, _prePath)), m_token(_token) { @@ -74,7 +75,8 @@ Rpc::Rpc(std::shared_ptr _wsService, std::string const& boost::bind(&Rpc::killBsModeTask, this, boost::placeholders::_1, boost::placeholders::_2); m_methodToHandler[UPDATE_BS_MODE_TASK_STATUS] = boost::bind( &Rpc::updateBsModeTaskStatus, this, boost::placeholders::_1, boost::placeholders::_2); - + m_methodToHandler[GET_PEERS] = + boost::bind(&Rpc::getPeers, this, boost::placeholders::_1, boost::placeholders::_2); RPC_LOG(INFO) << LOG_DESC("init rpc success") << LOG_KV("selfParty", _selfPartyID); } @@ -345,6 +347,32 @@ void Rpc::sendEcdhCipher(Json::Value const& _req, RespFunc _respFunc) _respFunc(result->error(), result->serializeToJson()); } +void Rpc::getPeers(Json::Value const& _req, RespFunc _respFunc) +{ + if (m_gateway == nullptr) + { + BOOST_THROW_EXCEPTION(BCOS_ERROR(-1, "the gateway not initialized!")); + } + m_gateway->asyncGetPeers([_respFunc](bcos::Error::Ptr error, std::string peersInfo) { + try + { + Json::Value root; + Json::Reader jsonReader; + + if (!jsonReader.parse(peersInfo, root)) + { + BOOST_THROW_EXCEPTION(BCOS_ERROR(-1, "Invalid json string: " + peersInfo)); + } + _respFunc(error, std::move(root)); + } + catch (std::exception const& e) + { + RPC_LOG(WARNING) << LOG_DESC("getPeers exception") + << LOG_KV("error", boost::diagnostic_information(e)); + } + }); +} + void Rpc::sendPartnerCipher(Json::Value const& _req, RespFunc _respFunc) { if (!m_bsEcdhPSI) diff --git a/cpp/wedpr-transport/ppc-rpc/src/Rpc.h b/cpp/wedpr-transport/ppc-rpc/src/Rpc.h index 45871f29..fe19b420 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/Rpc.h +++ b/cpp/wedpr-transport/ppc-rpc/src/Rpc.h @@ -19,6 +19,7 @@ */ #pragma once #include "ppc-framework/front/FrontInterface.h" +#include "ppc-framework/gateway/IGateway.h" #include "ppc-framework/rpc/RpcInterface.h" #include "ppc-framework/rpc/RpcStatusInterface.h" #include "protocol/src/JsonTaskImpl.h" @@ -35,7 +36,8 @@ class Rpc : public RpcInterface { public: using Ptr = std::shared_ptr; - Rpc(std::shared_ptr _wsService, std::string const& _selfPartyID, + Rpc(std::shared_ptr _wsService, + ppc::gateway::IGateway::Ptr gateway, std::string const& _selfPartyID, std::string const& _token, std::string const& _prePath = "data"); ~Rpc() override { stop(); } void start() override @@ -131,11 +133,14 @@ class Rpc : public RpcInterface virtual void killBsModeTask(Json::Value const& _req, RespFunc _respFunc); virtual void updateBsModeTaskStatus(Json::Value const& _req, RespFunc _respFunc); + virtual void getPeers(Json::Value const& _req, RespFunc _respFunc); + void checkHostResource(); private: std::string m_prePath; std::shared_ptr m_wsService; + ppc::gateway::IGateway::Ptr m_gateway; RpcStatusInterface::Ptr m_rpcStorage; // Note: here use jsonTaskFactory to decrease the overhead to convert json::value to string when diff --git a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp index a1db0739..92c1ed22 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp +++ b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp @@ -27,7 +27,8 @@ using namespace bcos; using namespace ppc::rpc; using namespace ppc::tools; -Rpc::Ptr RpcFactory::buildRpc(ppc::tools::PPCConfig::ConstPtr _config) +Rpc::Ptr RpcFactory::buildRpc( + ppc::tools::PPCConfig::ConstPtr _config, ppc::gateway::IGateway::Ptr gateway) { auto wsConfig = initConfig(_config); // create the wsConfig @@ -39,7 +40,7 @@ Rpc::Ptr RpcFactory::buildRpc(ppc::tools::PPCConfig::ConstPtr _config) initializer->initWsService(wsService); auto rpc = std::make_shared( - wsService, m_selfPartyID, _config->rpcConfig().token, _config->dataLocation()); + wsService, gateway, m_selfPartyID, _config->rpcConfig().token, _config->dataLocation()); rpc->setMinNeededMemory(_config->rpcConfig().minNeededMemoryGB); return rpc; } diff --git a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h index 3e6b512e..91f94ff1 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h +++ b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h @@ -19,6 +19,7 @@ */ #pragma once #include "Rpc.h" +#include "ppc-framework/gateway/IGateway.h" #include namespace bcos::boostssl::ws { @@ -37,8 +38,8 @@ class RpcFactory RpcFactory(std::string const& _selfPartyID) : m_selfPartyID(_selfPartyID) {} virtual ~RpcFactory() = default; - Rpc::Ptr buildRpc(std::shared_ptr _config); - + Rpc::Ptr buildRpc( + std::shared_ptr _config, ppc::gateway::IGateway::Ptr gateway); private: std::shared_ptr initConfig( diff --git a/cpp/wedpr-transport/sdk/ProTransportImpl.h b/cpp/wedpr-transport/sdk/ProTransportImpl.h index c89b33ef..ed29cbdd 100644 --- a/cpp/wedpr-transport/sdk/ProTransportImpl.h +++ b/cpp/wedpr-transport/sdk/ProTransportImpl.h @@ -43,7 +43,6 @@ class ProTransportImpl : public Transport, public std::enable_shared_from_this

m_server; int m_keepAlivePeriodMs; std::shared_ptr m_timer; diff --git a/cpp/wedpr-transport/sdk/Transport.h b/cpp/wedpr-transport/sdk/Transport.h index cf9230fb..c9c36148 100644 --- a/cpp/wedpr-transport/sdk/Transport.h +++ b/cpp/wedpr-transport/sdk/Transport.h @@ -19,6 +19,7 @@ */ #pragma once #include "ppc-framework/front/IFront.h" +#include "ppc-framework/gateway/IGateway.h" namespace ppc::sdk { class Transport @@ -31,9 +32,12 @@ class Transport virtual void start() { m_front->start(); } virtual void stop() { m_front->stop(); } - virtual ppc::front::IFront::Ptr const& getFront() { return m_front; } + virtual ppc::front::IFront::Ptr const& getFront() const { return m_front; } + + virtual ppc::gateway::IGateway::Ptr const& gateway() const { return m_gateway; } protected: ppc::front::IFront::Ptr m_front; + ppc::gateway::IGateway::Ptr m_gateway; }; } // namespace ppc::sdk \ No newline at end of file diff --git a/cpp/wedpr-transport/sdk/TransportImpl.h b/cpp/wedpr-transport/sdk/TransportImpl.h index a400b416..fbd9e3e6 100644 --- a/cpp/wedpr-transport/sdk/TransportImpl.h +++ b/cpp/wedpr-transport/sdk/TransportImpl.h @@ -33,6 +33,7 @@ class TransportImpl : public Transport TransportImpl(ppc::front::FrontConfig::Ptr config, ppc::gateway::IGateway::Ptr const& gateway) : m_config(std::move(config)) { + m_gateway = gateway; ppc::front::FrontFactory frontFactory; m_front = frontFactory.build(std::make_shared(), std::make_shared(), From 5e67cffe342102bf29c25d0073232c581ec76d02 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Mon, 9 Sep 2024 17:49:04 +0800 Subject: [PATCH 3/4] fix gateway bugs --- cpp/ppc-framework/Common.h | 23 --------- cpp/ppc-framework/Helper.h | 48 +++++++++++++++++++ cpp/ppc-framework/protocol/INodeInfo.h | 2 +- cpp/ppc-framework/protocol/Message.h | 10 ++-- .../ppc-front/ppc-front/FrontFactory.cpp | 2 + .../ppc-gateway/gateway/GatewayImpl.cpp | 3 +- .../gateway/SendMessageWithRetry.cpp | 4 +- .../gateway/cache/MessageCache.cpp | 2 +- .../gateway/router/GatewayNodeInfo.h | 1 + .../gateway/router/GatewayRouterManager.cpp | 1 + .../gateway/router/LocalRouter.cpp | 2 +- .../gateway/router/PeerRouterTable.cpp | 1 + .../ppc-gateway/ppc-gateway/p2p/Service.cpp | 15 +++--- .../ppc-gateway/p2p/router/RouterManager.cpp | 1 + .../p2p/router/RouterTableImpl.cpp | 1 + 15 files changed, 75 insertions(+), 41 deletions(-) create mode 100644 cpp/ppc-framework/Helper.h diff --git a/cpp/ppc-framework/Common.h b/cpp/ppc-framework/Common.h index ec4bfb94..cfd3f7f3 100644 --- a/cpp/ppc-framework/Common.h +++ b/cpp/ppc-framework/Common.h @@ -61,29 +61,6 @@ DERIVE_PPC_EXCEPTION(DataSchemaNotSetted); DERIVE_PPC_EXCEPTION(UnsupportedDataSchema); DERIVE_PPC_EXCEPTION(WeDPRException); -constexpr static int MAX_PORT = 65535; -constexpr static int DEFAULT_SECURITY_PARAM = 128; - -constexpr static size_t RSA_PUBLIC_KEY_PREFIX = 18; -constexpr static size_t RSA_PUBLIC_KEY_TRUNC = 8; -constexpr static size_t RSA_PUBLIC_KEY_TRUNC_LENGTH = 26; - -inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept -{ - if (p2pId.length() < RSA_PUBLIC_KEY_TRUNC_LENGTH) - { - return p2pId; - } - return p2pId.substr(RSA_PUBLIC_KEY_PREFIX, RSA_PUBLIC_KEY_TRUNC); -} - - -template -inline std::string printNodeID(T const& nodeID) -{ - return std::string(nodeID.begin(), nodeID.begin() + 8); -} - #if ENABLE_CPU_FEATURES #if X86 static const cpu_features::X86Features CPU_FEATURES = cpu_features::GetX86Info().features; diff --git a/cpp/ppc-framework/Helper.h b/cpp/ppc-framework/Helper.h new file mode 100644 index 00000000..13f72c46 --- /dev/null +++ b/cpp/ppc-framework/Helper.h @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2022 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file Common.h + * @author: yujiechen + * @date 2022-10-20 + */ +#pragma once +#include +#include +namespace ppc +{ +constexpr static int MAX_PORT = 65535; +constexpr static int DEFAULT_SECURITY_PARAM = 128; + +constexpr static size_t RSA_PUBLIC_KEY_PREFIX = 18; +constexpr static size_t RSA_PUBLIC_KEY_TRUNC = 8; +constexpr static size_t RSA_PUBLIC_KEY_TRUNC_LENGTH = 26; + +inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept +{ + if (p2pId.length() < RSA_PUBLIC_KEY_TRUNC_LENGTH) + { + return p2pId; + } + return p2pId.substr(RSA_PUBLIC_KEY_PREFIX, RSA_PUBLIC_KEY_TRUNC); +} + + +template +inline std::string_view printNodeID(T const& nodeID) +{ + size_t offset = nodeID.size() >= 8 ? 8 : nodeID.size(); + return std::string_view((const char*)nodeID.data(), offset); +} +} // namespace ppc \ No newline at end of file diff --git a/cpp/ppc-framework/protocol/INodeInfo.h b/cpp/ppc-framework/protocol/INodeInfo.h index 1c843a27..2eb5e1d1 100644 --- a/cpp/ppc-framework/protocol/INodeInfo.h +++ b/cpp/ppc-framework/protocol/INodeInfo.h @@ -18,7 +18,7 @@ * @date 2024-08-26 */ #pragma once -#include "ppc-framework/Common.h" +#include "ppc-framework/Helper.h" #include #include #include diff --git a/cpp/ppc-framework/protocol/Message.h b/cpp/ppc-framework/protocol/Message.h index fb0f39e6..8fb013d3 100644 --- a/cpp/ppc-framework/protocol/Message.h +++ b/cpp/ppc-framework/protocol/Message.h @@ -18,9 +18,9 @@ * @date 2024-08-22 */ #pragma once -#include "../Common.h" #include "MessagePayload.h" #include "RouteType.h" +#include "ppc-framework/Helper.h" #include #include #include @@ -249,10 +249,10 @@ inline std::string printOptionalField(MessageOptionalHeader::Ptr optionalHeader) std::ostringstream stringstream; stringstream << LOG_KV("topic", optionalHeader->topic()) << LOG_KV("componentType", optionalHeader->componentType()) - << LOG_KV("srcNode", *(bcos::toHexString(optionalHeader->srcNode()))) - << LOG_KV("dstNode", *(bcos::toHexString(optionalHeader->dstNode()))) - << LOG_KV("dstInst", optionalHeader->dstInst()); - + << LOG_KV("srcNode", printNodeID(optionalHeader->srcNode())) + << LOG_KV("dstNode", printNodeID(optionalHeader->dstNode())) + << LOG_KV("srcInst", printNodeID(optionalHeader->srcInst())) + << LOG_KV("dstInst", printNodeID(optionalHeader->dstInst())); return stringstream.str(); } diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontFactory.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/FrontFactory.cpp index 40b56ef1..eaac6f7a 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontFactory.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontFactory.cpp @@ -31,6 +31,8 @@ IFront::Ptr FrontFactory::build(INodeInfoFactory::Ptr nodeInfoFactory, auto nodeInfo = nodeInfoFactory->build( bcos::bytesConstRef((bcos::byte*)config->nodeID().data(), config->nodeID().size()), config->selfEndPoint().entryPoint()); + FRONT_LOG(INFO) << LOG_DESC("build front") << LOG_KV("nodeID", config->nodeID()) + << LOG_KV("endPoint", config->selfEndPoint().entryPoint()); return std::make_shared(threadPool, nodeInfo, messageFactory, routerInfoBuilder, gateway, std::make_shared()); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index fb8200d5..18163caa 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -127,8 +127,7 @@ void GatewayImpl::asyncSendMessage(ppc::protocol::RouteType routeType, auto p2pMessage = m_msgBuilder->build(routeType, routeInfo, std::move(payload)); p2pMessage->setSeq(traceID); p2pMessage->setPacketType((uint16_t)GatewayPacketType::P2PMessage); - GATEWAY_LOG(INFO) << LOG_DESC("##### asyncSendMessage") - << LOG_KV("msg", printMessage(p2pMessage)); + GATEWAY_LOG(TRACE) << LOG_DESC("asyncSendMessage") << LOG_KV("msg", printMessage(p2pMessage)); auto nodeList = m_localRouter->chooseReceiver(p2pMessage); // case send to the same agency if (!nodeList.empty()) diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp index 19093b32..b83b07ff 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp @@ -34,9 +34,9 @@ GatewayNodeInfo::Ptr SendMessageWithRetry::chooseP2pNode() RecursiveGuard lock(x_mutex); if (!m_dstNodeList.empty()) { - auto selectedNode = m_dstNodeList.begin(); + auto selectedNode = *(m_dstNodeList.begin()); m_dstNodeList.erase(m_dstNodeList.begin()); - return *selectedNode; + return selectedNode; } return nullptr; } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp index 9e8b2cc6..de96b110 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp @@ -33,7 +33,7 @@ void MessageCache::insertCache( GATEWAY_LOG(DEBUG) << LOG_BADGE("MessageCache: insertCache") << LOG_KV("topic", topic); bcos::ReadGuard l(x_msgCache); auto it = m_msgCache.find(topic); - if (it == m_msgCache.end()) + if (it != m_msgCache.end()) { it->second->messages.emplace_back(MessageInfo{msg, callback}); return; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h index b7e3e0f2..4ed8d833 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h @@ -18,6 +18,7 @@ * @date 2024-08-26 */ #pragma once +#include "ppc-framework/Helper.h" #include "ppc-framework/protocol/INodeInfo.h" #include "ppc-utilities/Utilities.h" #include diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp index 3380eaef..53d6de9d 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp @@ -18,6 +18,7 @@ * @date 2024-08-26 */ #include "GatewayRouterManager.h" +#include "ppc-framework/Helper.h" #include "ppc-framework/gateway/GatewayProtocol.h" #include diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp index baebc6eb..a43d82a2 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp @@ -18,7 +18,7 @@ * @date 2024-08-26 */ #include "LocalRouter.h" -#include "ppc-framework/Common.h" +#include "ppc-framework/Helper.h" #include "ppc-framework/gateway/GatewayProtocol.h" #include "ppc-gateway/Common.h" diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp index 7d7889c3..abb55789 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp @@ -19,6 +19,7 @@ */ #include "PeerRouterTable.h" #include "ppc-framework/Common.h" +#include "ppc-framework/Helper.h" #include using namespace bcos; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp index 68ecea83..1221f723 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp @@ -20,6 +20,7 @@ #include "Service.h" #include "bcos-boostssl/websocket/WsError.h" #include "ppc-framework/Common.h" +#include "ppc-framework/Helper.h" using namespace bcos; using namespace ppc; @@ -141,7 +142,7 @@ void Service::removeSessionInfo(WsSession::Ptr const& _session) { RecursiveGuard l(x_nodeID2Session); auto it = m_nodeID2Session.find(_session->nodeId()); - if (it != m_nodeID2Session.end()) + if (it != m_nodeID2Session.end() && it->second->endPoint() == _session->endPoint()) { SERVICE_LOG(INFO) << "onP2PDisconnectand remove from m_nodeID2Session" << LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId())) @@ -150,6 +151,7 @@ void Service::removeSessionInfo(WsSession::Ptr const& _session) m_nodeID2Session.erase(it); } } + void Service::onP2PDisconnect(WsSession::Ptr _session) { // remove the session information @@ -257,11 +259,10 @@ void Service::asyncSendMessage( sessions.emplace_back(session); return WsService::asyncSendMessage(sessions, msg, options, respFunc); } - if (respFunc) { Error::Ptr error = std::make_shared( - -1, "send message to " + dstNodeID + + -1, "send message to " + std::string(printP2PIDElegantly(dstNodeID)) + " failed for no network established, msg: " + printWsMessage(msg)); respFunc(std::move(error), nullptr, nullptr); } @@ -271,12 +272,14 @@ void Service::asyncSendMessage( } catch (std::exception const& e) { - SERVICE_LOG(ERROR) << "asyncSendMessageByNodeID" << LOG_KV("dstNode", dstNodeID) + SERVICE_LOG(ERROR) << "asyncSendMessageByNodeID" + << LOG_KV("dstNode", printP2PIDElegantly(dstNodeID)) << LOG_KV("what", boost::diagnostic_information(e)); if (respFunc) { - respFunc(std::make_shared(-1, "send message to " + dstNodeID + " failed for " + - boost::diagnostic_information(e)), + respFunc(std::make_shared( + -1, "send message to " + std::string(printP2PIDElegantly(dstNodeID)) + + " failed for " + boost::diagnostic_information(e)), nullptr, nullptr); } } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp index 77a765fb..ce25385c 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp @@ -18,6 +18,7 @@ * @date 2024-08-26 */ #include "RouterManager.h" +#include "ppc-framework/Helper.h" #include "ppc-framework/gateway/GatewayProtocol.h" #include "ppc-framework/protocol/Message.h" #include diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp index f3cc3071..729af315 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp @@ -18,6 +18,7 @@ * @date 2022-5-24 */ #include "RouterTableImpl.h" +#include "ppc-framework/Helper.h" #include "ppc-gateway/Common.h" using namespace bcos; From e2667ac70f3a5e694427843010a0c6c2bf1997e5 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Mon, 9 Sep 2024 20:13:14 +0800 Subject: [PATCH 4/4] add registerMessageHandler to support dispatcher message by component --- cpp/ppc-framework/front/IFront.h | 2 + cpp/ppc-framework/protocol/Protocol.h | 1 - cpp/wedpr-initializer/Initializer.cpp | 4 +- .../ppc-front/ppc-front/CallbackManager.cpp | 52 ++++++++--- .../ppc-front/ppc-front/CallbackManager.h | 11 +++ .../ppc-front/ppc-front/Front.cpp | 5 +- .../ppc-front/ppc-front/Front.h | 4 +- .../ppc-front/ppc-front/FrontImpl.h | 6 ++ .../gateway/router/GatewayNodeInfoImpl.cpp | 11 +++ .../gateway/router/GatewayRouterManager.cpp | 1 - .../gateway/router/LocalRouter.cpp | 2 +- .../ppc-gateway/ppc-gateway/p2p/Service.cpp | 93 ++++++++++--------- .../ppc-gateway/ppc-gateway/p2p/Service.h | 4 +- 13 files changed, 129 insertions(+), 67 deletions(-) diff --git a/cpp/ppc-framework/front/IFront.h b/cpp/ppc-framework/front/IFront.h index f3a9b647..324d8124 100644 --- a/cpp/ppc-framework/front/IFront.h +++ b/cpp/ppc-framework/front/IFront.h @@ -71,6 +71,8 @@ class IFront : virtual public IFrontClient virtual void registerTopicHandler( std::string const& topic, ppc::protocol::MessageDispatcherCallback callback) = 0; + virtual void registerMessageHandler( + std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback) = 0; /** * @brief async send message * diff --git a/cpp/ppc-framework/protocol/Protocol.h b/cpp/ppc-framework/protocol/Protocol.h index 889b7104..c9c6c850 100644 --- a/cpp/ppc-framework/protocol/Protocol.h +++ b/cpp/ppc-framework/protocol/Protocol.h @@ -396,7 +396,6 @@ inline std::ostream& operator<<(std::ostream& _out, HashImplName const& _type) enum class MessageType : uint16_t { - GatewayMessage = 0x0000, RpcRequest = 0x1000, // the rpc request type }; diff --git a/cpp/wedpr-initializer/Initializer.cpp b/cpp/wedpr-initializer/Initializer.cpp index e25a63b4..4e7c4eec 100644 --- a/cpp/wedpr-initializer/Initializer.cpp +++ b/cpp/wedpr-initializer/Initializer.cpp @@ -74,7 +74,6 @@ void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway) m_protocolInitializer = std::make_shared(); m_protocolInitializer->init(m_config); - auto ppcMessageFactory = std::make_shared(); // init the frontService INIT_LOG(INFO) << LOG_DESC("init the frontService") << LOG_KV("agency", m_config->agencyID()); auto frontThreadPool = std::make_shared("front", m_config->threadPoolSize()); @@ -89,7 +88,8 @@ void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway) { m_transport = transportBuilder.build(SDKMode::PRO, m_config->frontConfig(), nullptr); } - m_ppcFront = std::make_shared(m_transport->getFront()); + m_ppcFront = + std::make_shared(std::make_shared(), m_transport->getFront()); INIT_LOG(INFO) << LOG_DESC("init the frontService success") << LOG_KV("frontDetail", printFrontDesc(m_config->frontConfig())) diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.cpp index 51031f5b..3f500292 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.cpp @@ -128,21 +128,49 @@ void CallbackManager::registerTopicHandler( m_topicHandlers.insert(std::make_pair(topic, callback)); } +void CallbackManager::registerMessageHandler( + std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback) +{ + bcos::WriteGuard l(x_msgHandlers); + m_msgHandlers.insert(std::make_pair(componentType, callback)); +} + +MessageDispatcherCallback CallbackManager::getHandlerByTopic(std::string const& topic) +{ + bcos::ReadGuard l(x_topicHandlers); + auto it = m_topicHandlers.find(topic); + if (it != m_topicHandlers.end()) + { + return it->second; + } + return nullptr; +} + +MessageDispatcherCallback CallbackManager::getHandlerByComponentType( + std::string const& componentType) +{ + bcos::ReadGuard l(x_msgHandlers); + auto it = m_msgHandlers.find(componentType); + if (it != m_msgHandlers.end()) + { + return it->second; + } + return nullptr; +} + void CallbackManager::onReceiveMessage(std::string const& topic, Message::Ptr msg) { - MessageDispatcherCallback callback = nullptr; + auto callback = getHandlerByTopic(topic); + if (!callback) { - bcos::ReadGuard l(x_topicHandlers); - auto it = m_topicHandlers.find(topic); - if (it == m_topicHandlers.end()) - { - FRONT_LOG(DEBUG) << LOG_DESC( - "onReceiveMessage: not find the handler, put into the buffer") - << LOG_KV("topic", topic); - addMsgCache(topic, msg); - return; - } - callback = it->second; + callback = getHandlerByComponentType(msg->header()->optionalField()->componentType()); + } + if (!callback) + { + FRONT_LOG(DEBUG) << LOG_DESC("onReceiveMessage: not find the handler, put into the buffer") + << LOG_KV("topic", topic); + addMsgCache(topic, msg); + return; } m_threadPool->enqueue([callback, msg]() { try diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.h b/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.h index 95894d52..819faa05 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.h @@ -64,6 +64,9 @@ class CallbackManager : public std::enable_shared_from_this virtual void registerTopicHandler( std::string const& topic, ppc::protocol::MessageDispatcherCallback callback); + virtual void registerMessageHandler( + std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback); + virtual ppc::protocol::Message::Ptr pop(std::string const& topic, int timeoutMs) { auto it = m_msgCache.find(topic); @@ -94,6 +97,10 @@ class CallbackManager : public std::enable_shared_from_this msgQueue->push(std::move(msg)); } + ppc::protocol::MessageDispatcherCallback getHandlerByTopic(std::string const& topic); + ppc::protocol::MessageDispatcherCallback getHandlerByComponentType( + std::string const& componentType); + private: bcos::ThreadPool::Ptr m_threadPool; std::shared_ptr m_ioService; @@ -105,6 +112,10 @@ class CallbackManager : public std::enable_shared_from_this std::map m_topicHandlers; mutable bcos::SharedMutex x_topicHandlers; + // componentType => messageDispatcherCallback + std::map m_msgHandlers; + mutable bcos::SharedMutex x_msgHandlers; + // the messageCache for the message with no topic handler defined uint64_t m_maxMsgCacheSize = 10000; // TODO: check the queueSize diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp index f0fe2002..a3259cd6 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp @@ -26,7 +26,8 @@ using namespace bcos; using namespace ppc::protocol; using namespace ppc::front; -Front::Front(IFront::Ptr front) : m_front(std::move(front)) +Front::Front(ppc::front::PPCMessageFaceFactory::Ptr ppcMsgFactory, IFront::Ptr front) + : m_messageFactory(std::move(ppcMsgFactory)), m_front(std::move(front)) { m_fetcher = std::make_shared(60 * 1000, "metaFetcher"); m_fetcher->registerTimeoutHandler([this]() { @@ -98,6 +99,8 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace auto routeInfo = front->routerInfoBuilder()->build(); routeInfo->setDstInst(_agencyID); routeInfo->setTopic(_message->taskID()); + auto type = ((uint16_t)_message->taskType() << 8) | _message->algorithmType(); + routeInfo->setComponentType(std::to_string(type)); bcos::bytes data; _message->encode(data); auto self = weak_from_this(); diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h index cdd5f8ab..4efa9b5d 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h @@ -30,7 +30,7 @@ class Front : public FrontInterface, public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - Front(IFront::Ptr front); + Front(ppc::front::PPCMessageFaceFactory::Ptr ppcMsgFactory, IFront::Ptr front); ~Front() override {} void start() override; @@ -66,7 +66,7 @@ class Front : public FrontInterface, public std::enable_shared_from_this { uint16_t type = ((uint16_t)_taskType << 8) | _algorithmType; auto self = weak_from_this(); - m_front->registerTopicHandler( + m_front->registerMessageHandler( std::to_string(type), [self, _handler](ppc::protocol::Message::Ptr msg) { auto front = self.lock(); if (!front) diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h index 552c5ee6..b1c36b66 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h @@ -104,6 +104,12 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ m_callbackManager->registerTopicHandler(topic, callback); } + void registerMessageHandler(std::string const& componentType, + ppc::protocol::MessageDispatcherCallback callback) override + { + m_callbackManager->registerMessageHandler(componentType, callback); + } + /** * @brief register the nodeInfo to the gateway * @param nodeInfo the nodeInfo diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp index bb289294..fb14954b 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp @@ -155,6 +155,17 @@ std::vector> GatewayNodeInfoImpl::choo bool selectAll, std::string const& topic) const { std::vector> result; + // empty topic means broadcast message to all front + if (topic.empty()) + { + bcos::ReadGuard l(x_nodeList); + for (auto const& it : m_nodeList) + { + result.emplace_back(it.second->getFront()); + } + return result; + } + // the topic specified bcos::ReadGuard l(x_topicInfo); for (auto const& it : m_topicInfo) { diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp index 53d6de9d..7534e3b0 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp @@ -83,7 +83,6 @@ void GatewayRouterManager::onReceiveNodeSeqMessage(MessageFace::Ptr msg, WsSessi { auto statusSeq = boost::asio::detail::socket_ops::network_to_host_long(*((uint32_t*)msg->payload()->data())); - auto p2pMessage = std::dynamic_pointer_cast(msg); auto const& from = (p2pMessage->header()->srcGwNode().size() > 0) ? p2pMessage->header()->srcGwNode() : diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp index a43d82a2..2b99dec6 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp @@ -77,7 +77,7 @@ void LocalRouter::unRegisterTopic(bcos::bytesConstRef _nodeID, std::string const bool LocalRouter::dispatcherMessage(Message::Ptr const& msg, ReceiveMsgFunc callback, bool holding) { auto frontList = chooseReceiver(msg); - // send success + // find the front if (!frontList.empty()) { for (auto const& front : frontList) diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp index 1221f723..3e96a31f 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp @@ -57,14 +57,13 @@ void Service::onP2PConnect(WsSession::Ptr _session) RecursiveGuard l(x_nodeID2Session); auto it = m_nodeID2Session.find(_session->nodeId()); - // the session already connected if (it != m_nodeID2Session.end() && it->second->isConnected()) { SERVICE_LOG(INFO) << LOG_DESC("onP2PConnect, drop the duplicated connection") << LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId())) << LOG_KV("endpoint", _session->endPoint()); - _session->drop(WsError::UserDisconnect); updateNodeIDInfo(_session); + _session->drop(WsError::UserDisconnect); return; } // the node-self @@ -77,41 +76,18 @@ void Service::onP2PConnect(WsSession::Ptr _session) _session->drop(WsError::UserDisconnect); return; } - - ///// Note: here allow all new session, even the ip not configured(support dynamic access) bool updated = updateNodeIDInfo(_session); // hit the m_nodeID2Session if (it != m_nodeID2Session.end()) { - // the old session has already been connected, and the new session endPoint is not - // configured - if (it->second->isConnected() && !updated) - { - SERVICE_LOG(INFO) << LOG_DESC( - "onP2PConnect, drop the new not-configurated session, remain " - "the old session") - << LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId())) - << LOG_KV("endPoint", _session->endPoint()) - << LOG_KV("oldEndPoint", it->second->endPoint()); - _session->drop(WsError::UserDisconnect); - return; - } - SERVICE_LOG(INFO) << LOG_DESC( - "onP2PConnect, drop the old not-configurated session, replace " - "with the new session") - << LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId())) - << LOG_KV("endPoint", _session->endPoint()) - << LOG_KV("oldEndPoint", it->second->endPoint()); - if (it->second->isConnected()) - { - it->second->drop(WsError::UserDisconnect); - } it->second = _session; - return; } - // the new session - m_nodeID2Session.insert(std::make_pair(_session->nodeId(), _session)); + else + { + // the new session + m_nodeID2Session.insert(std::make_pair(_session->nodeId(), _session)); + } SERVICE_LOG(INFO) << LOG_DESC("onP2PConnect established new session") << LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId())) << LOG_KV("endpoint", _session->endPoint()); @@ -127,7 +103,7 @@ bool Service::updateNodeIDInfo(WsSession::Ptr const& _session) { it->second = p2pNodeID; SERVICE_LOG(INFO) << LOG_DESC("updateNodeIDInfo: update the nodeID") - << LOG_KV("nodeid", p2pNodeID) + << LOG_KV("nodeid", printP2PIDElegantly(p2pNodeID)) << LOG_KV("endpoint", _session->endPoint()); return true; } @@ -138,38 +114,58 @@ bool Service::updateNodeIDInfo(WsSession::Ptr const& _session) return false; } -void Service::removeSessionInfo(WsSession::Ptr const& _session) +bool Service::removeSessionInfo(WsSession::Ptr const& _session) { RecursiveGuard l(x_nodeID2Session); auto it = m_nodeID2Session.find(_session->nodeId()); - if (it != m_nodeID2Session.end() && it->second->endPoint() == _session->endPoint()) + if (it != m_nodeID2Session.end() && it->second->endPointInfo() == _session->endPointInfo()) { - SERVICE_LOG(INFO) << "onP2PDisconnectand remove from m_nodeID2Session" + SERVICE_LOG(INFO) << "onP2PDisconnect: remove from m_nodeID2Session" << LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId())) << LOG_KV("endpoint", _session->endPoint()); m_nodeID2Session.erase(it); + return true; } + return false; } void Service::onP2PDisconnect(WsSession::Ptr _session) { // remove the session information - removeSessionInfo(_session); + if (!removeSessionInfo(_session)) + { + return; + } // update the session nodeID to empty UpgradableGuard l(x_configuredNode2ID); for (auto& it : m_configuredNode2ID) { - // reset the nodeID of the dropped session(except the node-self) to empty - if (m_nodeID != _session->nodeId() && it.second == _session->nodeId()) + // the node-self, no need to reset the nodeID + if (m_nodeID == _session->nodeId()) { - UpgradeGuard ul(l); - it.second.clear(); - break; + continue; } + // not with the same nodeID, can't reset the nodeID + if (it.second != _session->nodeId()) + { + continue; + } + UpgradeGuard ul(l); + it.second.clear(); + SERVICE_LOG(INFO) << "onP2PDisconnect: clear the nodeID information" + << LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId())) + << LOG_KV("endpoint", _session->endPoint()); + break; } } +bool Service::nodeConnected(std::string const& nodeID) +{ + bcos::RecursiveGuard l(x_nodeID2Session); + return m_nodeID2Session.count(nodeID); +} + void Service::reconnect() { // obtain the un-connected peers information @@ -182,14 +178,12 @@ void Service::reconnect() { continue; } - if (!it.second.empty() && isConnected(it.first)) + if (!it.second.empty() && nodeConnected(it.second)) { continue; } unconnectedPeers->insert(it.first); - SERVICE_LOG(DEBUG) << LOG_DESC("ready to reconnect") - << LOG_KV("endpoint", - it.first.address() + ":" + std::to_string(it.first.port())); + SERVICE_LOG(DEBUG) << LOG_DESC("ready to reconnect") << LOG_KV("endpoint", it.first); } } setReconnectedPeers(unconnectedPeers); @@ -233,7 +227,7 @@ void Service::asyncSendMessageWithForward( return asyncSendMessage(dstNodeID, msg, options, respFunc); } // with nextHop, send the message to nextHop - SERVICE_LOG(TRACE) << LOG_DESC("asyncSendMessageByNodeID") << printMessage(p2pMsg); + SERVICE_LOG(TRACE) << LOG_DESC("asyncSendMessageWithForward") << printMessage(p2pMsg); return asyncSendMessage(nextHop, msg, options, respFunc); } @@ -313,8 +307,14 @@ void Service::asyncBroadcastMessage(bcos::boostssl::MessageFace::Ptr msg, Option auto reachableNodes = m_routerTable->getAllReachableNode(); try { + if (msg->seq().empty()) + { + msg->setSeq(m_messageFactory->newSeq()); + } for (auto const& node : reachableNodes) { + auto p2pMsg = std::dynamic_pointer_cast(msg); + p2pMsg->header()->setDstGwNode(node); asyncSendMessageByNodeID(node, msg, options); } } @@ -356,5 +356,6 @@ void Service::sendRespMessageBySession(bcos::boostssl::ws::WsSession::Ptr const& sessions.emplace_back(session); WsService::asyncSendMessage(sessions, respMessage); SERVICE_LOG(TRACE) << "sendRespMessageBySession" << LOG_KV("resp", printMessage(respMessage)) - << LOG_KV("payload size", payload->size()); + << LOG_KV("payloadSize", + respMessage->payload() ? respMessage->payload()->size() : 0); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h index c41e0c1f..377d1161 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h @@ -77,10 +77,12 @@ class Service : public bcos::boostssl::ws::WsService virtual void onP2PConnect(bcos::boostssl::ws::WsSession::Ptr _session); virtual void onP2PDisconnect(bcos::boostssl::ws::WsSession::Ptr _session); + virtual bool nodeConnected(std::string const& nodeID); + void reconnect() override; bool updateNodeIDInfo(bcos::boostssl::ws::WsSession::Ptr const& _session); - void removeSessionInfo(bcos::boostssl::ws::WsSession::Ptr const& _session); + bool removeSessionInfo(bcos::boostssl::ws::WsSession::Ptr const& _session); bcos::boostssl::ws::WsSession::Ptr getSessionByNodeID(std::string const& _nodeID); virtual void asyncSendMessageWithForward(std::string const& dstNodeID,