From b1660b7ecf5293c9fd959cfda542a65fc809f240 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Mon, 9 Sep 2024 15:42:03 +0800 Subject: [PATCH] add getPeers rpc implementation --- cpp/cmake/BuildInfo.cmake | 2 +- cpp/ppc-framework/rpc/RpcTypeDef.h | 1 + .../air-node/AirNodeInitializer.cpp | 2 +- cpp/wedpr-main/cem-node/CEMInitializer.cpp | 2 +- cpp/wedpr-main/common/NodeStarter.h | 4 +-- cpp/wedpr-main/mpc-node/MPCInitializer.cpp | 2 +- .../pro-node/ProNodeInitializer.cpp | 3 +- .../ppc-front/ppc-front/Front.cpp | 17 ++++++---- .../ppc-gateway/gateway/GatewayImpl.cpp | 5 +++ .../gateway/router/GatewayNodeInfoImpl.cpp | 1 + .../ppc-gateway/gateway/router/LocalRouter.h | 2 ++ cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp | 2 +- cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp | 34 +++++++++++++++++-- cpp/wedpr-transport/ppc-rpc/src/Rpc.h | 7 +++- .../ppc-rpc/src/RpcFactory.cpp | 5 +-- cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h | 5 +-- cpp/wedpr-transport/sdk/ProTransportImpl.h | 1 - cpp/wedpr-transport/sdk/Transport.h | 6 +++- cpp/wedpr-transport/sdk/TransportImpl.h | 1 + 19 files changed, 78 insertions(+), 24 deletions(-) diff --git a/cpp/cmake/BuildInfo.cmake b/cpp/cmake/BuildInfo.cmake index 628b7c23..7ef9778c 100644 --- a/cpp/cmake/BuildInfo.cmake +++ b/cpp/cmake/BuildInfo.cmake @@ -27,7 +27,7 @@ function(create_build_info) # Generate header file containing useful build information add_custom_target(BuildInfo.h ALL WORKING_DIRECTORY ${PROJECT_SOURCE_DIR} - COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}" + COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}/.." -DPPC_BUILDINFO_IN="${CMAKE_CURRENT_SOURCE_DIR}/cmake/templates/BuildInfo.h.in" -DPPC_DST_DIR="${PROJECT_BINARY_DIR}/include" -DPPC_CMAKE_DIR="${CMAKE_CURRENT_SOURCE_DIR}/cmake" diff --git a/cpp/ppc-framework/rpc/RpcTypeDef.h b/cpp/ppc-framework/rpc/RpcTypeDef.h index 5a0fb493..208fee40 100644 --- a/cpp/ppc-framework/rpc/RpcTypeDef.h +++ b/cpp/ppc-framework/rpc/RpcTypeDef.h @@ -36,6 +36,7 @@ enum class RpcError : int32_t std::string const RUN_TASK_METHOD = "runTask"; std::string const ASYNC_RUN_TASK_METHOD = "asyncRunTask"; std::string const GET_TASK_STATUS = "getTaskStatus"; +std::string const GET_PEERS = "getPeers"; std::string const ASYNC_RUN_BS_MODE_TASK = "asyncRunBsModeTask"; std::string const FETCH_CIPHER = "fetchCipher"; diff --git a/cpp/wedpr-main/air-node/AirNodeInitializer.cpp b/cpp/wedpr-main/air-node/AirNodeInitializer.cpp index ffcf605a..af88181d 100644 --- a/cpp/wedpr-main/air-node/AirNodeInitializer.cpp +++ b/cpp/wedpr-main/air-node/AirNodeInitializer.cpp @@ -67,7 +67,7 @@ void AirNodeInitializer::init(std::string const& _configPath) auto rpcFactory = std::make_shared(m_nodeInitializer->config()->agencyID()); - m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config()); + m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config(), m_gateway); m_rpc->setRpcStorage(rpcStatusInterface); m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi()); m_nodeInitializer->registerRpcHandler(m_rpc); diff --git a/cpp/wedpr-main/cem-node/CEMInitializer.cpp b/cpp/wedpr-main/cem-node/CEMInitializer.cpp index 69c2efbe..e228fa5a 100644 --- a/cpp/wedpr-main/cem-node/CEMInitializer.cpp +++ b/cpp/wedpr-main/cem-node/CEMInitializer.cpp @@ -46,7 +46,7 @@ void CEMInitializer::init(std::string const& _configPath) auto storageConfig = ppcConfig->storageConfig(); auto cemConfig = ppcConfig->cemConfig(); auto rpcFactory = std::make_shared(ppcConfig->agencyID()); - m_rpc = rpcFactory->buildRpc(ppcConfig); + m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr); auto cemService = std::make_shared(); cemService->setCEMConfig(cemConfig); cemService->setStorageConfig(storageConfig); diff --git a/cpp/wedpr-main/common/NodeStarter.h b/cpp/wedpr-main/common/NodeStarter.h index 95d8d671..e33ad7ef 100644 --- a/cpp/wedpr-main/common/NodeStarter.h +++ b/cpp/wedpr-main/common/NodeStarter.h @@ -68,13 +68,13 @@ int startProgram( } printVersion(); std::cout << "[" << bcos::getCurrentDateTime() << "] "; - std::cout << "The " + binaryName + "is running..." << std::endl; + std::cout << "The " + binaryName + " is running..." << std::endl; while (!exitHandler.shouldExit()) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } starter.reset(); std::cout << "[" << bcos::getCurrentDateTime() << "] "; - std::cout << "The" + binaryName + " program exit normally." << std::endl; + std::cout << "The " + binaryName + " program exit normally." << std::endl; } } // namespace ppc::node \ No newline at end of file diff --git a/cpp/wedpr-main/mpc-node/MPCInitializer.cpp b/cpp/wedpr-main/mpc-node/MPCInitializer.cpp index de8b8f1f..6998876a 100644 --- a/cpp/wedpr-main/mpc-node/MPCInitializer.cpp +++ b/cpp/wedpr-main/mpc-node/MPCInitializer.cpp @@ -47,7 +47,7 @@ void MPCInitializer::init(std::string const& _configPath) auto storageConfig = ppcConfig->storageConfig(); auto mpcConfig = ppcConfig->mpcConfig(); auto rpcFactory = std::make_shared(ppcConfig->agencyID()); - m_rpc = rpcFactory->buildRpc(ppcConfig); + m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr); auto mpcService = std::make_shared(); mpcService->setMPCConfig(mpcConfig); mpcService->setStorageConfig(storageConfig); diff --git a/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp b/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp index 2ced6616..ec0ce5cc 100644 --- a/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp +++ b/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp @@ -58,7 +58,8 @@ void ProNodeInitializer::init(std::string const& _configPath) auto rpcFactory = std::make_shared(m_nodeInitializer->config()->agencyID()); - m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config()); + m_rpc = rpcFactory->buildRpc( + m_nodeInitializer->config(), m_nodeInitializer->transport()->gateway()); m_rpc->setRpcStorage(rpcStatusInterface); m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi()); m_nodeInitializer->registerRpcHandler(m_rpc); diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp index 6b4e0577..f0fe2002 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp @@ -75,6 +75,7 @@ void Front::fetchGatewayMetaInfo() { return; } + bcos::UpgradeGuard ul(l); front->m_agencyList = agencies; FRONT_LOG(INFO) << LOG_DESC("Update agencies information") << LOG_KV("agencies", printVector(agencies)); @@ -100,11 +101,11 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace bcos::bytes data; _message->encode(data); auto self = weak_from_this(); - // ROUTE_THROUGH_TOPIC will hold the topic - m_front->asyncSendMessage(RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data), - _message->seq(), _timeout, _callback, - [self, _agencyID, _respCallback]( - Error::Ptr error, Message::Ptr msg, SendResponseFunction resFunc) { + ppc::protocol::MessageCallback msgCallback = nullptr; + if (_respCallback) + { + msgCallback = [self, _agencyID, _respCallback]( + Error::Ptr error, Message::Ptr msg, SendResponseFunction resFunc) { auto front = self.lock(); if (!front) { @@ -126,7 +127,11 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace // get the agencyID _respCallback(error, msg->header()->optionalField()->srcInst(), front->m_messageFactory->decodePPCMessage(msg), responseCallback); - }); + }; + } + // ROUTE_THROUGH_TOPIC will hold the topic + m_front->asyncSendMessage(RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data), + _message->seq(), _timeout, _callback, msgCallback); } // send response when receiving message from given agencyID diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index 18afb78d..fb8200d5 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -253,6 +253,10 @@ void GatewayImpl::asyncGetPeers(std::function cal Json::Value peers; peers["agency"] = m_agency; peers["nodeID"] = m_service->nodeID(); + // add the local gatewayInfo + Json::Value localGatewayInfo; + m_localRouter->routerInfo()->toJson(localGatewayInfo); + peers["gateway"] = localGatewayInfo; peers["peers"] = Json::Value(Json::arrayValue); for (auto const& it : infos) { @@ -292,5 +296,6 @@ void GatewayImpl::asyncGetAgencies( return; } auto agencies = m_peerRouter->agencies(); + agencies.emplace_back(m_agency); callback(nullptr, agencies); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp index 2f3bb34a..bb289294 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp @@ -224,6 +224,7 @@ void GatewayNodeInfoImpl::decode(bcos::bytesConstRef data) void GatewayNodeInfoImpl::toJson(Json::Value& jsonObject) const { + bcos::ReadGuard l(x_nodeList); jsonObject["gatewayNodeID"] = p2pNodeID(); jsonObject["agency"] = agency(); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h index 30b5f244..a3b33f2f 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h @@ -66,6 +66,8 @@ class LocalRouter } uint32_t statusSeq() { return m_statusSeq; } + GatewayNodeInfo::Ptr const& routerInfo() const { return m_routerInfo; } + private: uint32_t increaseSeq() { diff --git a/cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp b/cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp index 849502c2..252e3ba4 100644 --- a/cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp +++ b/cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp @@ -50,7 +50,7 @@ int main(int argc, const char* argv[]) // not specify the certPath in air-mode ppcConfig->loadRpcConfig(param.configFilePath); auto rpcFactory = std::make_shared("selfParty"); - auto rpc = rpcFactory->buildRpc(ppcConfig); + auto rpc = rpcFactory->buildRpc(ppcConfig, nullptr); registerEchoHandler(rpc); // start the rpc rpc->start(); diff --git a/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp b/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp index 5b21374e..f2a78a09 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp +++ b/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp @@ -31,10 +31,11 @@ using namespace ppc::rpc; using namespace ppc::tools; using namespace ppc::protocol; -Rpc::Rpc(std::shared_ptr _wsService, std::string const& _selfPartyID, - std::string const& _token, std::string const& _prePath) +Rpc::Rpc(std::shared_ptr _wsService, ppc::gateway::IGateway::Ptr gateway, + std::string const& _selfPartyID, std::string const& _token, std::string const& _prePath) : m_prePath(_prePath), m_wsService(std::move(_wsService)), + m_gateway(std::move(gateway)), m_taskFactory(std::make_shared(_selfPartyID, _prePath)), m_token(_token) { @@ -74,7 +75,8 @@ Rpc::Rpc(std::shared_ptr _wsService, std::string const& boost::bind(&Rpc::killBsModeTask, this, boost::placeholders::_1, boost::placeholders::_2); m_methodToHandler[UPDATE_BS_MODE_TASK_STATUS] = boost::bind( &Rpc::updateBsModeTaskStatus, this, boost::placeholders::_1, boost::placeholders::_2); - + m_methodToHandler[GET_PEERS] = + boost::bind(&Rpc::getPeers, this, boost::placeholders::_1, boost::placeholders::_2); RPC_LOG(INFO) << LOG_DESC("init rpc success") << LOG_KV("selfParty", _selfPartyID); } @@ -345,6 +347,32 @@ void Rpc::sendEcdhCipher(Json::Value const& _req, RespFunc _respFunc) _respFunc(result->error(), result->serializeToJson()); } +void Rpc::getPeers(Json::Value const& _req, RespFunc _respFunc) +{ + if (m_gateway == nullptr) + { + BOOST_THROW_EXCEPTION(BCOS_ERROR(-1, "the gateway not initialized!")); + } + m_gateway->asyncGetPeers([_respFunc](bcos::Error::Ptr error, std::string peersInfo) { + try + { + Json::Value root; + Json::Reader jsonReader; + + if (!jsonReader.parse(peersInfo, root)) + { + BOOST_THROW_EXCEPTION(BCOS_ERROR(-1, "Invalid json string: " + peersInfo)); + } + _respFunc(error, std::move(root)); + } + catch (std::exception const& e) + { + RPC_LOG(WARNING) << LOG_DESC("getPeers exception") + << LOG_KV("error", boost::diagnostic_information(e)); + } + }); +} + void Rpc::sendPartnerCipher(Json::Value const& _req, RespFunc _respFunc) { if (!m_bsEcdhPSI) diff --git a/cpp/wedpr-transport/ppc-rpc/src/Rpc.h b/cpp/wedpr-transport/ppc-rpc/src/Rpc.h index 45871f29..fe19b420 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/Rpc.h +++ b/cpp/wedpr-transport/ppc-rpc/src/Rpc.h @@ -19,6 +19,7 @@ */ #pragma once #include "ppc-framework/front/FrontInterface.h" +#include "ppc-framework/gateway/IGateway.h" #include "ppc-framework/rpc/RpcInterface.h" #include "ppc-framework/rpc/RpcStatusInterface.h" #include "protocol/src/JsonTaskImpl.h" @@ -35,7 +36,8 @@ class Rpc : public RpcInterface { public: using Ptr = std::shared_ptr; - Rpc(std::shared_ptr _wsService, std::string const& _selfPartyID, + Rpc(std::shared_ptr _wsService, + ppc::gateway::IGateway::Ptr gateway, std::string const& _selfPartyID, std::string const& _token, std::string const& _prePath = "data"); ~Rpc() override { stop(); } void start() override @@ -131,11 +133,14 @@ class Rpc : public RpcInterface virtual void killBsModeTask(Json::Value const& _req, RespFunc _respFunc); virtual void updateBsModeTaskStatus(Json::Value const& _req, RespFunc _respFunc); + virtual void getPeers(Json::Value const& _req, RespFunc _respFunc); + void checkHostResource(); private: std::string m_prePath; std::shared_ptr m_wsService; + ppc::gateway::IGateway::Ptr m_gateway; RpcStatusInterface::Ptr m_rpcStorage; // Note: here use jsonTaskFactory to decrease the overhead to convert json::value to string when diff --git a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp index a1db0739..92c1ed22 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp +++ b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp @@ -27,7 +27,8 @@ using namespace bcos; using namespace ppc::rpc; using namespace ppc::tools; -Rpc::Ptr RpcFactory::buildRpc(ppc::tools::PPCConfig::ConstPtr _config) +Rpc::Ptr RpcFactory::buildRpc( + ppc::tools::PPCConfig::ConstPtr _config, ppc::gateway::IGateway::Ptr gateway) { auto wsConfig = initConfig(_config); // create the wsConfig @@ -39,7 +40,7 @@ Rpc::Ptr RpcFactory::buildRpc(ppc::tools::PPCConfig::ConstPtr _config) initializer->initWsService(wsService); auto rpc = std::make_shared( - wsService, m_selfPartyID, _config->rpcConfig().token, _config->dataLocation()); + wsService, gateway, m_selfPartyID, _config->rpcConfig().token, _config->dataLocation()); rpc->setMinNeededMemory(_config->rpcConfig().minNeededMemoryGB); return rpc; } diff --git a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h index 3e6b512e..91f94ff1 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h +++ b/cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h @@ -19,6 +19,7 @@ */ #pragma once #include "Rpc.h" +#include "ppc-framework/gateway/IGateway.h" #include namespace bcos::boostssl::ws { @@ -37,8 +38,8 @@ class RpcFactory RpcFactory(std::string const& _selfPartyID) : m_selfPartyID(_selfPartyID) {} virtual ~RpcFactory() = default; - Rpc::Ptr buildRpc(std::shared_ptr _config); - + Rpc::Ptr buildRpc( + std::shared_ptr _config, ppc::gateway::IGateway::Ptr gateway); private: std::shared_ptr initConfig( diff --git a/cpp/wedpr-transport/sdk/ProTransportImpl.h b/cpp/wedpr-transport/sdk/ProTransportImpl.h index c89b33ef..ed29cbdd 100644 --- a/cpp/wedpr-transport/sdk/ProTransportImpl.h +++ b/cpp/wedpr-transport/sdk/ProTransportImpl.h @@ -43,7 +43,6 @@ class ProTransportImpl : public Transport, public std::enable_shared_from_this

m_server; int m_keepAlivePeriodMs; std::shared_ptr m_timer; diff --git a/cpp/wedpr-transport/sdk/Transport.h b/cpp/wedpr-transport/sdk/Transport.h index cf9230fb..c9c36148 100644 --- a/cpp/wedpr-transport/sdk/Transport.h +++ b/cpp/wedpr-transport/sdk/Transport.h @@ -19,6 +19,7 @@ */ #pragma once #include "ppc-framework/front/IFront.h" +#include "ppc-framework/gateway/IGateway.h" namespace ppc::sdk { class Transport @@ -31,9 +32,12 @@ class Transport virtual void start() { m_front->start(); } virtual void stop() { m_front->stop(); } - virtual ppc::front::IFront::Ptr const& getFront() { return m_front; } + virtual ppc::front::IFront::Ptr const& getFront() const { return m_front; } + + virtual ppc::gateway::IGateway::Ptr const& gateway() const { return m_gateway; } protected: ppc::front::IFront::Ptr m_front; + ppc::gateway::IGateway::Ptr m_gateway; }; } // namespace ppc::sdk \ No newline at end of file diff --git a/cpp/wedpr-transport/sdk/TransportImpl.h b/cpp/wedpr-transport/sdk/TransportImpl.h index a400b416..fbd9e3e6 100644 --- a/cpp/wedpr-transport/sdk/TransportImpl.h +++ b/cpp/wedpr-transport/sdk/TransportImpl.h @@ -33,6 +33,7 @@ class TransportImpl : public Transport TransportImpl(ppc::front::FrontConfig::Ptr config, ppc::gateway::IGateway::Ptr const& gateway) : m_config(std::move(config)) { + m_gateway = gateway; ppc::front::FrontFactory frontFactory; m_front = frontFactory.build(std::make_shared(), std::make_shared(),