diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index c02bf0aa..dffbaea8 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -88,7 +88,8 @@ set(ALL_SOURCE_LIST wedpr-storage/ppc-io wedpr-storage/ppc-storage wedpr-transport/ppc-gateway wedpr-transport/ppc-front wedpr-transport/ppc-http wedpr-transport/ppc-rpc wedpr-transport/sdk - wedpr-computing/ppc-psi wedpr-computing/ppc-mpc wedpr-computing/ppc-pir ${CEM_SOURCE}) + wedpr-computing/ppc-psi wedpr-computing/ppc-mpc wedpr-computing/ppc-pir ${CEM_SOURCE} + wedpr-initializer wedpr-main) if(BUILD_WEDPR_TOOLKIT) # fetch the python dependencies diff --git a/cpp/cmake/TargetSettings.cmake b/cpp/cmake/TargetSettings.cmake index b72cec18..1b30601a 100644 --- a/cpp/cmake/TargetSettings.cmake +++ b/cpp/cmake/TargetSettings.cmake @@ -63,8 +63,8 @@ set(RPC_TARGET "ppc-rpc") # libhelper set(HELPER_TARGET "ppc-helper") -# libinitializer -set(INIT_LIB init) +# wedpr-initializer +set(INIT_LIB "wedpr-inititializer") # ppc-cem set(CEM_TARGET "ppc-cem") diff --git a/cpp/ppc-framework/front/FrontConfig.h b/cpp/ppc-framework/front/FrontConfig.h index 6cee462a..affe5d9b 100644 --- a/cpp/ppc-framework/front/FrontConfig.h +++ b/cpp/ppc-framework/front/FrontConfig.h @@ -20,7 +20,10 @@ #pragma once #include "ppc-framework/protocol/EndPoint.h" +#include "ppc-framework/protocol/GrpcConfig.h" +#include "ppc-framework/protocol/INodeInfo.h" #include +#include #include #include @@ -49,40 +52,58 @@ class FrontConfig virtual void appendGatewayInfo(ppc::protocol::EndPoint&& endpoint) { - // TODO:check the endpoint m_gatewayInfo.push_back(endpoint); } ppc::protocol::EndPoint const& selfEndPoint() const { return m_selfEndPoint; } void setSelfEndPoint(ppc::protocol::EndPoint const& endPoint) { m_selfEndPoint = endPoint; } - // TODO here - std::string gatewayEndPoints() { return ""; } + // refer to: https://github.com/grpc/grpc-node/issues/2066 + // grpc prefer to using ipv4:${host1}:${port1},${host2}:${port2} as target to support multiple + // servers + std::string gatewayGrpcTarget() + { + std::stringstream oss; + oss << "ipv4:"; + for (auto const& endPoint : m_gatewayInfo) + { + oss << endPoint.entryPoint() << ","; + } + return oss.str(); + } - std::string const& loadBalancePolicy() const { return m_loadBanlancePolicy; } - void setLoadBalancePolicy(std::string const& loadBanlancePolicy) + void setGrpcConfig(ppc::protocol::GrpcConfig::Ptr grpcConfig) { - m_loadBanlancePolicy = loadBanlancePolicy; + m_grpcConfig = std::move(grpcConfig); } + ppc::protocol::GrpcConfig::Ptr const& grpcConfig() const { return m_grpcConfig; } + + // generate the nodeInfo + virtual ppc::protocol::INodeInfo::Ptr generateNodeInfo() const = 0; + + virtual std::vector const& getComponents() const { return m_components; } + void setComponents(std::vector const& components) { m_components = components; } -private: - std::string m_loadBanlancePolicy = "round_robin"; +protected: + ppc::protocol::GrpcConfig::Ptr m_grpcConfig; ppc::protocol::EndPoint m_selfEndPoint; int m_threadPoolSize; std::string m_nodeID; std::vector m_gatewayInfo; + std::vector m_components; }; -class FrontConfigBuilder +inline std::string printFrontDesc(FrontConfig::Ptr const& config) { -public: - using Ptr = std::shared_ptr; - FrontConfigBuilder() = default; - virtual ~FrontConfigBuilder() = default; - - FrontConfig::Ptr build(int threadPoolSize, std::string nodeID) + if (!config) { - return std::make_shared(threadPoolSize, nodeID); + return "nullptr"; } -}; + std::ostringstream stringstream; + stringstream << LOG_KV("endPoint", config->selfEndPoint().entryPoint()) + << LOG_KV("nodeID", config->nodeID()) + << LOG_KV("poolSize", config->threadPoolSize()) + << LOG_KV("target", config->gatewayGrpcTarget()); + return stringstream.str(); +} } // namespace ppc::front \ No newline at end of file diff --git a/cpp/ppc-framework/front/FrontInterface.h b/cpp/ppc-framework/front/FrontInterface.h index e7648725..ff019e22 100644 --- a/cpp/ppc-framework/front/FrontInterface.h +++ b/cpp/ppc-framework/front/FrontInterface.h @@ -60,6 +60,8 @@ class FrontInterface virtual void asyncSendResponse(const std::string& _agencyID, std::string const& _uuid, front::PPCMessageFace::Ptr _message, ErrorCallbackFunc _callback) = 0; + virtual void registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType, + std::function _handler) = 0; /** * @brief notice task info to gateway * @param _taskInfo the latest task information @@ -69,9 +71,6 @@ class FrontInterface // erase the task-info when task finished virtual bcos::Error::Ptr eraseTaskInfo(std::string const& _taskID) = 0; - // get the agencyList from the gateway - virtual void asyncGetAgencyList(GetAgencyListCallback _callback) = 0; - virtual std::string const& selfEndPoint() const { return m_selfEndPoint; } protected: diff --git a/cpp/ppc-framework/front/IFront.h b/cpp/ppc-framework/front/IFront.h index b5f94a2d..8de9ab09 100644 --- a/cpp/ppc-framework/front/IFront.h +++ b/cpp/ppc-framework/front/IFront.h @@ -40,7 +40,7 @@ class IFrontClient virtual void onReceiveMessage( ppc::protocol::Message::Ptr const& _msg, ppc::protocol::ReceiveMsgFunc _callback) = 0; }; -class IFront : public virtual IFrontClient +class IFront : virtual public IFrontClient { public: using Ptr = std::shared_ptr; @@ -132,13 +132,6 @@ class IFrontBuilder IFrontBuilder() = default; virtual ~IFrontBuilder() = default; - /** - * @brief create the Front using specified config - * - * @param config the config used to build the Front - * @return IFront::Ptr he created Front - */ - virtual IFront::Ptr build(ppc::front::FrontConfig::Ptr config) const = 0; virtual IFrontClient::Ptr buildClient(std::string endPoint) const = 0; }; } // namespace ppc::front \ No newline at end of file diff --git a/cpp/ppc-framework/protocol/GrpcConfig.h b/cpp/ppc-framework/protocol/GrpcConfig.h new file mode 100644 index 00000000..f4b78814 --- /dev/null +++ b/cpp/ppc-framework/protocol/GrpcConfig.h @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GrpcConfig.h + * @author: yujiechen + * @date 2024-09-02 + */ +#pragma once +#include +#include + +namespace ppc::protocol +{ +class GrpcConfig +{ +public: + using Ptr = std::shared_ptr; + GrpcConfig() = default; + virtual ~GrpcConfig() = default; + + std::string const& loadBalancePolicy() const { return m_loadBanlancePolicy; } + void setLoadBalancePolicy(std::string const& loadBanlancePolicy) + { + m_loadBanlancePolicy = loadBanlancePolicy; + } + +private: + std::string m_loadBanlancePolicy = "round_robin"; +}; +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/ppc-framework/rpc/RpcStatusInterface.h b/cpp/ppc-framework/rpc/RpcStatusInterface.h index 441a08a2..5a5b18d2 100644 --- a/cpp/ppc-framework/rpc/RpcStatusInterface.h +++ b/cpp/ppc-framework/rpc/RpcStatusInterface.h @@ -31,7 +31,6 @@ namespace ppc::rpc { - class RpcStatusInterface { public: @@ -45,8 +44,6 @@ class RpcStatusInterface virtual bcos::Error::Ptr insertTask(protocol::Task::Ptr _task) = 0; virtual bcos::Error::Ptr updateTaskStatus(protocol::TaskResult::Ptr _taskResult) = 0; virtual protocol::TaskResult::Ptr getTaskStatus(const std::string& _taskID) = 0; - virtual bcos::Error::Ptr insertGateway( - const std::string& _agencyID, const std::string& _endpoint) = 0; virtual bcos::Error::Ptr deleteGateway(const std::string& _agencyID) = 0; virtual std::vector listGateway() = 0; }; diff --git a/cpp/ppc-framework/rpc/RpcTypeDef.h b/cpp/ppc-framework/rpc/RpcTypeDef.h index 3be4ee05..5a0fb493 100644 --- a/cpp/ppc-framework/rpc/RpcTypeDef.h +++ b/cpp/ppc-framework/rpc/RpcTypeDef.h @@ -36,7 +36,6 @@ enum class RpcError : int32_t std::string const RUN_TASK_METHOD = "runTask"; std::string const ASYNC_RUN_TASK_METHOD = "asyncRunTask"; std::string const GET_TASK_STATUS = "getTaskStatus"; -std::string const REGISTER_GATEWAY_URL = "registerGatewayUrl"; std::string const ASYNC_RUN_BS_MODE_TASK = "asyncRunBsModeTask"; std::string const FETCH_CIPHER = "fetchCipher"; diff --git a/cpp/test-utils/FakeFront.h b/cpp/test-utils/FakeFront.h index b7cdc97d..0b54380e 100644 --- a/cpp/test-utils/FakeFront.h +++ b/cpp/test-utils/FakeFront.h @@ -202,7 +202,6 @@ class FakeFront : public FrontInterface } } - void asyncGetAgencyList(GetAgencyListCallback) override {} private: // the uuid to _callback diff --git a/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h b/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h index 41f83430..7ee7dd80 100644 --- a/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h +++ b/cpp/wedpr-computing/ppc-psi/src/PSIConfig.h @@ -114,16 +114,6 @@ class PSIConfig int taskExpireTime() const { return m_taskExpireTime; } void setTaskExpireTime(int _taskExpireTime) { m_taskExpireTime = _taskExpireTime; } - void updateAgenyList(std::vector const& _agencyList) - { - bcos::UpgradableGuard l(x_agencyList); - if (m_agencyList != _agencyList) - { - bcos::UpgradeGuard ul(l); - m_agencyList = _agencyList; - } - } - std::vector agencyList() const { bcos::ReadGuard l(x_agencyList); @@ -165,6 +155,7 @@ class PSIConfig int m_taskExpireTime = 10000; // the agency list, for task-sync + // TODO: fetch from the gateway std::vector m_agencyList; mutable bcos::SharedMutex x_agencyList; }; diff --git a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h index 76aac8b0..2e695a89 100644 --- a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h +++ b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h @@ -22,6 +22,7 @@ #include "Common.h" #include "NetworkConfig.h" #include "ParamChecker.h" +#include "ppc-framework/front/FrontConfig.h" #include "ppc-framework/storage/CacheStorage.h" #include #include @@ -244,6 +245,10 @@ class PPCConfig int holdingMessageMinutes() const { return m_holdingMessageMinutes; } + ppc::front::FrontConfig::Ptr const& frontConfig() const { return m_frontConfig; } + + ppc::protocol::GrpcConfig::Ptr const& grpcConfig() const { return m_grpcConfig; } + private: virtual void loadRA2018Config(boost::property_tree::ptree const& _pt); virtual void loadEcdhPSIConfig(boost::property_tree::ptree const& _pt); @@ -283,6 +288,11 @@ class PPCConfig // the gateway holding message time, in minutes, default 30min int m_holdingMessageMinutes = 30; + // the front config + // TODO: parse the frontConfig + ppc::front::FrontConfig::Ptr m_frontConfig; + ppc::protocol::GrpcConfig::Ptr m_grpcConfig; + // the ra2018-psi config RA2018Config m_ra2018PSIConfig; // the storage config diff --git a/cpp/wedpr-initializer/CMakeLists.txt b/cpp/wedpr-initializer/CMakeLists.txt index 7fec7d12..633aaf0b 100644 --- a/cpp/wedpr-initializer/CMakeLists.txt +++ b/cpp/wedpr-initializer/CMakeLists.txt @@ -1,4 +1,7 @@ file(GLOB SRC_LIST "*.cpp") add_library(${INIT_LIB} ${SRC_LIST}) -target_link_libraries(${INIT_LIB} PUBLIC ${PROTOCOL_TARGET} ${CRYPTO_TARGET} ${FRONT_TARGET} ${LABELED_PSI_TARGET} ${RA2018_PSI_TARGET} ${CM2020_PSI_TARGET} ${ECDH_2PC_PSI_TARGET} ${ECDH_MULTI_PSI_TARGET} ${ECDH_CONN_PSI_TARGET} ${BS_ECDH_PSI_TARGET} ${PIR_TARGET} ${IO_TARGET} ${STORAGE_TARGET}) +target_link_libraries(${INIT_LIB} PUBLIC + ${PROTOCOL_TARGET} ${CRYPTO_TARGET} ${WEDPR_TRANSPORT_SDK_TARGET} ${LABELED_PSI_TARGET} + ${RA2018_PSI_TARGET} ${CM2020_PSI_TARGET} ${ECDH_2PC_PSI_TARGET} + ${ECDH_MULTI_PSI_TARGET} ${ECDH_CONN_PSI_TARGET} ${BS_ECDH_PSI_TARGET} ${PIR_TARGET} ${IO_TARGET} ${STORAGE_TARGET}) diff --git a/cpp/wedpr-initializer/FrontInitializer.cpp b/cpp/wedpr-initializer/FrontInitializer.cpp deleted file mode 100644 index 6cbd6702..00000000 --- a/cpp/wedpr-initializer/FrontInitializer.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Copyright (C) 2022 WeDPR. - * SPDX-License-Identifier: Apache-2.0 - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * @file FrontInitializer.cpp - * @author: shawnhe - * @date 2022-10-20 - */ - -#include "FrontInitializer.h" - -using namespace ppctars; -using namespace ppc; -using namespace ppc::front; -using namespace ppc::protocol; -using namespace ppc::initializer; - -void FrontInitializer::start() -{ - if (!m_running.exchange(true)) - { - FRONT_LOG(INFO) << LOG_DESC("start the front"); - if (m_front) - { - m_front->start(); - } - if (m_gatewayReporter) - { - m_gatewayReporter->registerTimeoutHandler( - boost::bind(&FrontInitializer::reportOtherGateway, this)); - m_gatewayReporter->start(); - } - FRONT_LOG(INFO) << LOG_DESC("start the front success"); - } -} - -void FrontInitializer::stop() -{ - if (m_running.exchange(false)) - { - FRONT_LOG(INFO) << LOG_DESC("stop the front"); - if (m_front) - { - m_front->stop(); - } - if (m_gatewayReporter) - { - m_gatewayReporter->stop(); - } - FRONT_LOG(INFO) << LOG_DESC("stop the front success"); - } -} - -void FrontInitializer::init() -{ - FRONT_LOG(INFO) << LOG_BADGE("init front"); - - auto ioService = std::make_shared(); - // init front - m_frontFactory = std::make_shared(m_selfAgencyId, m_threadPool); - m_front = m_frontFactory->buildFront(ioService); - FRONT_LOG(INFO) << LOG_BADGE("init front success"); -} diff --git a/cpp/wedpr-initializer/FrontInitializer.h b/cpp/wedpr-initializer/FrontInitializer.h deleted file mode 100644 index f00491e2..00000000 --- a/cpp/wedpr-initializer/FrontInitializer.h +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Copyright (C) 2022 WeDPR. - * SPDX-License-Identifier: Apache-2.0 - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * @file FrontInitializer.h - * @author: shawnhe - * @date 2022-10-20 - */ - -#pragma once - -#include "ppc-framework/front/Channel.h" -#include "ppc-framework/front/FrontInterface.h" -#include "ppc-framework/protocol/PPCMessageFace.h" -#include "ppc-framework/rpc/RpcStatusInterface.h" -#include "ppc-front/ppc-front/Front.h" -#include "ppc-front/ppc-front/PPCChannelManager.h" -#include -#include - -namespace ppc -{ -namespace initializer -{ -class FrontInitializer -{ -public: - using Ptr = std::shared_ptr; - - explicit FrontInitializer(const std::string& _selfAgencyId, - std::shared_ptr _threadPool, - front::PPCMessageFaceFactory::Ptr _messageFactory) - : m_selfAgencyId(_selfAgencyId), - m_threadPool(std::move(_threadPool)), - m_messageFactory(std::move(_messageFactory)), - m_gatewayReporter(std::make_shared(3000, "gatewayReporter")) - { - init(); - } - - virtual ~FrontInitializer() { stop(); } - - virtual void start(); - virtual void stop(); - - const std::string& selfAgencyId() const { return m_selfAgencyId; } - std::shared_ptr threadPool() { return m_threadPool; } - - front::Front::Ptr front() { return m_front; } - - front::FrontFactory::Ptr frontFactory() { return m_frontFactory; } - - front::PPCMessageFaceFactory::Ptr messageFactory() { return m_messageFactory; } - - void setRpcStatus(rpc::RpcStatusInterface::Ptr _status) { m_rpcStatus = std::move(_status); } - -protected: - void init(); - // report the gateway endpoints of other parties to the gateway periodically - // Note: must set gatewayInterface into the front before call this function - // Note: must set RpcStorageInterface into the FrontInitializer before call this function - virtual void reportOtherGateway() - { - try - { - auto gatewayList = m_rpcStatus->listGateway(); - if (!gatewayList.empty()) - { - m_front->gatewayInterface()->registerGateway(gatewayList); - } - } - catch (std::exception const& e) - { - FRONT_LOG(WARNING) << LOG_DESC("reportOtherGateway exception") - << LOG_KV("exception", boost::diagnostic_information(e)); - } - m_gatewayReporter->restart(); - } - -protected: - std::string m_selfAgencyId; - std::shared_ptr m_threadPool; - front::PPCMessageFaceFactory::Ptr m_messageFactory; - - front::Front::Ptr m_front; - front::FrontFactory::Ptr m_frontFactory; - - std::atomic_bool m_running = {false}; - std::shared_ptr m_gatewayReporter; - rpc::RpcStatusInterface::Ptr m_rpcStatus; -}; -} // namespace initializer -} // namespace ppc \ No newline at end of file diff --git a/cpp/wedpr-initializer/Initializer.cpp b/cpp/wedpr-initializer/Initializer.cpp index cb5d68f7..0bd52833 100644 --- a/cpp/wedpr-initializer/Initializer.cpp +++ b/cpp/wedpr-initializer/Initializer.cpp @@ -19,7 +19,6 @@ */ #include "Initializer.h" #include "Common.h" -#include "ProFrontInitializer.h" #include "ppc-crypto/src/ecc/ECDHCryptoFactoryImpl.h" #include "ppc-crypto/src/oprf/RA2018Oprf.h" #include "ppc-framework/protocol/Protocol.h" @@ -28,10 +27,12 @@ #include "ppc-psi/src/bs-ecdh-psi/BsEcdhPSIFactory.h" #include "ppc-psi/src/cm2020-psi/CM2020PSIFactory.h" #include "protocol/src/PPCMessage.h" +#include "wedpr-transport/sdk/TransportBuilder.h" #if 0 //TODO: optimize here #include "ppc-psi/src/ecdh-conn-psi/EcdhConnPSIFactory.h" #endif +#include "ppc-front/Front.h" #include "ppc-psi/src/ecdh-multi-psi/EcdhMultiPSIFactory.h" #include "ppc-psi/src/ecdh-psi/EcdhPSIFactory.h" #include "ppc-psi/src/labeled-psi/LabeledPSIFactory.h" @@ -46,6 +47,7 @@ using namespace ppc::psi; using namespace ppc::pir; using namespace ppc::tools; using namespace ppc::crypto; +using namespace ppc::sdk; Initializer::Initializer(std::string const& _configPath) : m_configPath(_configPath) { @@ -54,7 +56,7 @@ Initializer::Initializer(std::string const& _configPath) : m_configPath(_configP m_config->loadConfig(_configPath); } -void Initializer::init(ppc::protocol::NodeArch _arch) +void Initializer::init(ppc::protocol::NodeArch _arch, ppc::gateway::IGateway::Ptr const& gateway) { INIT_LOG(INFO) << LOG_DESC("init the wedpr-component") << LOG_KV("arch", _arch); // load the protocol @@ -67,31 +69,20 @@ void Initializer::init(ppc::protocol::NodeArch _arch) auto frontThreadPool = std::make_shared("front", m_config->threadPoolSize()); // Note: must set the m_holdingMessageMinutes before init the node + TransportBuilder transportBuilder; if (_arch == ppc::protocol::NodeArch::AIR) { - m_frontInitializer = std::make_shared( - m_config->agencyID(), frontThreadPool, m_protocolInitializer->ppcMsgFactory()); - // load the gateway config - m_config->loadGatewayConfig(ppc::protocol::NodeArch::AIR, nullptr, m_configPath); + m_transport = transportBuilder.build(SDKMode::AIR, m_config->frontConfig(), gateway); } else { - m_frontInitializer = std::make_shared( - m_config->agencyID(), frontThreadPool, m_protocolInitializer->ppcMsgFactory()); - m_agencyInfoFetcher = std::make_shared(3000, "agencyInfoFetcher"); - // Note: the timer start work only after calling start - auto self = weak_from_this(); - m_agencyInfoFetcher->registerTimeoutHandler([self]() { - auto init = self.lock(); - if (!init) - { - return; - } - init->fetchAgencyListPeriodically(); - }); + m_transport = transportBuilder.build(SDKMode::PRO, m_config->frontConfig(), nullptr); } + m_ppcFront = std::make_shared(m_transport->getFront()); + INIT_LOG(INFO) << LOG_DESC("init the frontService success") - << LOG_KV("agency", m_config->agencyID()) << LOG_KV("arch", _arch); + << LOG_KV("frontDetail", printFrontDesc(m_config->frontConfig())) + << LOG_KV("arch", _arch); auto cryptoBox = m_protocolInitializer->cryptoBox(); SQLStorage::Ptr sqlStorage = nullptr; @@ -123,9 +114,9 @@ void Initializer::init(ppc::protocol::NodeArch _arch) auto ra2018PSIFactory = std::make_shared(); auto oprf = std::make_shared( m_config->privateKey(), cryptoBox->eccCrypto(), cryptoBox->hashImpl()); - m_ra2018PSI = ra2018PSIFactory->createRA2018PSI(m_config->agencyID(), - m_frontInitializer->front(), m_config, oprf, m_protocolInitializer->binHashImpl(), - m_protocolInitializer->ppcMsgFactory(), sqlStorage, fileStorage, + m_ra2018PSI = ra2018PSIFactory->createRA2018PSI(m_config->agencyID(), m_ppcFront, m_config, + oprf, m_protocolInitializer->binHashImpl(), m_protocolInitializer->ppcMsgFactory(), + sqlStorage, fileStorage, std::make_shared("ra2018", m_config->threadPoolSize()), m_protocolInitializer->dataResourceLoader()); INIT_LOG(INFO) << LOG_DESC("init the ra2018-psi success"); @@ -133,8 +124,7 @@ void Initializer::init(ppc::protocol::NodeArch _arch) // init the labeled-psi INIT_LOG(INFO) << LOG_DESC("init the labeled-psi"); auto labeledPSIFactory = std::make_shared(); - m_labeledPSI = labeledPSIFactory->buildLabeledPSI(m_config->agencyID(), - m_frontInitializer->front(), cryptoBox, + m_labeledPSI = labeledPSIFactory->buildLabeledPSI(m_config->agencyID(), m_ppcFront, cryptoBox, std::make_shared("t_labeled-psi", m_config->threadPoolSize()), m_protocolInitializer->dataResourceLoader(), m_config->holdingMessageMinutes()); INIT_LOG(INFO) << LOG_DESC("init the labeled-psi success"); @@ -142,8 +132,7 @@ void Initializer::init(ppc::protocol::NodeArch _arch) // init the cm2020-psi INIT_LOG(INFO) << LOG_DESC("init the cm2020-psi"); auto cm2020PSIFactory = std::make_shared(); - m_cm2020PSI = cm2020PSIFactory->buildCM2020PSI(m_config->agencyID(), - m_frontInitializer->front(), cryptoBox, + m_cm2020PSI = cm2020PSIFactory->buildCM2020PSI(m_config->agencyID(), m_ppcFront, cryptoBox, std::make_shared("t_cm2020-psi", m_config->threadPoolSize()), m_protocolInitializer->dataResourceLoader(), m_config->holdingMessageMinutes(), m_config->cm2020PSIConfig().parallelism); @@ -153,8 +142,8 @@ void Initializer::init(ppc::protocol::NodeArch _arch) INIT_LOG(INFO) << LOG_DESC("create ecdh-psi"); auto ecdhPSIFactory = std::make_shared(); auto ecdhCryptoFactory = std::make_shared(m_config->privateKey()); - m_ecdhPSI = ecdhPSIFactory->createEcdhPSI(m_config, ecdhCryptoFactory, - m_frontInitializer->front(), m_protocolInitializer->ppcMsgFactory(), nullptr, + m_ecdhPSI = ecdhPSIFactory->createEcdhPSI(m_config, ecdhCryptoFactory, m_ppcFront, + m_protocolInitializer->ppcMsgFactory(), nullptr, m_protocolInitializer->dataResourceLoader()); INIT_LOG(INFO) << LOG_DESC("create ecdh-psi success"); @@ -164,7 +153,7 @@ void Initializer::init(ppc::protocol::NodeArch _arch) INIT_LOG(INFO) << LOG_DESC("create ecdh-conn-psi"); auto ecdhConnPSIFactory = std::make_shared(); m_ecdhConnPSI = ecdhConnPSIFactory->createEcdhConnPSI(m_config, ecdhCryptoFactory, - m_frontInitializer->front(), m_protocolInitializer->ppcMsgFactory(), + m_ppcFront, m_protocolInitializer->ppcMsgFactory(), std::make_shared("t_ecdh-conn-psi", std::thread::hardware_concurrency()), m_protocolInitializer->dataResourceLoader()); INIT_LOG(INFO) << LOG_DESC("create ecdh-conn-psi success"); @@ -172,8 +161,7 @@ void Initializer::init(ppc::protocol::NodeArch _arch) // init the ecdh-multi-psi INIT_LOG(INFO) << LOG_DESC("create ecdh-multi-psi"); auto ecdhMultiPSIFactory = std::make_shared(); - m_ecdhMultiPSI = ecdhMultiPSIFactory->createEcdhMultiPSI(m_config, m_frontInitializer->front(), - cryptoBox, + m_ecdhMultiPSI = ecdhMultiPSIFactory->createEcdhMultiPSI(m_config, m_ppcFront, cryptoBox, std::make_shared("t_ecdh-multi-psi", std::thread::hardware_concurrency()), m_protocolInitializer->dataResourceLoader()); INIT_LOG(INFO) << LOG_DESC("create ecdh-multi-psi success"); @@ -181,7 +169,7 @@ void Initializer::init(ppc::protocol::NodeArch _arch) // init the ot-pir INIT_LOG(INFO) << LOG_DESC("create ot-pir"); auto otPIRFactory = std::make_shared(); - m_otPIR = otPIRFactory->buildOtPIR(m_config->agencyID(), m_frontInitializer->front(), cryptoBox, + m_otPIR = otPIRFactory->buildOtPIR(m_config->agencyID(), m_ppcFront, cryptoBox, std::make_shared("t_ot-pir", std::thread::hardware_concurrency()), m_protocolInitializer->dataResourceLoader(), m_config->holdingMessageMinutes()); @@ -208,7 +196,7 @@ void Initializer::initMsgHandlers() INIT_LOG(INFO) << LOG_DESC("initMsgHandlers for ra2018PSI"); // register msg-handlers for ra2018-psi auto weakRA2018PSI = std::weak_ptr(m_ra2018PSI); - m_frontInitializer->front()->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, + m_ppcFront->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, (uint8_t)ppc::protocol::PSIAlgorithmType::RA_PSI_2PC, [weakRA2018PSI](ppc::front::PPCMessageFace::Ptr _msg) { auto psi = weakRA2018PSI.lock(); @@ -222,7 +210,7 @@ void Initializer::initMsgHandlers() // register msg-handlers for labeled-psi INIT_LOG(INFO) << LOG_DESC("initMsgHandlers for labeledPSI"); auto weakLabeledPSI = std::weak_ptr(m_labeledPSI); - m_frontInitializer->front()->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, + m_ppcFront->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, (uint8_t)ppc::protocol::PSIAlgorithmType::LABELED_PSI_2PC, [weakLabeledPSI](ppc::front::PPCMessageFace::Ptr _msg) { auto psi = weakLabeledPSI.lock(); @@ -236,7 +224,7 @@ void Initializer::initMsgHandlers() // register msg-handlers for cm2020-psi INIT_LOG(INFO) << LOG_DESC("initMsgHandlers for CM2020PSI"); auto weakCM2020PSI = std::weak_ptr(m_cm2020PSI); - m_frontInitializer->front()->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, + m_ppcFront->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, (uint8_t)ppc::protocol::PSIAlgorithmType::CM_PSI_2PC, [weakCM2020PSI](ppc::front::PPCMessageFace::Ptr _msg) { auto psi = weakCM2020PSI.lock(); @@ -250,7 +238,7 @@ void Initializer::initMsgHandlers() INIT_LOG(INFO) << LOG_DESC("initMsgHandlers for ecdh-psi"); // register msg-handlers for ecdh-psi auto weakEcdhPSI = std::weak_ptr(m_ecdhPSI); - m_frontInitializer->front()->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, + m_ppcFront->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, (uint8_t)ppc::protocol::PSIAlgorithmType::ECDH_PSI_2PC, [weakEcdhPSI](ppc::front::PPCMessageFace::Ptr _msg) { auto psi = weakEcdhPSI.lock(); @@ -264,7 +252,7 @@ void Initializer::initMsgHandlers() // register msg-handlers for ecdh-conn-psi /*INIT_LOG(INFO) << LOG_DESC("initMsgHandlers for ecdh-conn-psi"); auto weakEcdhConnPSI = std::weak_ptr(m_ecdhConnPSI); - m_frontInitializer->front()->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, + m_ppcFront->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, (uint8_t)ppc::protocol::PSIAlgorithmType::ECDH_PSI_CONN, [weakEcdhConnPSI](ppc::front::PPCMessageFace::Ptr _msg) { auto psi = weakEcdhConnPSI.lock(); @@ -278,7 +266,7 @@ void Initializer::initMsgHandlers() // register msg-handlers for ecdh-multi-psi INIT_LOG(INFO) << LOG_DESC("initMsgHandlers for ecdh-multi-psi"); auto weakEcdhMultiPSI = std::weak_ptr(m_ecdhMultiPSI); - m_frontInitializer->front()->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, + m_ppcFront->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PSI, (uint8_t)ppc::protocol::PSIAlgorithmType::ECDH_PSI_MULTI, [weakEcdhMultiPSI](ppc::front::PPCMessageFace::Ptr _msg) { auto psi = weakEcdhMultiPSI.lock(); @@ -292,7 +280,7 @@ void Initializer::initMsgHandlers() INIT_LOG(INFO) << LOG_DESC("initMsgHandlers for ot-pir"); // register msg-handlers for ecdh-psi auto weakOtPIR = std::weak_ptr(m_otPIR); - m_frontInitializer->front()->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PIR, + m_ppcFront->registerMessageHandler((uint8_t)ppc::protocol::TaskType::PIR, (uint8_t)ppc::protocol::PSIAlgorithmType::OT_PIR_2PC, [weakOtPIR](ppc::front::PPCMessageFace::Ptr _msg) { auto pir = weakOtPIR.lock(); @@ -414,13 +402,9 @@ void Initializer::registerRpcHandler(ppc::rpc::RpcInterface::Ptr const& _rpc) void Initializer::start() { - if (m_agencyInfoFetcher) - { - m_agencyInfoFetcher->start(); - } - if (m_frontInitializer) + if (m_transport) { - m_frontInitializer->start(); + m_transport->start(); } /*if (m_ecdhConnPSI) { @@ -460,14 +444,10 @@ void Initializer::start() void Initializer::stop() { - if (m_agencyInfoFetcher) - { - m_agencyInfoFetcher->stop(); - } // stop the network firstly - if (m_frontInitializer) + if (m_transport) { - m_frontInitializer->stop(); + m_transport->stop(); } /*if (m_ecdhConnPSI) { @@ -481,14 +461,14 @@ void Initializer::stop() { m_cm2020PSI->stop(); } - // if (m_ra2018PSI) - // { - // m_ra2018PSI->stop(); - // } - // if (m_labeledPSI) - // { - // m_labeledPSI->stop(); - // } + if (m_ra2018PSI) + { + m_ra2018PSI->stop(); + } + if (m_labeledPSI) + { + m_labeledPSI->stop(); + } // stop the ecdh-psi if (m_ecdhPSI) { @@ -503,40 +483,3 @@ void Initializer::stop() m_bsEcdhPSI->stop(); } } - -void Initializer::fetchAgencyListPeriodically() -{ - if (!m_agencyInfoFetcher) - { - return; - } - fetchAgencyList(); - m_agencyInfoFetcher->restart(); -} - -// fetch the agency-list for ecdh and ra2018 periodically in pro-mode -void Initializer::fetchAgencyList() -{ - auto weakEcdhPSI = std::weak_ptr(m_ecdhPSI); - auto weakRA2018PSI = std::weak_ptr(m_ra2018PSI); - m_frontInitializer->front()->asyncGetAgencyList( - [weakEcdhPSI, weakRA2018PSI]( - bcos::Error::Ptr _error, std::vector&& _agencyList) { - if (_error) - { - INIT_LOG(INFO) << LOG_DESC("asyncGetAgencyList failed") - << LOG_KV("code", _error->errorCode()); - return; - } - auto ecdhPsi = weakEcdhPSI.lock(); - if (ecdhPsi) - { - ecdhPsi->psiConfig()->updateAgenyList(_agencyList); - } - auto ra2018Psi = weakRA2018PSI.lock(); - if (ra2018Psi) - { - ra2018Psi->psiConfig()->updateAgenyList(_agencyList); - } - }); -} \ No newline at end of file diff --git a/cpp/wedpr-initializer/Initializer.h b/cpp/wedpr-initializer/Initializer.h index e5424733..78eba67d 100644 --- a/cpp/wedpr-initializer/Initializer.h +++ b/cpp/wedpr-initializer/Initializer.h @@ -18,15 +18,18 @@ * @date 2022-11-14 */ #pragma once -#include "FrontInitializer.h" #include "ProtocolInitializer.h" +#include "ppc-framework/front/FrontInterface.h" +#include "ppc-framework/gateway/IGateway.h" #include "ppc-framework/rpc/RpcInterface.h" #include "ppc-framework/rpc/RpcTypeDef.h" #include "ppc-psi/src/bs-ecdh-psi/BsEcdhPSIImpl.h" #include "ppc-tools/src/config/PPCConfig.h" +#include "wedpr-transport/sdk/Transport.h" #include #include + namespace ppc::psi { class RA2018PSIImpl; @@ -51,31 +54,31 @@ class Initializer : public std::enable_shared_from_this virtual ~Initializer() { stop(); } // init the service - virtual void init(ppc::protocol::NodeArch _arch); + virtual void init(ppc::protocol::NodeArch _arch, ppc::gateway::IGateway::Ptr const& gateway); virtual void stop(); virtual void start(); ppc::tools::PPCConfig::Ptr config() { return m_config; } - FrontInitializer::Ptr const& frontInitializer() const { return m_frontInitializer; } + ppc::sdk::Transport::Ptr const& transport() const { return m_transport; } + ppc::front::FrontInterface::Ptr const& ppcFront() const { return m_ppcFront; } + ppc::tools::PPCConfig::Ptr const& config() const { return m_config; } ProtocolInitializer::Ptr const& protocolInitializer() const { return m_protocolInitializer; } ppc::psi::BsEcdhPSIImpl::Ptr const& bsEcdhPsi() const { return m_bsEcdhPSI; } void registerRpcHandler(ppc::rpc::RpcInterface::Ptr const& _rpc); - virtual void fetchAgencyList(); - protected: virtual void initMsgHandlers(); - void fetchAgencyListPeriodically(); + private: std::string m_configPath; ppc::tools::PPCConfig::Ptr m_config; ProtocolInitializer::Ptr m_protocolInitializer; - FrontInitializer::Ptr m_frontInitializer; - // timer to fetch all agency information - std::shared_ptr m_agencyInfoFetcher; + ppc::sdk::Transport::Ptr m_transport; + // created with transport + ppc::front::FrontInterface::Ptr m_ppcFront; // the ra2018-psi std::shared_ptr m_ra2018PSI; diff --git a/cpp/wedpr-initializer/ProFrontInitializer.h b/cpp/wedpr-initializer/ProFrontInitializer.h deleted file mode 100644 index 8355d968..00000000 --- a/cpp/wedpr-initializer/ProFrontInitializer.h +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright (C) 2022 WeDPR. - * SPDX-License-Identifier: Apache-2.0 - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * @file ProFrontInitializer.h - * @author: yujiechen - * @date 2022-11-25 - */ -#pragma once -#include "FrontInitializer.h" -#include "ppc-framework/gateway/GatewayInterface.h" -#include - -namespace ppc::initializer -{ -class ProFrontInitializer : public FrontInitializer -{ -public: - using Ptr = std::shared_ptr; - ProFrontInitializer(const std::string& _selfAgencyId, - std::shared_ptr _threadPool, - front::PPCMessageFaceFactory::Ptr _messageFactory) - : FrontInitializer(_selfAgencyId, _threadPool, _messageFactory), - m_statusReporter(std::make_shared(3000, "frontReporter")) - {} - ~ProFrontInitializer() override { stop(); } - - void start() override - { - FrontInitializer::start(); - if (m_statusReporter) - { - m_statusReporter->registerTimeoutHandler( - boost::bind(&ProFrontInitializer::reportStatusToGateway, this)); - m_statusReporter->start(); - } - } - - void stop() override - { - if (m_statusReporter) - { - m_statusReporter->stop(); - } - FrontInitializer::stop(); - } - -protected: - // report the endPoint to the gateway periodically - // Note: must set gatewayInterface into the front before call this function - virtual void reportStatusToGateway() - { - try - { - m_front->gatewayInterface()->registerFront(m_front->selfEndPoint(), nullptr); - } - catch (std::exception const& e) - { - FRONT_LOG(WARNING) << LOG_DESC("reportStatusToGateway exception") - << LOG_KV("exception", boost::diagnostic_information(e)); - } - m_statusReporter->restart(); - } - -private: - std::shared_ptr m_statusReporter; -}; -} // namespace ppc::initializer diff --git a/cpp/wedpr-main/CMakeLists.txt b/cpp/wedpr-main/CMakeLists.txt index f73c486d..ba80ab5f 100644 --- a/cpp/wedpr-main/CMakeLists.txt +++ b/cpp/wedpr-main/CMakeLists.txt @@ -2,33 +2,12 @@ project(ppc-main VERSION ${VERSION}) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) add_subdirectory(air-node) +add_subdirectory(pro-node) +#TODO: ppc-gateway if (BUILD_CEM) add_subdirectory(cem-node) endif () add_subdirectory(mpc-node) -macro(compile_service SERVICE_SOURCE_PATH BINARY_NAME PKG_NAME) - add_subdirectory(${SERVICE_SOURCE_PATH}) - add_custom_command(OUTPUT ${PKG_NAME}.tgz - WORKING_DIRECTORY ${CMAKE_BINARY_DIR} - COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_BINARY_DIR}/ppc-main/${SERVICE_SOURCE_PATH}/${BINARY_NAME} ${CMAKE_BINARY_DIR}/${PKG_NAME}/${PKG_NAME} - COMMAND ${CMAKE_COMMAND} -E chdir ${CMAKE_BINARY_DIR} tar czfv ${PKG_NAME}.tgz ${PKG_NAME}/${PKG_NAME} - COMMAND ${CMAKE_COMMAND} -E copy ${PKG_NAME}.tgz tars/${PKG_NAME}.tgz - COMMENT "Compressing ${BINARY_NAME} into ${PKG_NAME}.tgz ..." - COMMAND ${CMAKE_COMMAND} -E rm ${PKG_NAME}.tgz - ) - add_custom_target(${PKG_NAME}-tar DEPENDS ${PKG_NAME}.tgz ${BINARY_NAME}) - list(APPEND SERVICE_TAR_LIST ${PKG_NAME}-tar) -endmacro() - -# gateway -# Note: the tar-pkg is only useful when using tars-admin -compile_service("gateway" ${GATEWAY_BINARY_NAME} ${GATEWAY_PKG_NAME}) -# ppc-node -compile_service("pro-node" ${PRO_BINARY_NAME} ${PPC_NODE_PKG_NAME}) - -# for make tar -add_custom_target(tar DEPENDS ${SERVICE_TAR_LIST}) - -unset(CMAKE_RUNTIME_OUTPUT_DIRECTORY) +unset(CMAKE_RUNTIME_OUTPUT_DIRECTORY) \ No newline at end of file diff --git a/cpp/wedpr-main/air-node/AirNodeInitializer.cpp b/cpp/wedpr-main/air-node/AirNodeInitializer.cpp index f33bd44b..82ee59ac 100644 --- a/cpp/wedpr-main/air-node/AirNodeInitializer.cpp +++ b/cpp/wedpr-main/air-node/AirNodeInitializer.cpp @@ -18,13 +18,13 @@ * @date 2022-11-14 */ #include "AirNodeInitializer.h" +#include "ppc-front/LocalFrontBuilder.h" #include "ppc-gateway/GatewayFactory.h" -#include "ppc-gateway/GatewayConfigContext.h" #include "ppc-rpc/src/RpcFactory.h" #include "ppc-rpc/src/RpcMemory.h" -#include "ppc-storage/src/redis/RedisStorage.h" using namespace ppc::protocol; +using namespace ppc::front; using namespace ppc::node; using namespace ppc::gateway; using namespace ppc::rpc; @@ -32,6 +32,10 @@ using namespace ppc::storage; using namespace ppc::initializer; using namespace bcos; +AirNodeInitializer::AirNodeInitializer() +{ + m_frontBuilder = std::make_shared(); +} void AirNodeInitializer::init(std::string const& _configPath) { // init the log @@ -44,12 +48,15 @@ void AirNodeInitializer::init(std::string const& _configPath) // init the node m_nodeInitializer = std::make_shared(_configPath); - m_nodeInitializer->init(ppc::protocol::NodeArch::AIR); - auto front = m_nodeInitializer->frontInitializer()->front(); - front->setSelfEndPoint("localhost"); // init the gateway initGateway(_configPath); + // init the node + m_nodeInitializer->init(ppc::protocol::NodeArch::AIR, m_gateway); + // set the created front to the builder + m_frontBuilder->setFront(m_nodeInitializer->transport()->getFront()); + // register the NodeInfo + m_gateway->registerNodeInfo(m_nodeInitializer->config()->frontConfig()->generateNodeInfo()); INIT_LOG(INFO) << LOG_DESC("init the rpc"); // load the rpc config @@ -58,15 +65,13 @@ void AirNodeInitializer::init(std::string const& _configPath) // init RpcStatusInterface RpcStatusInterface::Ptr rpcStatusInterface = std::make_shared(); - m_nodeInitializer->frontInitializer()->setRpcStatus(rpcStatusInterface); + auto rpcFactory = std::make_shared(m_nodeInitializer->config()->agencyID()); m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config()); m_rpc->setRpcStorage(rpcStatusInterface); m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi()); m_nodeInitializer->registerRpcHandler(m_rpc); - // Note: only can fetchAgencyList after the gatewayInterface has been setted into front - m_nodeInitializer->fetchAgencyList(); INIT_LOG(INFO) << LOG_DESC("init the rpc success"); } @@ -82,15 +87,6 @@ void AirNodeInitializer::initGateway(std::string const& _configPath) GatewayFactory gatewayFactory(config); m_gateway = gatewayFactory.build(m_frontBuilder); - // TODO: register the nodeInfo - //m_gateway->registerNodeInfo(); - // ppc::front::IFrontBuilder::Ptr const& frontBuilder - // registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) - /*auto gateway = gatewayFactory.buildGateway(ppc::protocol::NodeArch::AIR, config, nullptr, - m_nodeInitializer->protocolInitializer()->ppcMsgFactory(), threadPool); - auto frontInitializer = m_nodeInitializer->frontInitializer(); - gateway->registerFront(frontInitializer->front()->selfEndPoint(), frontInitializer->front());*/ - m_gateway = gateway; } void AirNodeInitializer::start() diff --git a/cpp/wedpr-main/air-node/AirNodeInitializer.h b/cpp/wedpr-main/air-node/AirNodeInitializer.h index 218f8343..0366a179 100644 --- a/cpp/wedpr-main/air-node/AirNodeInitializer.h +++ b/cpp/wedpr-main/air-node/AirNodeInitializer.h @@ -18,23 +18,27 @@ * @date 2022-11-14 */ #pragma once -#include "libinitializer/Common.h" -#include "libinitializer/Initializer.h" #include "ppc-framework/front/IFront.h" #include "ppc-framework/gateway/IGateway.h" +#include "wedpr-initializer/Common.h" +#include "wedpr-initializer/Initializer.h" #include #include namespace ppc::rpc { class Rpc; } +namespace ppc::front +{ +class LocalFrontBuilder; +} namespace ppc::node { class AirNodeInitializer { public: using Ptr = std::shared_ptr; - AirNodeInitializer() {} + AirNodeInitializer(); virtual ~AirNodeInitializer() { stop(); } virtual void init(std::string const& _configPath); @@ -49,6 +53,6 @@ class AirNodeInitializer ppc::initializer::Initializer::Ptr m_nodeInitializer; ppc::gateway::IGateway::Ptr m_gateway; std::shared_ptr m_rpc; - ppc::front::IFrontBuilder::Ptr m_frontBuilder; + std::shared_ptr m_frontBuilder; }; } // namespace ppc::node \ No newline at end of file diff --git a/cpp/wedpr-main/air-node/main.cpp b/cpp/wedpr-main/air-node/main.cpp index 884f5298..b7cfd67f 100644 --- a/cpp/wedpr-main/air-node/main.cpp +++ b/cpp/wedpr-main/air-node/main.cpp @@ -18,60 +18,10 @@ * @date 2022-11-14 */ #include "AirNodeInitializer.h" -#include -#include -#include -#include -#include +#include "wedpr-main/common/NodeStarter.h" -using namespace ppc; int main(int argc, const char* argv[]) { - /// set LC_ALL - setDefaultOrCLocale(); - std::set_terminate([]() { - std::cerr << "terminate handler called, print stacks" << std::endl; - void* trace_elems[20]; - int trace_elem_count(backtrace(trace_elems, 20)); - char** stack_syms(backtrace_symbols(trace_elems, trace_elem_count)); - for (int i = 0; i < trace_elem_count; ++i) - { - std::cout << stack_syms[i] << "\n"; - } - free(stack_syms); - std::cerr << "terminate handler called, print stack end" << std::endl; - abort(); - }); - // get datetime and output welcome info - ExitHandler exitHandler; - signal(SIGTERM, &ExitHandler::exitHandler); - signal(SIGABRT, &ExitHandler::exitHandler); - signal(SIGINT, &ExitHandler::exitHandler); - - // Note: the initializer must exist in the life time of the whole program - auto initializer = std::make_shared(); - try - { - auto param = initCommandLine(argc, argv); - initializer->init(param.configFilePath); - initializer->start(); - } - catch (std::exception const& e) - { - printVersion(); - std::cout << "[" << bcos::getCurrentDateTime() << "] "; - std::cout << "start ppc-psi failed, error:" << boost::diagnostic_information(e) - << std::endl; - return -1; - } - printVersion(); - std::cout << "[" << bcos::getCurrentDateTime() << "] "; - std::cout << "The ppc-psi is running..." << std::endl; - while (!exitHandler.shouldExit()) - { - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } - initializer.reset(); - std::cout << "[" << bcos::getCurrentDateTime() << "] "; - std::cout << "ppc-psi program exit normally." << std::endl; -} + auto initializer = std::make_shared(); + startProgram(argc, argv, "ppc-psi", initializer); +} \ No newline at end of file diff --git a/cpp/wedpr-main/cem-node/CEMInitializer.h b/cpp/wedpr-main/cem-node/CEMInitializer.h index 149e9872..1ba25899 100644 --- a/cpp/wedpr-main/cem-node/CEMInitializer.h +++ b/cpp/wedpr-main/cem-node/CEMInitializer.h @@ -18,8 +18,8 @@ * @date 2022-11-19 */ #pragma once -#include "libinitializer/Common.h" #include "ppc-rpc/src/RpcFactory.h" +#include "wedpr-initializer/Common.h" #include #include namespace ppc::rpc diff --git a/cpp/wedpr-main/common/NodeStarter.h b/cpp/wedpr-main/common/NodeStarter.h new file mode 100644 index 00000000..95d8d671 --- /dev/null +++ b/cpp/wedpr-main/common/NodeStarter.h @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2022 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file main.cpp + * @author: yujiechen + * @date 2022-11-14 + */ + +#include +#include +#include +#include +#include + +namespace ppc::node +{ +template +int startProgram( + int argc, const char* argv[], std::string const& binaryName, std::shared_ptr& starter) +{ + /// set LC_ALL + setDefaultOrCLocale(); + std::set_terminate([]() { + std::cerr << "terminate handler called, print stacks" << std::endl; + void* trace_elems[20]; + int trace_elem_count(backtrace(trace_elems, 20)); + char** stack_syms(backtrace_symbols(trace_elems, trace_elem_count)); + for (int i = 0; i < trace_elem_count; ++i) + { + std::cout << stack_syms[i] << "\n"; + } + free(stack_syms); + std::cerr << "terminate handler called, print stack end" << std::endl; + abort(); + }); + // get datetime and output welcome info + ppc::ExitHandler exitHandler; + signal(SIGTERM, &ppc::ExitHandler::exitHandler); + signal(SIGABRT, &ppc::ExitHandler::exitHandler); + signal(SIGINT, &ppc::ExitHandler::exitHandler); + + // Note: the initializer must exist in the life time of the whole program + try + { + auto param = ppc::initCommandLine(argc, argv); + starter->init(param.configFilePath); + starter->start(); + } + catch (std::exception const& e) + { + printVersion(); + std::cout << "[" << bcos::getCurrentDateTime() << "] "; + std::cout << "start " + binaryName + " failed, error:" << boost::diagnostic_information(e) + << std::endl; + return -1; + } + printVersion(); + std::cout << "[" << bcos::getCurrentDateTime() << "] "; + std::cout << "The " + binaryName + "is running..." << std::endl; + while (!exitHandler.shouldExit()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + starter.reset(); + std::cout << "[" << bcos::getCurrentDateTime() << "] "; + std::cout << "The" + binaryName + " program exit normally." << std::endl; +} +} // namespace ppc::node \ No newline at end of file diff --git a/cpp/wedpr-main/mpc-node/MPCInitializer.h b/cpp/wedpr-main/mpc-node/MPCInitializer.h index 1d92e675..4e0da526 100644 --- a/cpp/wedpr-main/mpc-node/MPCInitializer.h +++ b/cpp/wedpr-main/mpc-node/MPCInitializer.h @@ -18,8 +18,8 @@ * @date 2023-03-24 */ #pragma once -#include "libinitializer/Common.h" #include "ppc-rpc/src/RpcFactory.h" +#include "wedpr-initializer/Common.h" #include #include namespace ppc::rpc diff --git a/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp b/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp new file mode 100644 index 00000000..ab2a51be --- /dev/null +++ b/cpp/wedpr-main/pro-node/ProNodeInitializer.cpp @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2022 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file ProNodeInitializer.cpp + * @author: yujiechen + * @date 2022-11-14 + */ +#include "ProNodeInitializer.h" +#include "ppc-rpc/src/RpcFactory.h" +#include "ppc-rpc/src/RpcMemory.h" +#include "wedpr-protocol/grpc/client/RemoteFrontBuilder.h" + +using namespace ppc::protocol; +using namespace ppc::node; +using namespace ppc::gateway; +using namespace ppc::rpc; +using namespace ppc::storage; +using namespace ppc::initializer; +using namespace bcos; + +ProNodeInitializer::ProNodeInitializer() {} + +void ProNodeInitializer::init(std::string const& _configPath) +{ + // init the log + boost::property_tree::ptree pt; + boost::property_tree::read_ini(_configPath, pt); + + m_logInitializer = std::make_shared(); + m_logInitializer->initLog(pt); + INIT_LOG(INFO) << LOG_DESC("initLog success"); + + // init the node + m_nodeInitializer = std::make_shared(_configPath); + + // init the node(no need to set the gateway) + m_nodeInitializer->init(ppc::protocol::NodeArch::PRO, nullptr); + + + INIT_LOG(INFO) << LOG_DESC("init the rpc"); + // load the rpc config + // not specify the certPath in air-mode + m_nodeInitializer->config()->loadRpcConfig(nullptr, pt); + // init RpcStatusInterface + RpcStatusInterface::Ptr rpcStatusInterface = std::make_shared(); + + + auto rpcFactory = std::make_shared(m_nodeInitializer->config()->agencyID()); + m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config()); + m_rpc->setRpcStorage(rpcStatusInterface); + m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi()); + m_nodeInitializer->registerRpcHandler(m_rpc); + + INIT_LOG(INFO) << LOG_DESC("init the rpc success"); +} + +void ProNodeInitializer::start() +{ + // start the node + if (m_nodeInitializer) + { + m_nodeInitializer->start(); + } + if (m_rpc) + { + m_rpc->start(); + } +} + +void ProNodeInitializer::stop() +{ + if (m_rpc) + { + m_rpc->stop(); + } + if (m_nodeInitializer) + { + m_nodeInitializer->stop(); + } +} \ No newline at end of file diff --git a/cpp/wedpr-main/pro-node/ProNodeInitializer.h b/cpp/wedpr-main/pro-node/ProNodeInitializer.h new file mode 100644 index 00000000..054d1b84 --- /dev/null +++ b/cpp/wedpr-main/pro-node/ProNodeInitializer.h @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2022 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file ProNodeInitializer.h + * @author: yujiechen + * @date 2022-11-14 + */ +#pragma once +#include "wedpr-initializer/Common.h" +#include "wedpr-initializer/Initializer.h" +#include +#include +namespace ppc::rpc +{ +class Rpc; +} +namespace ppc::front +{ +class RemoteFrontBuilder; +} +namespace ppc::node +{ +class ProNodeInitializer +{ +public: + using Ptr = std::shared_ptr; + ProNodeInitializer(); + virtual ~ProNodeInitializer() { stop(); } + + virtual void init(std::string const& _configPath); + virtual void start(); + virtual void stop(); + +private: + bcos::BoostLogInitializer::Ptr m_logInitializer; + ppc::initializer::Initializer::Ptr m_nodeInitializer; + std::shared_ptr m_rpc; +}; +} // namespace ppc::node \ No newline at end of file diff --git a/cpp/wedpr-main/pro-node/main.cpp b/cpp/wedpr-main/pro-node/main.cpp index 881c4304..a122784b 100644 --- a/cpp/wedpr-main/pro-node/main.cpp +++ b/cpp/wedpr-main/pro-node/main.cpp @@ -1,4 +1,4 @@ -/** +/* * Copyright (C) 2022 WeDPR. * SPDX-License-Identifier: Apache-2.0 * Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,30 +15,14 @@ * * @file main.cpp * @author: yujiechen - * @date 2022-11-25 + * @date 2022-11-14 */ -#include "ProNodeServiceApp.h" -#include "libhelper/CommandHelper.h" +#include "ProNodeInitializer.h" +#include "wedpr-main/common/NodeStarter.h" -using namespace ppctars; - -int main(int argc, char* argv[]) +using namespace ppc::node; +int main(int argc, const char* argv[]) { - try - { - ppc::initAppCommandLine(argc, argv); - ProNodeServiceApp app; - app.main(argc, argv); - app.waitForShutdown(); - return 0; - } - catch (std::exception& e) - { - cerr << "ppc-pro-node std::exception:" << boost::diagnostic_information(e) << std::endl; - } - catch (...) - { - cerr << "ppc-pro-node unknown exception." << std::endl; - } - return -1; -} + auto initializer = std::make_shared(); + startProgram(argc, argv, "ppc-pro-node", initializer); +} \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/Common.h b/cpp/wedpr-protocol/grpc/Common.h new file mode 100644 index 00000000..bf2a5d76 --- /dev/null +++ b/cpp/wedpr-protocol/grpc/Common.h @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2021 FISCO BCOS. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file Common.h + * @author: yujiechen + * @date 2021-04-12 + */ +#pragma once +#include "ppc-framework/Common.h" +#include "ppc-framework/protocol/GrpcConfig.h" +#include + +namespace ppc::protocol +{ +inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr const& grpcConfig) +{ + grpc::ChannelArguments args; + if (grpcConfig == nullptr) + { + return args; + } + args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy()); + return args; +} +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/FrontClient.cpp b/cpp/wedpr-protocol/grpc/client/FrontClient.cpp index 08e47ee2..7fe01a6f 100644 --- a/cpp/wedpr-protocol/grpc/client/FrontClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/FrontClient.cpp @@ -21,7 +21,6 @@ #include "protobuf/RequestConverter.h" #include "wedpr-protocol/protobuf/Common.h" - using namespace ppc::protocol; using namespace ppc::proto; using namespace grpc; diff --git a/cpp/wedpr-protocol/grpc/client/FrontClient.h b/cpp/wedpr-protocol/grpc/client/FrontClient.h index 29e1a21a..acc4e96b 100644 --- a/cpp/wedpr-protocol/grpc/client/FrontClient.h +++ b/cpp/wedpr-protocol/grpc/client/FrontClient.h @@ -23,12 +23,12 @@ namespace ppc::protocol { -class FrontClient : public ppc::front::IFrontClient, public GrpcClient +class FrontClient : public virtual ppc::front::IFrontClient, public GrpcClient { public: using Ptr = std::shared_ptr; - FrontClient(grpc::ChannelArguments const& channelConfig, std::string const& endPoints) - : GrpcClient(channelConfig, endPoints) + FrontClient(ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints) + : GrpcClient(grpcConfig, endPoints), m_stub(ppc::proto::Front::NewStub(m_channel)) {} ~FrontClient() override = default; diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp index 50a445ae..eef8c9be 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp @@ -19,6 +19,7 @@ */ #include "GatewayClient.h" #include "Common.h" +#include "Service.grpc.pb.h" #include "protobuf/RequestConverter.h" using namespace ppc; diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.h b/cpp/wedpr-protocol/grpc/client/GatewayClient.h index 9a75a52b..ea964acd 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.h +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.h @@ -27,8 +27,8 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient { public: using Ptr = std::shared_ptr; - GatewayClient(grpc::ChannelArguments const& channelConfig, std::string const& endPoints) - : GrpcClient(channelConfig, endPoints), m_stub(ppc::proto::Gateway::NewStub(m_channel)) + GatewayClient(ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints) + : GrpcClient(grpcConfig, endPoints), m_stub(ppc::proto::Gateway::NewStub(m_channel)) {} ~GatewayClient() override = default; diff --git a/cpp/wedpr-protocol/grpc/client/GrpcClient.h b/cpp/wedpr-protocol/grpc/client/GrpcClient.h index 86e44fb5..d1fda5af 100644 --- a/cpp/wedpr-protocol/grpc/client/GrpcClient.h +++ b/cpp/wedpr-protocol/grpc/client/GrpcClient.h @@ -19,6 +19,8 @@ */ #pragma once #include "Service.grpc.pb.h" +#include "ppc-framework/protocol/GrpcConfig.h" +#include "wedpr-protocol/grpc/Common.h" #include namespace ppc::protocol @@ -28,15 +30,16 @@ class GrpcClient { public: using Ptr = std::shared_ptr; - GrpcClient(grpc::ChannelArguments const& channelConfig, std::string const& endPoints) - : m_channel( - grpc::CreateCustomChannel(endPoints, grpc::InsecureChannelCredentials(), channelConfig)) + GrpcClient(ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints) + : m_channel(grpc::CreateCustomChannel( + endPoints, grpc::InsecureChannelCredentials(), toChannelConfig(grpcConfig))) {} virtual ~GrpcClient() = default; std::shared_ptr const& channel() { return m_channel; } + protected: std::shared_ptr m_channel; }; diff --git a/cpp/wedpr-protocol/grpc/client/RemoteFrontBuilder.cpp b/cpp/wedpr-protocol/grpc/client/RemoteFrontBuilder.cpp new file mode 100644 index 00000000..d00e1c98 --- /dev/null +++ b/cpp/wedpr-protocol/grpc/client/RemoteFrontBuilder.cpp @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file RemoteFrontBuilder.cpp + * @author: yujiechen + * @date 2024-09-4 + */ +#include "RemoteFrontBuilder.h" +// Note: it's better not to include generated grpc files in the header, since it will slow the +// compiler speed +#include "FrontClient.h" + +using namespace ppc::front; +using namespace ppc::protocol; + +IFrontClient::Ptr RemoteFrontBuilder::buildClient(std::string endPoint) const +{ + return std::make_shared(m_grpcConfig, endPoint); +} diff --git a/cpp/ppc-framework/front/IFrontBuilder.h b/cpp/wedpr-protocol/grpc/client/RemoteFrontBuilder.h similarity index 58% rename from cpp/ppc-framework/front/IFrontBuilder.h rename to cpp/wedpr-protocol/grpc/client/RemoteFrontBuilder.h index 50423645..44a3db31 100644 --- a/cpp/ppc-framework/front/IFrontBuilder.h +++ b/cpp/wedpr-protocol/grpc/client/RemoteFrontBuilder.h @@ -13,29 +13,27 @@ * See the License for the specific language governing permissions and * limitations under the License. * - * @file IFront.h + * @file RemoteFrontBuilder.h * @author: yujiechen - * @date 2024-08-22 + * @date 2024-09-4 */ #pragma once -#include "IFront.h" +#include "ppc-framework/front/IFront.h" +#include "ppc-framework/protocol/GrpcConfig.h" namespace ppc::front { -class IFrontBuilder +class RemoteFrontBuilder : public IFrontBuilder { public: - using Ptr = std::shared_ptr; - IFrontBuilder() = default; - virtual ~IFrontBuilder() = default; + using Ptr = std::shared_ptr; + RemoteFrontBuilder(ppc::protocol::GrpcConfig::Ptr const& grpcConfig) : m_grpcConfig(grpcConfig) + {} + ~RemoteFrontBuilder() override = default; - /** - * @brief create the Front using specified config - * - * @param config the config used to build the Front - * @return IFront::Ptr he created Front - */ - virtual IFront::Ptr build(ppc::front::FrontConfig::Ptr config) const = 0; - virtual IFrontClient::Ptr buildClient(std::string endPoint) const = 0; + IFrontClient::Ptr buildClient(std::string endPoint) const override; + +private: + ppc::protocol::GrpcConfig::Ptr m_grpcConfig; }; } // namespace ppc::front \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp index 23b379e0..8888b1fa 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp @@ -18,6 +18,7 @@ * @date 2022-10-20 */ #include "Front.h" +#include "FrontImpl.h" using namespace ppc; using namespace bcos; @@ -34,7 +35,8 @@ using namespace ppc::front; void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace::Ptr _message, uint32_t _timeout, ErrorCallbackFunc _callback, CallbackFunc _respCallback) { - auto routeInfo = m_front->routerInfoBuilder()->build(); + auto front = std::dynamic_pointer_cast(m_front); + auto routeInfo = front->routerInfoBuilder()->build(); routeInfo->setDstInst(_agencyID); routeInfo->setTopic(_message->taskID()); bcos::bytes data; @@ -86,7 +88,4 @@ bcos::Error::Ptr Front::notifyTaskInfo(std::string const& taskID) bcos::Error::Ptr Front::eraseTaskInfo(std::string const& _taskID) { m_front->unRegisterTopic(_taskID); -} - -// get the agencyList from the gateway -void Front::asyncGetAgencyList(GetAgencyListCallback _callback) {} +} \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h index 6394df73..b3a10370 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.h @@ -19,8 +19,8 @@ */ #pragma once -#include "FrontImpl.h" #include "ppc-framework/front/FrontInterface.h" +#include "ppc-framework/front/IFront.h" #include "ppc-framework/protocol/PPCMessageFace.h" namespace ppc::front @@ -29,7 +29,7 @@ class Front : public FrontInterface, public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - Front(FrontImpl::Ptr front) : m_front(std::move(front)) {} + Front(IFront::Ptr front) : m_front(std::move(front)) {} ~Front() override {} /** @@ -56,12 +56,9 @@ class Front : public FrontInterface, public std::enable_shared_from_this // erase the task-info when task finished bcos::Error::Ptr eraseTaskInfo(std::string const& _taskID) override; - // get the agencyList from the gateway - void asyncGetAgencyList(GetAgencyListCallback _callback) override; - // register message handler for algorithm void registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType, - std::function _handler) + std::function _handler) override { uint16_t type = ((uint16_t)_taskType << 8) | _algorithmType; auto self = weak_from_this(); @@ -82,7 +79,7 @@ class Front : public FrontInterface, public std::enable_shared_from_this } private: - FrontImpl::Ptr m_front; + IFront::Ptr m_front; ppc::front::PPCMessageFaceFactory::Ptr m_messageFactory; }; } // namespace ppc::front \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontConfigImpl.h b/cpp/wedpr-transport/ppc-front/ppc-front/FrontConfigImpl.h new file mode 100644 index 00000000..2996b13a --- /dev/null +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontConfigImpl.h @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file FrontConfigImpl.h + * @author: yujiechen + * @date 2024-08-22 + */ + +#pragma once +#include "ppc-framework/front/FrontConfig.h" +#include "ppc-framework/protocol/INodeInfo.h" +#include + +namespace ppc::front +{ +class FrontConfigImpl : public FrontConfig +{ +public: + using Ptr = std::shared_ptr; + FrontConfigImpl(ppc::protocol::INodeInfoFactory::Ptr nodeInfoFactory, int threadPoolSize, + std::string nodeID) + : FrontConfig(threadPoolSize, nodeID), m_nodeInfoFactory(std::move(nodeInfoFactory)) + {} + + ~FrontConfigImpl() override = default; + + ppc::protocol::INodeInfo::Ptr generateNodeInfo() const override + { + auto nodeInfo = m_nodeInfoFactory->build( + bcos::bytesConstRef((bcos::byte*)m_nodeID.data(), m_nodeID.size()), + m_selfEndPoint.entryPoint()); + nodeInfo->setComponents(std::set(m_components.begin(), m_components.end())); + return nodeInfo; + } + +private: + ppc::protocol::INodeInfoFactory::Ptr m_nodeInfoFactory; +}; +} // namespace ppc::front diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h index 3156b6c7..d89529d6 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h @@ -28,7 +28,7 @@ namespace ppc::front { -class FrontImpl : public IFront, public std::enable_shared_from_this +class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/LocalFrontBuilder.h b/cpp/wedpr-transport/ppc-front/ppc-front/LocalFrontBuilder.h new file mode 100644 index 00000000..022f6a83 --- /dev/null +++ b/cpp/wedpr-transport/ppc-front/ppc-front/LocalFrontBuilder.h @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file LocalFrontBuilder.h + * @author: yujiechen + * @date 2024-09-4 + */ +#pragma once +#include "ppc-framework/front/IFront.h" + +namespace ppc::front +{ +class LocalFrontBuilder : public IFrontBuilder +{ +public: + using Ptr = std::shared_ptr; + LocalFrontBuilder(IFront::Ptr front) { m_front = front; } + LocalFrontBuilder() = default; + ~LocalFrontBuilder() override = default; + + IFrontClient::Ptr buildClient(std::string endPoint) const override { return m_front; } + + void setFront(IFront::Ptr front) { m_front = std::move(front); } + +private: + IFront::Ptr m_front; +}; +} // namespace ppc::front \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp b/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp index 66f0972c..5b21374e 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp +++ b/cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp @@ -59,10 +59,6 @@ Rpc::Rpc(std::shared_ptr _wsService, std::string const& // register the handler for GET_TASK_STATUS m_methodToHandler[GET_TASK_STATUS] = boost::bind(&Rpc::getTaskStatus, this, boost::placeholders::_1, boost::placeholders::_2); - // register the handler for REGISTER_GATEWAY_URL - m_methodToHandler[REGISTER_GATEWAY_URL] = boost::bind( - &Rpc::registerGatewayUrl, this, boost::placeholders::_1, boost::placeholders::_2); - // register ecdh bs mode methods m_methodToHandler[ASYNC_RUN_BS_MODE_TASK] = boost::bind( &Rpc::asyncRunBsModeTask, this, boost::placeholders::_1, boost::placeholders::_2); @@ -307,51 +303,6 @@ void Rpc::getTaskStatus(Json::Value const& _req, RespFunc _respFunc) _respFunc(result->error(), result->serializeToJson()); } -void Rpc::registerGatewayUrl(Json::Value const& _req, RespFunc _respFunc) -{ - if (!m_rpcStorage) - { - BOOST_THROW_EXCEPTION( - BCOS_ERROR((int64_t)RpcError::StorageNotSet, "storage for rpc not set")); - } - - if (!_req.isMember("id")) - { - BOOST_THROW_EXCEPTION(InvalidParameter() << errinfo_comment("Must specify the agencyID")); - } - auto agencyID = _req["id"].asString(); - - if (!_req.isMember("url")) - { - BOOST_THROW_EXCEPTION( - InvalidParameter() << errinfo_comment("Must specify the gateway url")); - } - auto agencyUrl = _req["url"].asString(); - - std::vector endpoints; - boost::split(endpoints, agencyUrl, boost::is_any_of(",")); - for (auto& endpoint : endpoints) - { - if (!checkEndpoint(endpoint)) - { - BOOST_THROW_EXCEPTION( - InvalidParameter() << bcos::errinfo_comment("Invalid endpoint: " + endpoint)); - } - } - Json::Value response; - auto error = m_rpcStorage->insertGateway(agencyID, agencyUrl); - if (error && error->errorCode()) - { - response["code"] = error->errorCode(); - response["message"] = error->errorMessage(); - _respFunc(error, std::move(response)); - return; - } - - response["code"] = 0; - response["message"] = "success"; - _respFunc(error, std::move(response)); -} void Rpc::asyncRunBsModeTask(Json::Value const& _req, RespFunc _respFunc) { diff --git a/cpp/wedpr-transport/ppc-rpc/src/Rpc.h b/cpp/wedpr-transport/ppc-rpc/src/Rpc.h index 59eadce5..45871f29 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/Rpc.h +++ b/cpp/wedpr-transport/ppc-rpc/src/Rpc.h @@ -122,7 +122,6 @@ class Rpc : public RpcInterface virtual void runTask(Json::Value const& _req, RespFunc _respFunc); virtual void asyncRunTask(Json::Value const& _req, RespFunc _respFunc); virtual void getTaskStatus(Json::Value const& _req, RespFunc _respFunc); - virtual void registerGatewayUrl(Json::Value const& _req, RespFunc _respFunc); virtual void asyncRunBsModeTask(Json::Value const& _req, RespFunc _respFunc); virtual void fetchCipher(Json::Value const& _req, RespFunc _respFunc); diff --git a/cpp/wedpr-transport/ppc-rpc/src/RpcMemory.h b/cpp/wedpr-transport/ppc-rpc/src/RpcMemory.h index 30430027..78a447c5 100644 --- a/cpp/wedpr-transport/ppc-rpc/src/RpcMemory.h +++ b/cpp/wedpr-transport/ppc-rpc/src/RpcMemory.h @@ -40,8 +40,6 @@ class RpcMemory : public RpcStatusInterface bcos::Error::Ptr insertTask(protocol::Task::Ptr _task) override; bcos::Error::Ptr updateTaskStatus(protocol::TaskResult::Ptr _taskResult) override; protocol::TaskResult::Ptr getTaskStatus(const std::string& _taskID) override; - bcos::Error::Ptr insertGateway( - const std::string& _agencyID, const std::string& _endpoint) override; bcos::Error::Ptr deleteGateway(const std::string& _agencyID) override; std::vector listGateway() override; diff --git a/cpp/wedpr-transport/sdk/ProTransportImpl.cpp b/cpp/wedpr-transport/sdk/ProTransportImpl.cpp index 1573c0fa..37424c3b 100644 --- a/cpp/wedpr-transport/sdk/ProTransportImpl.cpp +++ b/cpp/wedpr-transport/sdk/ProTransportImpl.cpp @@ -19,7 +19,9 @@ */ #include "ProTransportImpl.h" #include "protocol/src/v1/MessageImpl.h" +#include "wedpr-protocol/grpc/client/GatewayClient.h" #include "wedpr-protocol/grpc/server/FrontServer.h" +#include "wedpr-protocol/grpc/server/GrpcServer.h" using namespace ppc::front; using namespace ppc::protocol; @@ -33,9 +35,8 @@ ProTransportImpl::ProTransportImpl(ppc::front::FrontConfig::Ptr config) m_server = std::make_shared(grpcServerConfig); FrontFactory frontFactory; - grpc::ChannelArguments channelConfig; - channelConfig.SetLoadBalancingPolicyName(m_config->loadBalancePolicy()); - auto gateway = std::make_shared(channelConfig, m_config->gatewayEndPoints()); + auto gateway = + std::make_shared(m_config->grpcConfig(), m_config->gatewayGrpcTarget()); m_front = frontFactory.build(std::make_shared(), std::make_shared(), std::make_shared(), gateway, config); @@ -46,4 +47,15 @@ ProTransportImpl::ProTransportImpl(ppc::front::FrontConfig::Ptr config) // register the frontService m_server->registerService(frontService); +} + +void ProTransportImpl::start() +{ + m_server->start(); + m_front->start(); +} +void ProTransportImpl::stop() +{ + m_server->stop(); + m_front->stop(); } \ No newline at end of file diff --git a/cpp/wedpr-transport/sdk/ProTransportImpl.h b/cpp/wedpr-transport/sdk/ProTransportImpl.h index d32ba9e5..5756e8d4 100644 --- a/cpp/wedpr-transport/sdk/ProTransportImpl.h +++ b/cpp/wedpr-transport/sdk/ProTransportImpl.h @@ -18,10 +18,13 @@ * @date 2024-09-04 */ #pragma once - #include "TransportImpl.h" -#include "wedpr-protocol/grpc/client/GatewayClient.h" -#include "wedpr-protocol/grpc/server/GrpcServer.h" + +namespace ppc::protocol +{ +class GrpcServer; +} + namespace ppc::sdk { @@ -31,19 +34,11 @@ class ProTransportImpl : public Transport using Ptr = std::shared_ptr; ProTransportImpl(ppc::front::FrontConfig::Ptr config); - void start() override - { - m_server->start(); - m_front->start(); - } - void stop() override - { - m_server->stop(); - m_front->stop(); - } + void start() override; + void stop() override; protected: ppc::front::FrontConfig::Ptr m_config; - ppc::protocol::GrpcServer::Ptr m_server; + std::shared_ptr m_server; }; } // namespace ppc::sdk \ No newline at end of file diff --git a/cpp/wedpr-transport/sdk/TransportBuilder.cpp b/cpp/wedpr-transport/sdk/TransportBuilder.cpp new file mode 100644 index 00000000..8042b844 --- /dev/null +++ b/cpp/wedpr-transport/sdk/TransportBuilder.cpp @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file TransportBuilder.cpp + * @author: yujiechen + * @date 2024-09-04 + */ + +#include "TransportBuilder.h" +#include "ProTransportImpl.h" +#include "Transport.h" +#include "TransportImpl.h" +#include "ppc-front/FrontConfigImpl.h" +#include "protobuf/NodeInfoImpl.h" +#include + +using namespace ppc::sdk; + +Transport::Ptr TransportBuilder::build( + SDKMode mode, ppc::front::FrontConfig::Ptr config, ppc::gateway::IGateway::Ptr const& gateway) +{ + switch (mode) + { + case SDKMode::AIR: + { + return std::make_shared(config, gateway); + } + case SDKMode::PRO: + { + return std::make_shared(config); + } + default: + throw std::runtime_error("Unsupported sdk mode, only support AIR/PRO mode!"); + } +} + +ppc::front::FrontConfig::Ptr TransportBuilder::buildConfig(int threadPoolSize, std::string nodeID) +{ + return std::make_shared( + std::make_shared(), threadPoolSize, nodeID); +} diff --git a/cpp/wedpr-transport/sdk/TransportBuilder.h b/cpp/wedpr-transport/sdk/TransportBuilder.h index f0492822..b2b57931 100644 --- a/cpp/wedpr-transport/sdk/TransportBuilder.h +++ b/cpp/wedpr-transport/sdk/TransportBuilder.h @@ -18,9 +18,9 @@ * @date 2024-09-04 */ #pragma once -#include "ProTransportImpl.h" #include "Transport.h" -#include "TransportImpl.h" +#include "ppc-framework/front/FrontConfig.h" +#include "ppc-framework/gateway/IGateway.h" #include namespace ppc::sdk { @@ -37,21 +37,8 @@ class TransportBuilder virtual ~TransportBuilder() = default; Transport::Ptr build(SDKMode mode, ppc::front::FrontConfig::Ptr config, - ppc::gateway::IGateway::Ptr const& gateway) - { - switch (mode) - { - case SDKMode::AIR: - { - return std::make_shared(config, gateway); - } - case SDKMode::PRO: - { - return std::make_shared(config); - } - default: - throw std::runtime_error("Unsupported sdk mode, only support AIR/PRO mode!"); - } - } + ppc::gateway::IGateway::Ptr const& gateway); + + ppc::front::FrontConfig::Ptr buildConfig(int threadPoolSize, std::string nodeID); }; } // namespace ppc::sdk \ No newline at end of file