diff --git a/cpp/ppc-framework/front/IFront.h b/cpp/ppc-framework/front/IFront.h index 25073ac3..76cdbcad 100644 --- a/cpp/ppc-framework/front/IFront.h +++ b/cpp/ppc-framework/front/IFront.h @@ -105,7 +105,7 @@ class IFront : virtual public IFrontClient virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0; virtual void asyncGetAgencies( - std::function)> callback) = 0; + std::function)> callback) = 0; /** * @brief register the nodeInfo to the gateway diff --git a/cpp/ppc-framework/gateway/IGateway.h b/cpp/ppc-framework/gateway/IGateway.h index d2489217..7ccc53c5 100644 --- a/cpp/ppc-framework/gateway/IGateway.h +++ b/cpp/ppc-framework/gateway/IGateway.h @@ -67,7 +67,7 @@ class IGateway virtual void asyncGetPeers(std::function callback) = 0; virtual void asyncGetAgencies( - std::function)> callback) = 0; + std::function)> callback) = 0; virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0; diff --git a/cpp/ppc-framework/protocol/GrpcConfig.h b/cpp/ppc-framework/protocol/GrpcConfig.h index b4dae30d..108b22c1 100644 --- a/cpp/ppc-framework/protocol/GrpcConfig.h +++ b/cpp/ppc-framework/protocol/GrpcConfig.h @@ -60,9 +60,13 @@ class GrpcConfig bool enableHealthCheck() const { return m_enableHealthCheck; } void setEnableHealthCheck(bool enableHealthCheck) { m_enableHealthCheck = enableHealthCheck; } + void setEnableDnslookup(bool enableDnslookup) { m_enableDnslookup = enableDnslookup; } + + bool enableDnslookup() const { return m_enableDnslookup; } protected: bool m_enableHealthCheck = false; std::string m_loadBalancePolicy = "round_robin"; + bool m_enableDnslookup = false; }; } // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/tools/build_ppc.sh b/cpp/tools/build_ppc.sh index 509a0bd0..c7ec3f7a 100644 --- a/cpp/tools/build_ppc.sh +++ b/cpp/tools/build_ppc.sh @@ -35,7 +35,7 @@ default_version="v1.1.0" compatibility_version=${default_version} command="deploy" -disable_ra2018="false" +disable_ra2018="true" LOG_WARN() { local content=${1} @@ -420,9 +420,9 @@ generate_node_config_ini() { ; the threadPoolSize thread_count = 4 ; the gatewayService endpoint information - service.gateway_target = + gateway_target = ; the components - service.components = + components = nodeid=${nodeid} [crypto] @@ -524,8 +524,9 @@ generate_node_config_ini() { ; MB max_log_file_size=200 ; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message - format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message% - enable_rotate_by_hour=false + ;format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message% + format=%Severity%|%TimeStamp%|%Message% + enable_rotate_by_hour=true log_name_pattern=ppcs-psi4ef.log ; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log rotate_name_pattern=log_%Y%m%d.%H%M.log diff --git a/cpp/tools/build_wedpr_cem.sh b/cpp/tools/build_wedpr_cem.sh index 5cb54e9d..0a2a6ed2 100644 --- a/cpp/tools/build_wedpr_cem.sh +++ b/cpp/tools/build_wedpr_cem.sh @@ -377,8 +377,9 @@ generate_config_ini() { ; MB max_log_file_size=200 ; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message - format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message% - enable_rotate_by_hour=false + ;format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message% + format=%Severity%|%TimeStamp%|%Message% + enable_rotate_by_hour=true log_name_pattern=ppcs-psi4ef.log ; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log rotate_name_pattern=log_%Y%m%d.%H%M.log diff --git a/cpp/tools/ppc-builder/build_ppc.py b/cpp/tools/ppc-builder/build_ppc.py index 713d1801..21d98853 100644 --- a/cpp/tools/ppc-builder/build_ppc.py +++ b/cpp/tools/ppc-builder/build_ppc.py @@ -1,11 +1,11 @@ #!/usr/bin/python # -*- coding: UTF-8 -*- # Note: here can't be refactored by autopep +import sys +sys.path.append("src/") from controller import commandline_helper from common import utilities import traceback -import sys -sys.path.append("src/") def main(): diff --git a/cpp/tools/ppc-builder/conf/config-example.toml b/cpp/tools/ppc-builder/conf/config-example.toml index 7225f7e7..a27685dd 100644 --- a/cpp/tools/ppc-builder/conf/config-example.toml +++ b/cpp/tools/ppc-builder/conf/config-example.toml @@ -43,7 +43,7 @@ holding_msg_minutes = 30 # configuration for the ppc-node [[agency.node]] # disable the ra2018 psi or not, default enable ra2018 - disable_ra2018 = false + disable_ra2018 = true deploy_ip=["127.0.0.1:2"] # node name, Notice: node_name in the same agency and group must be unique node_name = "node0" @@ -128,7 +128,7 @@ holding_msg_minutes = 30 # configuration for the ppc-node [[agency.node]] # disable the ra2018 psi or not, default enable ra2018 - disable_ra2018 = false + disable_ra2018 = true deploy_ip=["127.0.0.1:2"] # node name, Notice: node_name in the same agency and group must be unique node_name = "node0" diff --git a/cpp/tools/ppc-builder/src/config/ppc_node_config_generator.py b/cpp/tools/ppc-builder/src/config/ppc_node_config_generator.py index 3e9d8c2c..615d1d70 100644 --- a/cpp/tools/ppc-builder/src/config/ppc_node_config_generator.py +++ b/cpp/tools/ppc-builder/src/config/ppc_node_config_generator.py @@ -104,7 +104,7 @@ def __generate_single_node_inner_config__(self, tpl_config_path, node_path, priv config_content, node_config.rpc_config, node_index) # load the transport config self.__generate_transport_config__(config_content, - node_config, node_id, ip) + node_config, node_id, ip, node_index) # load the storage config self.__generate_storage_config__( config_content, node_config.storage_config) @@ -192,7 +192,7 @@ def __generate_hdfs_storage_config__(self, config_content, hdfs_storage_config): hdfs_storage_config.name_node_port) config_content[section_name]["token"] = hdfs_storage_config.token - def __generate_transport_config__(self, config_content, node_config, node_id, deploy_ip): + def __generate_transport_config__(self, config_content, node_config, node_id, deploy_ip, node_index): """_summary_ Args: @@ -203,18 +203,18 @@ def __generate_transport_config__(self, config_content, node_config, node_id, de ; the threadPoolSize thread_count = 4 ; the gatewayService endpoint information - service.gateway_target = + gateway_target = ; the components - service.components = + components = nodeid= """ section = "transport" config_content[section]["listen_ip"] = node_config.grpc_listen_ip config_content[section]["listen_port"] = str( - node_config.grpc_listen_port) + node_config.grpc_listen_port + node_index) config_content[section]["host_ip"] = deploy_ip - config_content[section]["service.gateway_target"] = node_config.gateway_config.gateway_grpc_target - config_content[section]["service.components"] = node_config.components + config_content[section]["gateway_target"] = node_config.gateway_config.gateway_grpc_target + config_content[section]["components"] = node_config.components config_content[section]["nodeid"] = node_id def __generate_ra2018psi_config__(self, config_content, ra2018psi_config): diff --git a/cpp/tools/ppc-builder/src/tpl/config.ini.gateway b/cpp/tools/ppc-builder/src/tpl/config.ini.gateway index 8acc4990..10afa936 100644 --- a/cpp/tools/ppc-builder/src/tpl/config.ini.gateway +++ b/cpp/tools/ppc-builder/src/tpl/config.ini.gateway @@ -39,8 +39,9 @@ ; MB max_log_file_size=200 ; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message - format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message% - enable_rotate_by_hour=false + ;format=%Severity%|ppcs-gateway|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message% + format=%Severity%|%TimeStamp%|%Message% + enable_rotate_by_hour=true log_name_pattern=ppcs-gateway.log ; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log rotate_name_pattern=log_%Y%m%d.%H%M.log diff --git a/cpp/tools/ppc-builder/src/tpl/config.ini.node b/cpp/tools/ppc-builder/src/tpl/config.ini.node index 177536ac..d47d7883 100644 --- a/cpp/tools/ppc-builder/src/tpl/config.ini.node +++ b/cpp/tools/ppc-builder/src/tpl/config.ini.node @@ -4,7 +4,7 @@ ; the private key path for the psi-server private_key_path = conf/node.pem ; disable the ra2018 or not, default enable ra2018 - disable_ra2018 = false + disable_ra2018 = true ; the path that allows programs to access ; data_location = data ; task_timeout_minutes = 180 @@ -38,9 +38,9 @@ ; the threadPoolSize thread_count = 4 ; the gatewayService endpoint information - service.gateway_target = + gateway_target = ; the components - service.components = + components = nodeid= [storage] @@ -101,8 +101,9 @@ ; MB max_log_file_size=200 ; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message - format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message% - enable_rotate_by_hour=false + ;format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message% + format=%Severity%|%TimeStamp%|%Message% + enable_rotate_by_hour=true log_name_pattern=ppcs-psi4ef.log ; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log rotate_name_pattern=log_%Y%m%d.%H%M.log diff --git a/cpp/wedpr-computing/ppc-cem/tests/data/config.ini b/cpp/wedpr-computing/ppc-cem/tests/data/config.ini index 5cf1c6f8..c9562ec0 100644 --- a/cpp/wedpr-computing/ppc-cem/tests/data/config.ini +++ b/cpp/wedpr-computing/ppc-cem/tests/data/config.ini @@ -56,7 +56,7 @@ max_log_file_size=200 ; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message #format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message% - #enable_rotate_by_hour=false + #enable_rotate_by_hour=true #log_name_pattern=ppcs-psi4ef.log ; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log #rotate_name_pattern=log_%Y%m%d.%H%M.log diff --git a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp index 530be4c0..d8dadc32 100644 --- a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp +++ b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp @@ -129,7 +129,8 @@ void PPCConfig::loadFrontConfig(bool requireTransport, } m_frontConfig->setNodeID(nodeID); m_frontConfig->setThreadPoolSize(threadCount); - + PPCConfig_LOG(INFO) << LOG_DESC("loadFrontConfig and not require the transport") + << printFrontDesc(m_frontConfig); if (!requireTransport) { return; @@ -137,15 +138,18 @@ void PPCConfig::loadFrontConfig(bool requireTransport, loadEndpointConfig(m_frontConfig->mutableSelfEndPoint(), true, "transport", pt); // the gateway targets - auto gatewayTargets = pt.get("transport.service.gateway_target", ""); + auto gatewayTargets = pt.get("transport.gateway_target", ""); if (gatewayTargets.empty()) { - BOOST_THROW_EXCEPTION(InvalidConfig() << errinfo_comment( - "Must specify the transport.service.gateway_target!")); + BOOST_THROW_EXCEPTION( + InvalidConfig() << errinfo_comment("Must specify the transport.gateway_target!")); } + m_frontConfig->setGatewayGrpcTarget(gatewayTargets); // the components - auto components = pt.get("transport.service.components", ""); + auto components = pt.get("transport.components", ""); boost::split(m_frontConfig->mutableComponents(), components, boost::is_any_of(",")); + + PPCConfig_LOG(INFO) << LOG_DESC("loadFrontConfig") << printFrontDesc(m_frontConfig); } void PPCConfig::setPrivateKey(bcos::bytes const& _privateKey) diff --git a/cpp/wedpr-helper/ppc-utilities/Utilities.h b/cpp/wedpr-helper/ppc-utilities/Utilities.h index a3d90fb2..c5e52009 100644 --- a/cpp/wedpr-helper/ppc-utilities/Utilities.h +++ b/cpp/wedpr-helper/ppc-utilities/Utilities.h @@ -65,7 +65,7 @@ inline std::string generateUUID() return boost::uuids::to_string(uuid_gen()); } template -inline std::string printVector(std::vector const& list) +inline std::string printVector(T const& list) { std::stringstream oss; for (auto const& it : list) diff --git a/cpp/wedpr-initializer/Initializer.cpp b/cpp/wedpr-initializer/Initializer.cpp index 4e7c4eec..14ceef33 100644 --- a/cpp/wedpr-initializer/Initializer.cpp +++ b/cpp/wedpr-initializer/Initializer.cpp @@ -413,6 +413,10 @@ void Initializer::registerRpcHandler(ppc::rpc::RpcInterface::Ptr const& _rpc) void Initializer::start() { + if (m_transport) + { + m_transport->start(); + } if (m_ppcFront) { m_ppcFront->start(); @@ -455,6 +459,10 @@ void Initializer::start() void Initializer::stop() { + if (m_transport) + { + m_transport->stop(); + } // stop the network firstly if (m_ppcFront) { diff --git a/cpp/wedpr-protocol/grpc/Common.h b/cpp/wedpr-protocol/grpc/Common.h index b04f85d5..c0b2d0fb 100644 --- a/cpp/wedpr-protocol/grpc/Common.h +++ b/cpp/wedpr-protocol/grpc/Common.h @@ -31,13 +31,23 @@ inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr con { return args; } - args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy()); + // TODO: when enable round_robin load-balance policy, the program will be exited on dns resolver + // args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy()); if (grpcConfig->enableHealthCheck()) { args.SetServiceConfigJSON( "{\"healthCheckConfig\": " "{\"serviceName\": \"\"}}"); } + // disable dns lookup + if (!grpcConfig->enableDnslookup()) + { + args.SetInt("grpc.enable_dns_srv_lookup", 0); + } + else + { + args.SetInt("grpc.enable_dns_srv_lookup", 1); + } return args; } } // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/Common.h b/cpp/wedpr-protocol/grpc/client/Common.h index c7b3ab2f..99a59a19 100644 --- a/cpp/wedpr-protocol/grpc/client/Common.h +++ b/cpp/wedpr-protocol/grpc/client/Common.h @@ -22,4 +22,5 @@ #define GRPC_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][CLIENT]" #define GATEWAY_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][CLIENT]" +#define FRONT_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[FRONT][CLIENT]" #define HEALTH_LOG(LEVEL) BCOS_LOG(LEVEL) << "[HEALTH]" \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/FrontClient.cpp b/cpp/wedpr-protocol/grpc/client/FrontClient.cpp index a2037613..c934a442 100644 --- a/cpp/wedpr-protocol/grpc/client/FrontClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/FrontClient.cpp @@ -18,6 +18,7 @@ * @date 2024-09-02 */ #include "FrontClient.h" +#include "Common.h" #include "protobuf/src/RequestConverter.h" #include "wedpr-protocol/protobuf/src/Common.h" @@ -33,8 +34,8 @@ void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, Recei msg->encode(encodedData); receivedMsg.set_data(encodedData.data(), encodedData.size()); - ClientContext context; + auto context = std::make_shared(); auto response = std::make_shared(); - m_stub->async()->onReceiveMessage(&context, &receivedMsg, response.get(), - [response, callback](Status status) { callback(toError(status, std::move(*response))); }); + m_stub->async()->onReceiveMessage(context.get(), &receivedMsg, response.get(), + [response, callback](Status status) { callback(toError(status, *response)); }); } \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp index 7b55c1a3..8089f947 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp @@ -33,88 +33,89 @@ void GatewayClient::asyncSendMessage(RouteType routeType, long timeout, ReceiveMsgFunc callback) { auto request = generateRequest(traceID, routeType, routeInfo, std::move(payload), timeout); - ClientContext context; + auto context = std::make_shared(); auto response = std::make_shared(); - m_stub->async()->asyncSendMessage(&context, request.get(), response.get(), - [callback, response](Status status) { callback(toError(status, std::move(*response))); }); + m_stub->async()->asyncSendMessage(context.get(), request.get(), response.get(), + [callback, response](Status status) { callback(toError(status, *response)); }); } void GatewayClient::asyncGetPeers(std::function callback) { auto response = std::make_shared(); - ClientContext context; + auto context = std::make_shared(); auto request = std::make_shared(); m_stub->async()->asyncGetPeers( - &context, request.get(), response.get(), [callback, response](Status status) { - callback(toError(status, std::move(*response->mutable_error())), response->peersinfo()); + context.get(), request.get(), response.get(), [callback, response](Status status) { + callback(toError(status, response->error()), response->peersinfo()); }); } void GatewayClient::asyncGetAgencies( - std::function)> callback) + std::function)> callback) { auto response = std::make_shared(); - ClientContext context; + auto context = std::make_shared(); auto request = std::make_shared(); m_stub->async()->asyncGetAgencies( - &context, request.get(), response.get(), [callback, response](Status status) { - std::vector agencies; + context.get(), request.get(), response.get(), [callback, response](Status status) { + std::set agencies; for (int i = 0; i < response->agencies_size(); i++) { - agencies.emplace_back(response->agencies(i)); + agencies.insert(response->agencies(i)); } - callback(toError(status, std::move(*response->mutable_error())), agencies); + callback(toError(status, response->error()), agencies); }); } bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo) { - broadCast([nodeInfo](ChannelInfo const& channel) { + return broadCast([nodeInfo](ChannelInfo const& channel) { std::unique_ptr stub( ppc::proto::Gateway::NewStub(channel.channel)); auto request = toNodeInfoRequest(nodeInfo); ClientContext context; std::shared_ptr response = std::make_shared(); auto status = stub->registerNodeInfo(&context, *request, response.get()); - return toError(status, std::move(*response)); + auto result = toError(status, *response); + return result; }); } bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID) { - broadCast([nodeID](ChannelInfo const& channel) { + return broadCast([nodeID](ChannelInfo const& channel) { std::unique_ptr stub( ppc::proto::Gateway::NewStub(channel.channel)); auto request = toNodeInfoRequest(nodeID, ""); ClientContext context; std::shared_ptr response = std::make_shared(); auto status = stub->unRegisterNodeInfo(&context, *request, response.get()); - return toError(status, std::move(*response)); + return toError(status, *response); }); } bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) { - broadCast([nodeID, topic](ChannelInfo const& channel) { + return broadCast([nodeID, topic](ChannelInfo const& channel) { std::unique_ptr stub( ppc::proto::Gateway::NewStub(channel.channel)); auto request = toNodeInfoRequest(nodeID, topic); ClientContext context; std::shared_ptr response = std::make_shared(); auto status = stub->registerTopic(&context, *request, response.get()); - return toError(status, std::move(*response)); + return toError(status, *response); }); } bcos::Error::Ptr GatewayClient::unRegisterTopic( bcos::bytesConstRef nodeID, std::string const& topic) { - broadCast([nodeID, topic](ChannelInfo const& channel) { + return broadCast([nodeID, topic](ChannelInfo const& channel) { std::unique_ptr stub( ppc::proto::Gateway::NewStub(channel.channel)); auto request = toNodeInfoRequest(nodeID, topic); ClientContext context; std::shared_ptr response = std::make_shared(); auto status = stub->unRegisterTopic(&context, *request, response.get()); - return toError(status, std::move(*response)); + return toError(status, *response); }); } \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.h b/cpp/wedpr-protocol/grpc/client/GatewayClient.h index 58c79cec..0d2415ae 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.h +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.h @@ -57,7 +57,7 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient void asyncGetPeers(std::function callback) override; void asyncGetAgencies( - std::function)> callback) override; + std::function)> callback) override; void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID, diff --git a/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp b/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp index 2ba6e503..6516e1ed 100644 --- a/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp @@ -48,6 +48,7 @@ bool GrpcClient::checkHealth() { try { + HEALTH_LOG(TRACE) << LOG_DESC("checkHealth"); ClientContext context; HealthCheckResponse response; auto status = diff --git a/cpp/wedpr-protocol/grpc/client/HealthCheckTimer.cpp b/cpp/wedpr-protocol/grpc/client/HealthCheckTimer.cpp index 4b18b0df..aa9f9223 100644 --- a/cpp/wedpr-protocol/grpc/client/HealthCheckTimer.cpp +++ b/cpp/wedpr-protocol/grpc/client/HealthCheckTimer.cpp @@ -45,7 +45,6 @@ void HealthCheckTimer::start() return; } healthChecker->checkHealth(); - healthChecker->m_timer->restart(); }); if (m_timer) { @@ -78,6 +77,8 @@ void HealthCheckTimer::registerHealthCheckHandler(HealthCheckHandler::Ptr health } bcos::WriteGuard l(x_healthCheckHandlers); m_healthCheckHandlers[healthCheckHandler->serviceName] = healthCheckHandler; + HEALTH_LOG(INFO) << LOG_DESC("registerHealthCheckHandler for ") + << healthCheckHandler->serviceName; } diff --git a/cpp/wedpr-protocol/grpc/server/FrontServer.cpp b/cpp/wedpr-protocol/grpc/server/FrontServer.cpp index 8b1182e5..90e7cebe 100644 --- a/cpp/wedpr-protocol/grpc/server/FrontServer.cpp +++ b/cpp/wedpr-protocol/grpc/server/FrontServer.cpp @@ -29,13 +29,14 @@ using namespace grpc; ServerUnaryReactor* FrontServer::onReceiveMessage( CallbackServerContext* context, const ReceivedMessage* receivedMsg, ppc::proto::Error* reply) { - std::shared_ptr reactor(context->DefaultReactor()); + ServerUnaryReactor* reactor(context->DefaultReactor()); try { // decode the request auto msg = m_msgBuilder->build(bcos::bytesConstRef( (bcos::byte*)receivedMsg->data().data(), receivedMsg->data().size())); m_front->onReceiveMessage(msg, [reactor, reply](bcos::Error::Ptr error) { + FRONT_SERVER_LOG(TRACE) << LOG_DESC("onReceiveMessage"); toSerializedError(reply, error); reactor->Finish(Status::OK); }); @@ -48,5 +49,5 @@ ServerUnaryReactor* FrontServer::onReceiveMessage( std::make_shared(-1, std::string(boost::diagnostic_information(e)))); reactor->Finish(Status::OK); } - return reactor.get(); + return reactor; } \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp index 8b770a06..e25fcb14 100644 --- a/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp +++ b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp @@ -26,7 +26,7 @@ using namespace grpc; ServerUnaryReactor* GatewayServer::asyncSendMessage(CallbackServerContext* context, const ppc::proto::SendedMessageRequest* sendedMsg, ppc::proto::Error* reply) { - std::shared_ptr reactor(context->DefaultReactor()); + ServerUnaryReactor* reactor(context->DefaultReactor()); try { // TODO: optimize here @@ -48,13 +48,13 @@ ServerUnaryReactor* GatewayServer::asyncSendMessage(CallbackServerContext* conte "handle message failed for : " + std::string(boost::diagnostic_information(e)))); reactor->Finish(Status::OK); } - return reactor.get(); + return reactor; } grpc::ServerUnaryReactor* GatewayServer::asyncGetPeers( grpc::CallbackServerContext* context, const ppc::proto::Empty*, ppc::proto::PeersInfo* reply) { - std::shared_ptr reactor(context->DefaultReactor()); + ServerUnaryReactor* reactor(context->DefaultReactor()); try { m_gateway->asyncGetPeers([reactor, reply](bcos::Error::Ptr error, std::string peersInfo) { @@ -72,17 +72,17 @@ grpc::ServerUnaryReactor* GatewayServer::asyncGetPeers( -1, "asyncGetPeers failed for : " + std::string(boost::diagnostic_information(e)))); reactor->Finish(Status::OK); } - return reactor.get(); + return reactor; } grpc::ServerUnaryReactor* GatewayServer::asyncGetAgencies( grpc::CallbackServerContext* context, const ppc::proto::Empty*, ppc::proto::AgenciesInfo* reply) { - std::shared_ptr reactor(context->DefaultReactor()); + ServerUnaryReactor* reactor(context->DefaultReactor()); try { m_gateway->asyncGetAgencies( - [reactor, reply](bcos::Error::Ptr error, std::vector agencies) { + [reactor, reply](bcos::Error::Ptr error, std::set agencies) { toSerializedError(reply->mutable_error(), error); for (auto const& it : agencies) { @@ -100,14 +100,14 @@ grpc::ServerUnaryReactor* GatewayServer::asyncGetAgencies( "asyncGetAgencies failed for : " + std::string(boost::diagnostic_information(e)))); reactor->Finish(Status::OK); } - return reactor.get(); + return reactor; } ServerUnaryReactor* GatewayServer::registerNodeInfo(CallbackServerContext* context, const ppc::proto::NodeInfo* serializedNodeInfo, ppc::proto::Error* reply) { - std::shared_ptr reactor(context->DefaultReactor()); + ServerUnaryReactor* reactor(context->DefaultReactor()); try { auto nodeInfo = toNodeInfo(m_nodeInfoFactory, *serializedNodeInfo); @@ -124,13 +124,13 @@ ServerUnaryReactor* GatewayServer::registerNodeInfo(CallbackServerContext* conte "registerNodeInfo failed for : " + std::string(boost::diagnostic_information(e)))); reactor->Finish(Status::OK); } - return reactor.get(); + return reactor; } ServerUnaryReactor* GatewayServer::unRegisterNodeInfo( CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) { - std::shared_ptr reactor(context->DefaultReactor()); + ServerUnaryReactor* reactor(context->DefaultReactor()); try { auto result = m_gateway->unRegisterNodeInfo( @@ -147,13 +147,13 @@ ServerUnaryReactor* GatewayServer::unRegisterNodeInfo( std::string(boost::diagnostic_information(e)))); reactor->Finish(Status::OK); } - return reactor.get(); + return reactor; } ServerUnaryReactor* GatewayServer::registerTopic( CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) { - std::shared_ptr reactor(context->DefaultReactor()); + ServerUnaryReactor* reactor(context->DefaultReactor()); try { auto result = m_gateway->registerTopic( @@ -171,13 +171,13 @@ ServerUnaryReactor* GatewayServer::registerTopic( -1, "registerTopic failed for : " + std::string(boost::diagnostic_information(e)))); reactor->Finish(Status::OK); } - return reactor.get(); + return reactor; } ServerUnaryReactor* GatewayServer::unRegisterTopic( CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) { - std::shared_ptr reactor(context->DefaultReactor()); + ServerUnaryReactor* reactor(context->DefaultReactor()); try { auto result = m_gateway->unRegisterTopic( @@ -195,5 +195,5 @@ ServerUnaryReactor* GatewayServer::unRegisterTopic( "unRegisterTopic failed for : " + std::string(boost::diagnostic_information(e)))); reactor->Finish(Status::OK); } - return reactor.get(); + return reactor; } \ No newline at end of file diff --git a/cpp/wedpr-protocol/protobuf/src/RequestConverter.h b/cpp/wedpr-protocol/protobuf/src/RequestConverter.h index 4f1544bb..7c84d79d 100644 --- a/cpp/wedpr-protocol/protobuf/src/RequestConverter.h +++ b/cpp/wedpr-protocol/protobuf/src/RequestConverter.h @@ -39,12 +39,13 @@ inline MessageOptionalHeader::Ptr generateRouteInfo( routeInfo->setDstNode( bcos::bytes(serializedRouteInfo.dstnode().begin(), serializedRouteInfo.dstnode().end())); routeInfo->setDstInst(serializedRouteInfo.dstinst()); + routeInfo->setTopic(serializedRouteInfo.topic()); return routeInfo; } -inline std::shared_ptr generateRequest(std::string const& traceID, - RouteType routeType, - MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, long timeout) +inline std::shared_ptr generateRequest(std::string const& traceID, + RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, + long timeout) { auto request = std::make_shared(); request->set_traceid(traceID); @@ -106,7 +107,7 @@ inline INodeInfo::Ptr toNodeInfo( return nodeInfo; } -inline bcos::Error::Ptr toError(grpc::Status const& status, ppc::proto::Error&& error) +inline bcos::Error::Ptr toError(grpc::Status const& status, ppc::proto::Error const& error) { if (!status.ok()) { diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp index 153e45ea..23dc4dec 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp @@ -58,7 +58,7 @@ void Front::stop() void Front::fetchGatewayMetaInfo() { auto self = weak_from_this(); - m_front->asyncGetAgencies([self](bcos::Error::Ptr error, std::vector agencies) { + m_front->asyncGetAgencies([self](bcos::Error::Ptr error, std::set agencies) { auto front = self.lock(); if (!front) { @@ -71,15 +71,16 @@ void Front::fetchGatewayMetaInfo() << LOG_KV("msg", error->errorMessage()); return; } + std::vector agencyList(agencies.begin(), agencies.end()); bcos::UpgradableGuard l(front->x_agencyList); - if (front->m_agencyList == agencies) + if (front->m_agencyList == agencyList) { return; } bcos::UpgradeGuard ul(l); - front->m_agencyList = agencies; + front->m_agencyList = agencyList; FRONT_LOG(INFO) << LOG_DESC("Update agencies information") - << LOG_KV("agencies", printVector(agencies)); + << LOG_KV("agencies", printVector(agencyList)); }); m_fetcher->restart(); } diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp index 920ceeb2..f88f6bc4 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp @@ -223,6 +223,8 @@ void FrontImpl::asyncSendMessageToGateway(bool responsePacket, MessagePayload::P routeInfo->setSrcNode(m_nodeID); auto payload = std::make_shared(); frontMessage->encode(*payload); + FRONT_LOG(TRACE) << LOG_DESC("asyncSendMessageToGateway") << LOG_KV("traceID", traceID) + << LOG_KV("route", printOptionalField(routeInfo)); m_gatewayClient->asyncSendMessage( routeType, routeInfo, traceID, std::move(*payload), timeout, callback); } diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h index b60f1e87..ca751374 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h @@ -142,7 +142,7 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ } void asyncGetAgencies( - std::function)> callback) override + std::function)> callback) override { m_gatewayClient->asyncGetAgencies(callback); } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index 7a997afc..29ebc29b 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -42,7 +42,7 @@ GatewayImpl::GatewayImpl(Service::Ptr const& service, m_gatewayInfoFactory(std::make_shared(service->nodeID(), agency)), m_localRouter(std::make_shared( m_gatewayInfoFactory, m_frontBuilder, std::make_shared(ioService))), - m_peerRouter(std::make_shared(m_service)) + m_peerRouter(std::make_shared(m_service, m_gatewayInfoFactory)) { m_service->registerMsgHandler((uint16_t)GatewayPacketType::P2PMessage, boost::bind(&GatewayImpl::onReceiveP2PMessage, this, boost::placeholders::_1, @@ -53,6 +53,34 @@ GatewayImpl::GatewayImpl(Service::Ptr const& service, boost::placeholders::_2)); m_gatewayRouterManager = std::make_shared( m_service, m_gatewayInfoFactory, m_localRouter, m_peerRouter); + + m_service->registerOnNewSession([this](WsSession::Ptr _session) { + if (!_session) + { + return; + } + m_p2pRouterManager->onNewSession(_session->nodeId()); + }); + + m_service->registerOnDeleteSession([this](WsSession::Ptr _session) { + if (!_session) + { + return; + } + m_p2pRouterManager->onEraseSession(_session->nodeId()); + }); + + m_p2pRouterManager->registerUnreachableHandler([this](std::string const& unreachableNode) { + m_gatewayRouterManager->removeUnreachableP2pNode(unreachableNode); + }); + + m_service->registerDisconnectHandler([this](WsSession::Ptr _session) { + if (!_session) + { + return; + } + m_gatewayRouterManager->removeUnreachableP2pNode(_session->nodeId()); + }); } void GatewayImpl::start() @@ -287,14 +315,13 @@ void GatewayImpl::asyncGetPeers(std::function cal } } -void GatewayImpl::asyncGetAgencies( - std::function)> callback) +void GatewayImpl::asyncGetAgencies(std::function)> callback) { if (!callback) { return; } auto agencies = m_peerRouter->agencies(); - agencies.emplace_back(m_agency); + agencies.insert(m_agency); callback(nullptr, agencies); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h index 4cc4af78..87e10fa5 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h @@ -69,7 +69,7 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_this callback) override; void asyncGetAgencies( - std::function)> callback) override; + std::function)> callback) override; protected: virtual void onReceiveP2PMessage( diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp index de96b110..e16d0fa4 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp @@ -30,7 +30,7 @@ void MessageCache::insertCache( std::string const& topic, ppc::protocol::Message::Ptr const& msg, ReceiveMsgFunc callback) { // hold the message - GATEWAY_LOG(DEBUG) << LOG_BADGE("MessageCache: insertCache") << LOG_KV("topic", topic); + GATEWAY_LOG(DEBUG) << LOG_BADGE("MessageCache: insertCache") << printMessage(msg); bcos::ReadGuard l(x_msgCache); auto it = m_msgCache.find(topic); if (it != m_msgCache.end()) diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h index 470d4b3b..cfa4362e 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h @@ -71,6 +71,7 @@ class GatewayNodeInfoFactory virtual ~GatewayNodeInfoFactory() = default; virtual GatewayNodeInfo::Ptr build() const = 0; + virtual GatewayNodeInfo::Ptr build(std::string const& p2pNode) const = 0; }; struct GatewayNodeInfoCmp { diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h index fbe3a205..553af157 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h @@ -30,10 +30,13 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo public: using Ptr = std::shared_ptr; GatewayNodeInfoImpl() : m_rawGatewayInfo(std::make_shared()) {} - GatewayNodeInfoImpl(std::string const& p2pNodeID, std::string const& agency) - : GatewayNodeInfoImpl() + GatewayNodeInfoImpl(std::string const& p2pNodeID) : GatewayNodeInfoImpl() { m_rawGatewayInfo->set_p2pnodeid(p2pNodeID); + } + GatewayNodeInfoImpl(std::string const& p2pNodeID, std::string const& agency) + : GatewayNodeInfoImpl(p2pNodeID) + { m_rawGatewayInfo->set_agency(agency); } @@ -121,6 +124,11 @@ class GatewayNodeInfoFactoryImpl : public GatewayNodeInfoFactory return std::make_shared(m_p2pNodeID, m_agency); } + GatewayNodeInfo::Ptr build(std::string const& p2pNode) const override + { + return std::make_shared(p2pNode); + } + private: std::string m_p2pNodeID; std::string m_agency; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp index 7534e3b0..b22b73a4 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp @@ -79,6 +79,18 @@ void GatewayRouterManager::stop() ROUTER_MGR_LOG(INFO) << LOG_DESC("stop GatewayRouterManager success"); } +void GatewayRouterManager::removeUnreachableP2pNode(std::string const& p2pNode) +{ + ROUTER_MGR_LOG(INFO) << LOG_DESC("removeUnreachableP2pNode") + << LOG_KV("p2pid", printP2PIDElegantly(p2pNode)); + { + // remove statusSeq info + WriteGuard l(x_p2pID2Seq); + m_p2pID2Seq.erase(p2pNode); + } + m_peerRouter->removeP2PID(p2pNode); +} + void GatewayRouterManager::onReceiveNodeSeqMessage(MessageFace::Ptr msg, WsSession::Ptr session) { auto statusSeq = diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.h index 4cf9b0ba..679eb08e 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.h @@ -36,6 +36,8 @@ class GatewayRouterManager virtual void start(); virtual void stop(); + void removeUnreachableP2pNode(std::string const& p2pNode); + protected: virtual void onReceiveNodeSeqMessage( bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp index 3a7c035f..e9a7e25a 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp @@ -30,12 +30,14 @@ bool LocalRouter::registerNodeInfo(ppc::protocol::INodeInfo::Ptr nodeInfo, std::function onUnHealthHandler, bool removeHandlerOnUnhealth) { LOCAL_ROUTER_LOG(INFO) << LOG_DESC("registerNodeInfo") << printNodeInfo(nodeInfo); - nodeInfo->setFront(m_frontBuilder->buildClient( - nodeInfo->endPoint(), onUnHealthHandler, removeHandlerOnUnhealth)); auto ret = m_routerInfo->tryAddNodeInfo(nodeInfo); if (ret) { - LOCAL_ROUTER_LOG(INFO) << LOG_DESC("registerNodeInfo success") << printNodeInfo(nodeInfo); + // only create the frontClient when update + nodeInfo->setFront(m_frontBuilder->buildClient( + nodeInfo->endPoint(), onUnHealthHandler, removeHandlerOnUnhealth)); + LOCAL_ROUTER_LOG(INFO) << LOG_DESC("registerNodeInfo: update the node") + << printNodeInfo(nodeInfo); increaseSeq(); } return ret; @@ -86,6 +88,12 @@ bool LocalRouter::dispatcherMessage(Message::Ptr const& msg, ReceiveMsgFunc call } return true; } + // the case broadcast failed + if (msg->header() && msg->header()->optionalField() && + msg->header()->optionalField()->topic().empty()) + { + return false; + } if (!holding) { return false; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp index abb55789..6063e355 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp @@ -32,6 +32,35 @@ void PeerRouterTable::updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo) PEER_ROUTER_LOG(INFO) << LOG_DESC("updateGatewayInfo") << LOG_KV("detail", printNodeStatus(gatewayInfo)); auto nodeList = gatewayInfo->nodeList(); + + removeP2PNodeIDFromNodeIDInfos(gatewayInfo); + insertGatewayInfo(gatewayInfo); +} + +void PeerRouterTable::insertGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo) +{ + auto nodeList = gatewayInfo->nodeList(); + bcos::WriteGuard l(x_mutex); + // insert new information for the gateway + for (auto const& it : nodeList) + { + // update nodeID => gatewayInfos + if (!m_nodeID2GatewayInfos.count(it.first)) + { + m_nodeID2GatewayInfos.insert(std::make_pair(it.first, GatewayNodeInfos())); + } + m_nodeID2GatewayInfos[it.first].insert(gatewayInfo); + } + if (!m_agency2GatewayInfos.count(gatewayInfo->agency())) + { + m_agency2GatewayInfos.insert(std::make_pair(gatewayInfo->agency(), GatewayNodeInfos())); + } + // update agency => gatewayInfos + m_agency2GatewayInfos[gatewayInfo->agency()].insert(gatewayInfo); +} + +void PeerRouterTable::removeP2PNodeIDFromNodeIDInfos(GatewayNodeInfo::Ptr const& gatewayInfo) +{ bcos::WriteGuard l(x_mutex); // remove the origin information of the gateway auto it = m_nodeID2GatewayInfos.begin(); @@ -50,31 +79,50 @@ void PeerRouterTable::updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo) } it++; } - // insert new information for the gateway - for (auto const& it : nodeList) +} + +void PeerRouterTable::removeP2PNodeIDFromAgencyInfos(std::string const& p2pNode) +{ + bcos::WriteGuard l(x_mutex); + for (auto it = m_agency2GatewayInfos.begin(); it != m_agency2GatewayInfos.end();) { - // update nodeID => gatewayInfos - if (!m_nodeID2GatewayInfos.count(it.first)) + auto& gatewayInfos = it->second; + for (auto pGateway = gatewayInfos.begin(); pGateway != gatewayInfos.end();) { - m_nodeID2GatewayInfos.insert(std::make_pair(it.first, GatewayNodeInfos())); + if ((*pGateway)->p2pNodeID() == p2pNode) + { + pGateway = gatewayInfos.erase(pGateway); + continue; + } + pGateway++; } - m_nodeID2GatewayInfos[it.first].insert(gatewayInfo); - } - if (!m_agency2GatewayInfos.count(gatewayInfo->agency())) - { - m_agency2GatewayInfos.insert(std::make_pair(gatewayInfo->agency(), GatewayNodeInfos())); + if (gatewayInfos.empty()) + { + it = m_agency2GatewayInfos.erase(it); + continue; + } + it++; } - // update agency => gatewayInfos - m_agency2GatewayInfos[gatewayInfo->agency()].insert(gatewayInfo); } -std::vector PeerRouterTable::agencies() const +void PeerRouterTable::removeP2PID(std::string const& p2pNode) +{ + PEER_ROUTER_LOG(INFO) << LOG_DESC("PeerRouterTable: removeP2PID") + << LOG_KV("p2pID", printP2PIDElegantly(p2pNode)); + // remove P2PNode from m_nodeID2GatewayInfos + auto gatewayInfo = m_gatewayInfoFactory->build(p2pNode); + removeP2PNodeIDFromNodeIDInfos(gatewayInfo); + // remove P2PNode from m_agency2GatewayInfos + removeP2PNodeIDFromAgencyInfos(p2pNode); +} + +std::set PeerRouterTable::agencies() const { - std::vector agencies; + std::set agencies; bcos::ReadGuard l(x_mutex); for (auto const& it : m_agency2GatewayInfos) { - agencies.emplace_back(it.first); + agencies.insert(it.first); } return agencies; } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h index 7a286443..d9819420 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h @@ -31,16 +31,19 @@ class PeerRouterTable { public: using Ptr = std::shared_ptr; - PeerRouterTable(Service::Ptr service) : m_service(std::move(service)) {} + PeerRouterTable(Service::Ptr service, GatewayNodeInfoFactory::Ptr gatewayInfoFactory) + : m_service(std::move(service)), m_gatewayInfoFactory(std::move(gatewayInfoFactory)) + {} virtual ~PeerRouterTable() = default; virtual void updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo); + virtual void removeP2PID(std::string const& p2pNode); virtual GatewayNodeInfos selectRouter( ppc::protocol::RouteType const& routeType, ppc::protocol::Message::Ptr const& msg) const; virtual void asyncBroadcastMessage(ppc::protocol::Message::Ptr const& msg) const; - std::vector agencies() const; + std::set agencies() const; std::map gatewayInfos() const { @@ -52,10 +55,13 @@ class PeerRouterTable virtual GatewayNodeInfos selectRouterByNodeID(ppc::protocol::Message::Ptr const& msg) const; virtual GatewayNodeInfos selectRouterByComponent(ppc::protocol::Message::Ptr const& msg) const; virtual GatewayNodeInfos selectRouterByAgency(ppc::protocol::Message::Ptr const& msg) const; - + void removeP2PNodeIDFromNodeIDInfos(GatewayNodeInfo::Ptr const& gatewayInfo); + void insertGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo); + void removeP2PNodeIDFromAgencyInfos(std::string const& p2pNode); private: Service::Ptr m_service; + GatewayNodeInfoFactory::Ptr m_gatewayInfoFactory; // nodeID => p2pNodes std::map m_nodeID2GatewayInfos; // agency => p2pNodes diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp index dfca0e5c..88a4c4e6 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp @@ -48,7 +48,6 @@ Service::Service(std::string const& _nodeID, RouterTableFactory::Ptr const& _rou boost::bind(&Service::onP2PDisconnect, this, boost::placeholders::_1)); } - void Service::onP2PConnect(WsSession::Ptr _session) { SERVICE_LOG(INFO) << LOG_DESC("Receive new p2p connection") @@ -88,6 +87,7 @@ void Service::onP2PConnect(WsSession::Ptr _session) { // the new session m_nodeID2Session.insert(std::make_pair(_session->nodeId(), _session)); + callNewSessionHandlers(_session); } SERVICE_LOG(INFO) << LOG_DESC("onP2PConnect established new session") << LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId())) @@ -126,6 +126,7 @@ bool Service::removeSessionInfo(WsSession::Ptr const& _session) << LOG_KV("endpoint", _session->endPoint()); m_nodeID2Session.erase(it); + callDeleteSessionHandlers(_session); return true; } return false; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h index 377d1161..20b525b1 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h @@ -70,6 +70,17 @@ class Service : public bcos::boostssl::ws::WsService } } + // handlers called when new-session + void registerOnNewSession(std::function _handler) + { + m_newSessionHandlers.emplace_back(_handler); + } + // handlers called when delete-session + void registerOnDeleteSession(std::function _handler) + { + m_deleteSessionHandlers.emplace_back(_handler); + } + protected: void onRecvMessage(bcos::boostssl::MessageFace::Ptr _msg, bcos::boostssl::ws::WsSession::Ptr _session) override; @@ -93,6 +104,37 @@ class Service : public bcos::boostssl::ws::WsService bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::Options options, bcos::boostssl::ws::RespCallBack respFunc); + virtual void callNewSessionHandlers(bcos::boostssl::ws::WsSession::Ptr _session) + { + try + { + for (auto const& handler : m_newSessionHandlers) + { + handler(_session); + } + } + catch (std::exception const& e) + { + SERVICE_LOG(WARNING) << LOG_DESC("callNewSessionHandlers exception") + << LOG_KV("error", boost::diagnostic_information(e)); + } + } + virtual void callDeleteSessionHandlers(bcos::boostssl::ws::WsSession::Ptr _session) + { + try + { + for (auto const& handler : m_deleteSessionHandlers) + { + handler(_session); + } + } + catch (std::exception const& e) + { + SERVICE_LOG(WARNING) << LOG_DESC("callDeleteSessionHandlers exception") + << LOG_KV("error", boost::diagnostic_information(e)); + } + } + protected: std::string m_nodeID; // nodeID=>session @@ -105,5 +147,10 @@ class Service : public bcos::boostssl::ws::WsService // configuredNode=>nodeID std::map m_configuredNode2ID; mutable bcos::SharedMutex x_configuredNode2ID; + + // handlers called when new-session + std::vector> m_newSessionHandlers; + // handlers called when delete-session + std::vector> m_deleteSessionHandlers; }; } // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp index ce25385c..b4970643 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp @@ -165,19 +165,27 @@ void RouterManager::joinRouterTable( // called when the nodes become unreachable void RouterManager::onP2PNodesUnreachable(std::set const& _p2pNodeIDs) { - std::vector> handlers; + try { - ReadGuard readGuard(x_unreachableHandlers); - handlers = m_unreachableHandlers; - } - // TODO: async here - for (auto const& node : _p2pNodeIDs) - { - for (auto const& it : m_unreachableHandlers) + std::vector> handlers; + { + ReadGuard readGuard(x_unreachableHandlers); + handlers = m_unreachableHandlers; + } + // TODO: async here + for (auto const& node : _p2pNodeIDs) { - it(node); + for (auto const& it : m_unreachableHandlers) + { + it(node); + } } } + catch (std::exception const& e) + { + SERVICE_ROUTER_LOG(WARNING) << LOG_DESC("onP2PNodesUnreachable exception") + << LOG_KV("error", boost::diagnostic_information(e)); + } } void RouterManager::broadcastRouterSeq() @@ -191,4 +199,51 @@ void RouterManager::broadcastRouterSeq() message->setPayload(std::make_shared((byte*)&statusSeq, (byte*)&statusSeq + 4)); // the router table should only exchange between neighbor m_service->broadcastMessage(message); +} + +std::set RouterManager::onEraseSession(std::string const& sessionNodeID) +{ + eraseSeq(sessionNodeID); + std::set unreachableNodes; + if (m_service->routerTable()->erase(unreachableNodes, sessionNodeID)) + { + m_statusSeq++; + broadcastRouterSeq(); + } + onP2PNodesUnreachable(unreachableNodes); + SERVICE_ROUTER_LOG(INFO) << LOG_DESC("onEraseSession") + << LOG_KV("dst", printP2PIDElegantly(sessionNodeID)); + return unreachableNodes; +} + +bool RouterManager::eraseSeq(std::string const& _p2pNodeID) +{ + UpgradableGuard l(x_node2Seq); + if (!m_node2Seq.count(_p2pNodeID)) + { + return false; + } + UpgradeGuard ul(l); + m_node2Seq.erase(_p2pNodeID); + return true; +} + +std::set RouterManager::onNewSession(std::string const& sessionNodeID) +{ + std::set unreachableNodes; + auto entry = m_service->routerTableFactory()->createRouterEntry(); + entry->setDstNode(sessionNodeID); + entry->setDistance(0); + if (!m_service->routerTable()->update(unreachableNodes, m_service->nodeID(), entry)) + { + SERVICE_ROUTER_LOG(INFO) << LOG_DESC("onNewSession: RouterTable not changed") + << LOG_KV("dst", printP2PIDElegantly(sessionNodeID)); + return unreachableNodes; + } + m_statusSeq++; + broadcastRouterSeq(); + SERVICE_ROUTER_LOG(INFO) << LOG_DESC("onNewSession: update routerTable") + << LOG_KV("dst", printP2PIDElegantly(sessionNodeID)); + onP2PNodesUnreachable(unreachableNodes); + return unreachableNodes; } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h index 64ebf4c8..ab50a887 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h @@ -41,6 +41,8 @@ class RouterManager virtual void start(); virtual void stop(); + std::set onEraseSession(std::string const& sessionNodeID); + std::set onNewSession(std::string const& sessionNodeID); private: void onReceiveRouterSeq( @@ -54,6 +56,7 @@ class RouterManager bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); void joinRouterTable(std::string const& _generatedFrom, RouterTableInterface::Ptr _routerTable); + bool eraseSeq(std::string const& _p2pNodeID); void onP2PNodesUnreachable(std::set const& _p2pNodeIDs); diff --git a/cpp/wedpr-transport/sdk/ProTransportImpl.cpp b/cpp/wedpr-transport/sdk/ProTransportImpl.cpp index 2ff873ed..3763e339 100644 --- a/cpp/wedpr-transport/sdk/ProTransportImpl.cpp +++ b/cpp/wedpr-transport/sdk/ProTransportImpl.cpp @@ -32,8 +32,9 @@ using namespace ppc::sdk; ProTransportImpl::ProTransportImpl(ppc::front::FrontConfig::Ptr config, int keepAlivePeriodMs) : m_config(std::move(config)), m_keepAlivePeriodMs(keepAlivePeriodMs) { - // default enable health-check - auto grpcServerConfig = std::make_shared(config->selfEndPoint(), true); + // Note: since the config has been moved away, should not use the `config`, use `m_config` + // instead default enable health-check + auto grpcServerConfig = std::make_shared(m_config->selfEndPoint(), true); m_server = std::make_shared(grpcServerConfig); FrontFactory frontFactory; @@ -41,14 +42,13 @@ ProTransportImpl::ProTransportImpl(ppc::front::FrontConfig::Ptr config, int keep std::make_shared(m_config->grpcConfig(), m_config->gatewayGrpcTarget()); m_front = frontFactory.build(std::make_shared(), std::make_shared(), - std::make_shared(), m_gateway, config); + std::make_shared(), m_gateway, m_config); auto msgBuilder = std::make_shared(std::make_shared()); - auto frontService = std::make_shared(msgBuilder, m_front); - frontService->setHealthCheckService(m_server->server()->GetHealthCheckService()); + m_frontService = std::make_shared(msgBuilder, m_front); // register the frontService - m_server->registerService(frontService); + m_server->registerService(m_frontService); } void ProTransportImpl::start() @@ -65,6 +65,8 @@ void ProTransportImpl::start() }); m_timer->start(); m_server->start(); + // Note: the server is inited after start + m_frontService->setHealthCheckService(m_server->server()->GetHealthCheckService()); m_front->start(); } diff --git a/cpp/wedpr-transport/sdk/ProTransportImpl.h b/cpp/wedpr-transport/sdk/ProTransportImpl.h index ed29cbdd..76e2ce3a 100644 --- a/cpp/wedpr-transport/sdk/ProTransportImpl.h +++ b/cpp/wedpr-transport/sdk/ProTransportImpl.h @@ -24,7 +24,8 @@ namespace ppc::protocol { class GrpcServer; -} +class FrontServer; +} // namespace ppc::protocol namespace ppc::sdk @@ -44,6 +45,7 @@ class ProTransportImpl : public Transport, public std::enable_shared_from_this

m_server; + std::shared_ptr m_frontService; int m_keepAlivePeriodMs; std::shared_ptr m_timer; };