From 4b0d644a76f491f3cc3b28b8844b623c0e806597 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Wed, 18 Sep 2024 19:12:57 +0800 Subject: [PATCH] add asyncGetPeers && registerComponent && unRegisterComponent --- cpp/cmake/Options.cmake | 1 + cpp/ppc-framework/front/IFront.h | 28 +- cpp/ppc-framework/gateway/IGateway.h | 2 +- cpp/ppc-framework/protocol/INodeInfo.h | 5 +- .../grpc/client/GatewayClient.cpp | 8 +- .../grpc/client/GatewayClient.h | 2 +- .../grpc/server/GatewayServer.cpp | 11 +- .../grpc/server/GatewayServer.h | 2 +- cpp/wedpr-protocol/proto/pb/Service.proto | 5 +- .../protobuf/src/NodeInfoImpl.cpp | 14 +- .../protobuf/src/NodeInfoImpl.h | 48 +++- .../ppc-front/ppc-front/Front.cpp | 52 ++-- .../ppc-front/ppc-front/FrontImpl.cpp | 24 ++ .../ppc-front/ppc-front/FrontImpl.h | 11 +- .../ppc-gateway/gateway/GatewayImpl.cpp | 5 +- .../ppc-gateway/gateway/GatewayImpl.h | 2 +- .../gateway/router/GatewayNodeInfo.h | 1 + .../gateway/router/GatewayNodeInfoImpl.cpp | 15 ++ .../gateway/router/GatewayNodeInfoImpl.h | 2 + .../ppc-gateway/gateway/router/LocalRouter.h | 1 - .../gateway/router/PeerRouterTable.cpp | 21 +- .../gateway/router/PeerRouterTable.h | 2 +- .../sdk-wrapper/CMakeLists.txt | 2 +- .../jni/generated/GetPeersInfoHandler.java | 67 +++++ .../wedpr/sdk/jni/generated/IFront.java | 18 ++ .../generated/SharedGetPeersInfoHandler.java | 54 ++++ .../generated/wedpr_java_transportJNI.java | 33 +++ .../sdk/jni/transport/WeDPRTransport.java | 17 ++ .../transport/handlers/GetPeersCallback.java | 31 +++ .../sdk/jni/transport/impl/TransportImpl.java | 37 ++- .../src/wedpr_java_transportJAVA_wrap.cxx | 254 +++++++++++++++++- .../java/src/wedpr_java_transportJAVA_wrap.h | 15 ++ .../java/swig/wedpr_java_transport.i | 3 + .../sdk/src/ProTransportImpl.cpp | 2 +- 34 files changed, 734 insertions(+), 61 deletions(-) create mode 100644 cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/GetPeersInfoHandler.java create mode 100644 cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/SharedGetPeersInfoHandler.java create mode 100644 cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/GetPeersCallback.java diff --git a/cpp/cmake/Options.cmake b/cpp/cmake/Options.cmake index 0293ad6b..cdc6eace 100644 --- a/cpp/cmake/Options.cmake +++ b/cpp/cmake/Options.cmake @@ -181,6 +181,7 @@ macro(print_config NAME) message("-- BUILD_SDK BUILD SDK ${BUILD_SDK}") message("-- BUILD_UDF BUILD UDF ${BUILD_UDF}") message("-- BUILD_WEDPR_TOOLKIT BUILD_WEDPR_TOOLKIT ${BUILD_WEDPR_TOOLKIT}") + message("-- AUTO_GENERATE AUTO_GENERATE ${AUTO_GENERATE}") message("-- DEBUG ${DEBUG}") message("------------------------------------------------------------------------") message("") diff --git a/cpp/ppc-framework/front/IFront.h b/cpp/ppc-framework/front/IFront.h index fd0c10cc..dfa8e95f 100644 --- a/cpp/ppc-framework/front/IFront.h +++ b/cpp/ppc-framework/front/IFront.h @@ -63,6 +63,7 @@ class MessageDispatcherHandler virtual void onMessage(ppc::protocol::Message::Ptr msg) = 0; }; + class SendResponseHandler { public: @@ -92,6 +93,16 @@ class IMessageHandler SendResponseHandler sendResponseHandler) = 0; }; +class GetPeersInfoHandler +{ +public: + using Ptr = std::shared_ptr; + GetPeersInfoHandler() = default; + virtual ~GetPeersInfoHandler() {} + + virtual void onPeersInfo(bcos::Error::Ptr e, std::string const& peersInfo) = 0; +}; + ///////// the callback definition for sdk wrapper ///////// class IFront : virtual public IFrontClient @@ -183,14 +194,15 @@ class IFront : virtual public IFrontClient // !!! Note: the 'payload ' type(char*) should not been changed, since it used to pass-in java // byte[] data - virtual void async_send_response(char* dstNode, uint64_t dstNodeSize, std::string const& traceID, - char* payload, uint64_t payloadSize, int seq, ErrorCallback::Ptr errorCallback) + virtual void async_send_response(char* dstNode, uint64_t dstNodeSize, + std::string const& traceID, char* payload, uint64_t payloadSize, int seq, + ErrorCallback::Ptr errorCallback) { // TODO: optimize here bcos::bytes copiedDstNode(dstNode, dstNode + dstNodeSize); bcos::bytes copyedPayload(payload, payload + payloadSize); - asyncSendResponse( - copiedDstNode, traceID, std::move(copyedPayload), seq, populateErrorCallback(errorCallback)); + asyncSendResponse(copiedDstNode, traceID, std::move(copyedPayload), seq, + populateErrorCallback(errorCallback)); } // the sync interface for async_send_message @@ -209,9 +221,11 @@ class IFront : virtual public IFrontClient virtual ppc::protocol::Message::Ptr pop(std::string const& topic, long timeoutMs) = 0; virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0; - virtual void asyncGetAgencies( + virtual void asyncGetAgencies(std::vector const& components, std::function)> callback) = 0; + virtual void asyncGetPeers(GetPeersInfoHandler::Ptr getPeersCallback) = 0; + /** * @brief register the nodeInfo to the gateway * @param nodeInfo the nodeInfo @@ -223,6 +237,8 @@ class IFront : virtual public IFrontClient */ virtual bcos::Error::Ptr unRegisterNodeInfo() = 0; + virtual ppc::protocol::INodeInfo::Ptr const& nodeInfo() = 0; + /** * @brief register the topic * @@ -237,6 +253,8 @@ class IFront : virtual public IFrontClient */ virtual bcos::Error::Ptr unRegisterTopic(std::string const& topic) = 0; + virtual void registerComponent(std::string const& component) = 0; + virtual void unRegisterComponent(std::string const& component) = 0; private: ppc::protocol::ReceiveMsgFunc populateErrorCallback(ErrorCallback::Ptr errorCallback) diff --git a/cpp/ppc-framework/gateway/IGateway.h b/cpp/ppc-framework/gateway/IGateway.h index 7ccc53c5..2b5431b0 100644 --- a/cpp/ppc-framework/gateway/IGateway.h +++ b/cpp/ppc-framework/gateway/IGateway.h @@ -66,7 +66,7 @@ class IGateway bcos::bytes&& payload) = 0; virtual void asyncGetPeers(std::function callback) = 0; - virtual void asyncGetAgencies( + virtual void asyncGetAgencies(std::vector const& components, std::function)> callback) = 0; virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; diff --git a/cpp/ppc-framework/protocol/INodeInfo.h b/cpp/ppc-framework/protocol/INodeInfo.h index c949e8ed..37f8b772 100644 --- a/cpp/ppc-framework/protocol/INodeInfo.h +++ b/cpp/ppc-framework/protocol/INodeInfo.h @@ -47,7 +47,10 @@ class INodeInfo // components virtual void setComponents(std::set const& components) = 0; + virtual bool addComponent(std::string const& component) = 0; + virtual bool eraseComponent(std::string const& component) = 0; virtual std::set const& components() const = 0; + virtual std::vector copiedComponents() const = 0; virtual void encode(bcos::bytes& data) const = 0; virtual void decode(bcos::bytesConstRef data) = 0; @@ -58,7 +61,7 @@ class INodeInfo virtual bool equal(INodeInfo::Ptr const& info) { return (nodeID().toBytes() == info->nodeID().toBytes()) && - (components() == info->components()); + (copiedComponents() == info->copiedComponents()); } virtual void toJson(Json::Value& jsonObject) const = 0; diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp index 9a71005b..b8108dbe 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp @@ -65,12 +65,16 @@ void GatewayClient::asyncGetPeers(std::function const& components, std::function)> callback) { auto response = std::make_shared(); auto context = std::make_shared(); - auto request = std::make_shared(); + auto request = std::make_shared(); + for (auto const& it : components) + { + request->add_components(it); + } // lambda keeps the lifecycle for clientContext m_stub->async()->asyncGetAgencies( context.get(), request.get(), response.get(), [context, callback, response](Status status) { diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.h b/cpp/wedpr-protocol/grpc/client/GatewayClient.h index 2206e000..2594890e 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.h +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.h @@ -54,7 +54,7 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient bcos::bytes&& payload, long timeout, ppc::protocol::ReceiveMsgFunc callback) override; void asyncGetPeers(std::function callback) override; - void asyncGetAgencies( + void asyncGetAgencies(std::vector const& components, std::function)> callback) override; void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, diff --git a/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp index e25fcb14..f8e701ed 100644 --- a/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp +++ b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp @@ -75,14 +75,19 @@ grpc::ServerUnaryReactor* GatewayServer::asyncGetPeers( return reactor; } -grpc::ServerUnaryReactor* GatewayServer::asyncGetAgencies( - grpc::CallbackServerContext* context, const ppc::proto::Empty*, ppc::proto::AgenciesInfo* reply) +grpc::ServerUnaryReactor* GatewayServer::asyncGetAgencies(grpc::CallbackServerContext* context, + const ppc::proto::Condition* condition, ppc::proto::AgenciesInfo* reply) { ServerUnaryReactor* reactor(context->DefaultReactor()); try { + std::vector components; + for (int i = 0; i < condition->components_size(); i++) + { + components.emplace_back(condition->components(i)); + } m_gateway->asyncGetAgencies( - [reactor, reply](bcos::Error::Ptr error, std::set agencies) { + components, [reactor, reply](bcos::Error::Ptr error, std::set agencies) { toSerializedError(reply->mutable_error(), error); for (auto const& it : agencies) { diff --git a/cpp/wedpr-protocol/grpc/server/GatewayServer.h b/cpp/wedpr-protocol/grpc/server/GatewayServer.h index 7c62048d..7de12ed7 100644 --- a/cpp/wedpr-protocol/grpc/server/GatewayServer.h +++ b/cpp/wedpr-protocol/grpc/server/GatewayServer.h @@ -42,7 +42,7 @@ class GatewayServer : public ppc::proto::Gateway::CallbackService grpc::ServerUnaryReactor* asyncGetPeers(grpc::CallbackServerContext* context, const ppc::proto::Empty* request, ppc::proto::PeersInfo* reply) override; grpc::ServerUnaryReactor* asyncGetAgencies(grpc::CallbackServerContext* context, - const ppc::proto::Empty* request, ppc::proto::AgenciesInfo* reply) override; + const ppc::proto::Condition* request, ppc::proto::AgenciesInfo* reply) override; grpc::ServerUnaryReactor* registerNodeInfo(grpc::CallbackServerContext* context, diff --git a/cpp/wedpr-protocol/proto/pb/Service.proto b/cpp/wedpr-protocol/proto/pb/Service.proto index f531f08f..2f633dba 100644 --- a/cpp/wedpr-protocol/proto/pb/Service.proto +++ b/cpp/wedpr-protocol/proto/pb/Service.proto @@ -41,7 +41,10 @@ message PeersInfo{ string peersInfo = 2; }; message Empty{ +}; +message Condition{ + repeated string components = 1; }; service Front { @@ -50,7 +53,7 @@ service Front { service Gateway{ rpc asyncSendMessage(SendedMessageRequest) returns(Error){} rpc asyncGetPeers(Empty)returns(PeersInfo){} - rpc asyncGetAgencies(Empty)returns(AgenciesInfo){} + rpc asyncGetAgencies(Condition)returns(AgenciesInfo){} rpc registerNodeInfo(NodeInfo) returns(Error){} rpc unRegisterNodeInfo(NodeInfo)returns(Error){} rpc registerTopic(NodeInfo) returns(Error){} diff --git a/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp index 59cf149e..d3c440bd 100644 --- a/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp +++ b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp @@ -25,16 +25,28 @@ using namespace ppc::protocol; void NodeInfoImpl::encode(bcos::bytes& data) const { + encodeFields(); + encodePBObject(data, m_rawNodeInfo); +} + +void NodeInfoImpl::encodeFields() +{ + bcos::ReadGuard l(x_components); // set the components for (auto const& component : m_components) { m_rawNodeInfo->add_components(component); } - encodePBObject(data, m_rawNodeInfo); } void NodeInfoImpl::decode(bcos::bytesConstRef data) { decodePBObject(m_rawNodeInfo, data); + decodeFields(); +} + +void NodeInfoImpl::decodeFields() +{ + bcos::WriteGuard l(x_components); m_components = std::set( m_rawNodeInfo->components().begin(), m_rawNodeInfo->components().end()); } diff --git a/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h index 02f8b31d..40262676 100644 --- a/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h +++ b/cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h @@ -32,7 +32,9 @@ class NodeInfoImpl : public INodeInfo NodeInfoImpl() { m_rawNodeInfo = std::make_shared(); } explicit NodeInfoImpl(std::shared_ptr rawNodeInfo) : m_rawNodeInfo(rawNodeInfo) - {} + { + decodeFields(); + } NodeInfoImpl(bcos::bytesConstRef const& data) : NodeInfoImpl() { decode(data); } NodeInfoImpl(bcos::bytesConstRef const& nodeID, std::string const& endPoint) : NodeInfoImpl() @@ -54,9 +56,45 @@ class NodeInfoImpl : public INodeInfo void setComponents(std::set const& components) override { + bcos::WriteGuard l(x_components); m_components = components; } - std::set const& components() const override { return m_components; } + + std::set const& components() const override + { + bcos::ReadGuard l(x_components); + return m_components; + } + + std::vector copiedComponents() const override + { + bcos::ReadGuard l(x_components); + return std::vector(m_components.begin(), m_components.end()); + } + + bool addComponent(std::string const& component) override + { + bcos::UpgradableGuard l(x_components); + if (m_components.count(component)) + { + return false; + } + bcos::UpgradeGuard ul(l); + m_components.insert(component); + return true; + } + + bool eraseComponent(std::string const& component) override + { + bcos::UpgradableGuard l(x_components); + if (!m_components.count(component)) + { + return false; + } + bcos::UpgradeGuard ul(l); + m_components.erase(component); + return true; + } std::string const& endPoint() const override { return m_rawNodeInfo->endpoint(); } @@ -78,9 +116,15 @@ class NodeInfoImpl : public INodeInfo void toJson(Json::Value& jsonObject) const override; + void encodeFields(); + +protected: + virtual void decodeFields(); + private: std::shared_ptr m_front; std::set m_components; + mutable bcos::SharedMutex x_components; std::shared_ptr m_rawNodeInfo; }; diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp index 7a205b7b..0fa9c7f5 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp @@ -29,7 +29,7 @@ using namespace ppc::front; Front::Front(ppc::front::PPCMessageFaceFactory::Ptr ppcMsgFactory, IFront::Ptr front) : m_messageFactory(std::move(ppcMsgFactory)), m_front(std::move(front)) { - m_fetcher = std::make_shared(60 * 1000, "metaFetcher"); + m_fetcher = std::make_shared(10 * 1000, "metaFetcher"); m_fetcher->registerTimeoutHandler([this]() { try { @@ -58,30 +58,31 @@ void Front::stop() void Front::fetchGatewayMetaInfo() { auto self = weak_from_this(); - m_front->asyncGetAgencies([self](bcos::Error::Ptr error, std::set agencies) { - auto front = self.lock(); - if (!front) - { - return; - } - if (error && error->errorCode() != 0) - { - FRONT_LOG(WARNING) << LOG_DESC("asyncGetAgencies failed") - << LOG_KV("code", error->errorCode()) - << LOG_KV("msg", error->errorMessage()); - return; - } - std::vector agencyList(agencies.begin(), agencies.end()); - bcos::UpgradableGuard l(front->x_agencyList); - if (front->m_agencyList == agencyList) - { - return; - } - bcos::UpgradeGuard ul(l); - front->m_agencyList = agencyList; - FRONT_LOG(INFO) << LOG_DESC("Update agencies information") - << LOG_KV("agencies", printVector(agencyList)); - }); + m_front->asyncGetAgencies(m_front->nodeInfo()->copiedComponents(), + [self](bcos::Error::Ptr error, std::set agencies) { + auto front = self.lock(); + if (!front) + { + return; + } + if (error && error->errorCode() != 0) + { + FRONT_LOG(WARNING) + << LOG_DESC("asyncGetAgencies failed") << LOG_KV("code", error->errorCode()) + << LOG_KV("msg", error->errorMessage()); + return; + } + std::vector agencyList(agencies.begin(), agencies.end()); + bcos::UpgradableGuard l(front->x_agencyList); + if (front->m_agencyList == agencyList) + { + return; + } + bcos::UpgradeGuard ul(l); + front->m_agencyList = agencyList; + FRONT_LOG(INFO) << LOG_DESC("Update agencies information") + << LOG_KV("agencies", printVector(agencyList)); + }); m_fetcher->restart(); } @@ -192,4 +193,5 @@ void Front::registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType, << LOG_KV("error", boost::diagnostic_information(e)); } }); + m_front->registerComponent(std::to_string(type)); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp index ca774092..96a4b00b 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp @@ -275,4 +275,28 @@ bcos::Error::Ptr FrontImpl::push(uint16_t routeType, MessageOptionalHeader::Ptr routeType, routeInfo, std::move(payload), seq, timeout, [promise](bcos::Error::Ptr error) { promise->set_value(error); }, nullptr); return promise->get_future().get(); +} + +void FrontImpl::asyncGetPeers(GetPeersInfoHandler::Ptr getPeersCallback) +{ + m_gatewayClient->asyncGetPeers( + [getPeersCallback](bcos::Error::Ptr error, std::string peersInfo) { + getPeersCallback->onPeersInfo(error, peersInfo); + }); +} + +void FrontImpl::registerComponent(std::string const& component) +{ + // Note: the node will report the latest components + auto ret = m_nodeInfo->addComponent(component); + FRONT_LOG(INFO) << LOG_DESC("registerComponent") << LOG_KV("component", component) + << LOG_KV("insert", ret); +} + +void FrontImpl::unRegisterComponent(std::string const& component) +{ + // Note: the node will report the latest components + auto ret = m_nodeInfo->eraseComponent(component); + FRONT_LOG(INFO) << LOG_DESC("unRegisterComponent") << LOG_KV("component", component) + << LOG_KV("erase", ret); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h index 84000656..13fb00d9 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h @@ -141,12 +141,14 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ return m_gatewayClient->registerTopic(bcos::ref(m_nodeID), topic); } - void asyncGetAgencies( + void asyncGetAgencies(std::vector const& components, std::function)> callback) override { - m_gatewayClient->asyncGetAgencies(callback); + m_gatewayClient->asyncGetAgencies(components, callback); } + void asyncGetPeers(GetPeersInfoHandler::Ptr getPeersCallback) override; + /** * @brief unRegister the topic * @@ -170,6 +172,11 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_ void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID, bcos::bytes&& payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback) override; + ppc::protocol::INodeInfo::Ptr const& nodeInfo() override { return m_nodeInfo; } + + void registerComponent(std::string const& component) override; + void unRegisterComponent(std::string const& component) override; + private: void asyncSendMessageToGateway(bool responsePacket, ppc::protocol::MessagePayload::Ptr&& frontMessage, ppc::protocol::RouteType routeType, diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index fe2becda..d8bc8fea 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -322,13 +322,14 @@ void GatewayImpl::asyncGetPeers(std::function cal } } -void GatewayImpl::asyncGetAgencies(std::function)> callback) +void GatewayImpl::asyncGetAgencies(std::vector const& components, + std::function)> callback) { if (!callback) { return; } - auto agencies = m_peerRouter->agencies(); + auto agencies = m_peerRouter->agencies(components); agencies.insert(m_agency); callback(nullptr, agencies); } \ No newline at end of file diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h index 87e10fa5..fd83cb25 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h @@ -68,7 +68,7 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_this callback) override; - void asyncGetAgencies( + void asyncGetAgencies(std::vector const& components, std::function)> callback) override; protected: diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h index cfa4362e..6014a6e2 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h @@ -59,6 +59,7 @@ class GatewayNodeInfo virtual void unRegisterTopic(bcos::bytes const& nodeID, std::string const& topic) = 0; virtual std::map nodeList() const = 0; + virtual bool existComponent(std::string const& component) const = 0; virtual uint16_t nodeSize() const = 0; virtual void toJson(Json::Value& jsonObject) const = 0; }; diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp index 0dda811e..1c52fa8a 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp @@ -59,6 +59,19 @@ INodeInfo::Ptr GatewayNodeInfoImpl::nodeInfo(bcos::bytes const& nodeID) const return nullptr; } +bool GatewayNodeInfoImpl::existComponent(std::string const& component) const +{ + bcos::ReadGuard l(x_nodeList); + for (auto const& it : m_nodeList) + { + if (it.second->components().count(component)) + { + return true; + } + } + return false; +} + void GatewayNodeInfoImpl::updateNodeList() { // Note: can't use clear_nodelist here, for clear_nodelist will destroy the allocated nodelist, @@ -68,11 +81,13 @@ void GatewayNodeInfoImpl::updateNodeList() for (auto const& it : m_nodeList) { auto nodeInfo = std::dynamic_pointer_cast(it.second); + nodeInfo->encodeFields(); m_rawGatewayInfo->mutable_nodelist()->UnsafeArenaAddAllocated( nodeInfo->rawNodeInfo().get()); } } +// Note: this is wrappered with lock bool GatewayNodeInfoImpl::tryAddNodeInfo(INodeInfo::Ptr const& info) { auto nodeID = info->nodeID().toBytes(); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h index 553af157..aeab2c1d 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h @@ -85,6 +85,8 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo void toJson(Json::Value& jsonObject) const override; + bool existComponent(std::string const& component) const override; + private: void updateNodeList(); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h index a3b33f2f..d1fc9040 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h @@ -53,7 +53,6 @@ class LocalRouter virtual std::vector chooseReceiver( ppc::protocol::Message::Ptr const& msg); - // TODO: register component virtual bool dispatcherMessage(ppc::protocol::Message::Ptr const& msg, ppc::protocol::ReceiveMsgFunc callback, bool holding = true); diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp index 6063e355..9685e82c 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp @@ -116,13 +116,30 @@ void PeerRouterTable::removeP2PID(std::string const& p2pNode) removeP2PNodeIDFromAgencyInfos(p2pNode); } -std::set PeerRouterTable::agencies() const +std::set PeerRouterTable::agencies(std::vector const& components) const { std::set agencies; bcos::ReadGuard l(x_mutex); for (auto const& it : m_agency2GatewayInfos) { - agencies.insert(it.first); + // get all agencies + if (components.empty()) + { + agencies.insert(it.first); + continue; + } + // get agencies according to component + for (auto const& gatewayInfo : it.second) + { + for (auto const& component : components) + { + if (gatewayInfo->existComponent(component)) + { + agencies.insert(it.first); + break; + } + } + } } return agencies; } diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h index d9819420..fdb7bc6c 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h @@ -43,7 +43,7 @@ class PeerRouterTable virtual void asyncBroadcastMessage(ppc::protocol::Message::Ptr const& msg) const; - std::set agencies() const; + std::set agencies(std::vector const& components) const; std::map gatewayInfos() const { diff --git a/cpp/wedpr-transport/sdk-wrapper/CMakeLists.txt b/cpp/wedpr-transport/sdk-wrapper/CMakeLists.txt index e1d6cf3d..ab69fa97 100644 --- a/cpp/wedpr-transport/sdk-wrapper/CMakeLists.txt +++ b/cpp/wedpr-transport/sdk-wrapper/CMakeLists.txt @@ -4,7 +4,7 @@ find_package(SWIG REQUIRED) include(${SWIG_USE_FILE}) # Add subdirectories for each language if desired -option(BUILD_PYTHON "Build Python SWIG module" ON) +option(BUILD_PYTHON "Build Python SWIG module" OFF) if(BUILD_PYTHON) # fetch the python dependencies option(FETCH_PYTHON_DEPS "Install python required modules if not available" ON) diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/GetPeersInfoHandler.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/GetPeersInfoHandler.java new file mode 100644 index 00000000..5a73e448 --- /dev/null +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/GetPeersInfoHandler.java @@ -0,0 +1,67 @@ +/* ---------------------------------------------------------------------------- + * This file was automatically generated by SWIG (https://www.swig.org). + * Version 4.2.1 + * + * Do not make changes to this file unless you know what you are doing - modify + * the SWIG interface file instead. + * ----------------------------------------------------------------------------- */ + +package com.webank.wedpr.sdk.jni.generated; + +public class GetPeersInfoHandler { + private transient long swigCPtr; + private transient boolean swigCMemOwn; + + protected GetPeersInfoHandler(long cPtr, boolean cMemoryOwn) { + swigCMemOwn = cMemoryOwn; + swigCPtr = cPtr; + } + + protected static long getCPtr(GetPeersInfoHandler obj) { + return (obj == null) ? 0 : obj.swigCPtr; + } + + protected void swigSetCMemOwn(boolean own) { + swigCMemOwn = own; + } + + @SuppressWarnings({"deprecation", "removal"}) + protected void finalize() { + delete(); + } + + public synchronized void delete() { + if (swigCPtr != 0) { + if (swigCMemOwn) { + swigCMemOwn = false; + wedpr_java_transportJNI.delete_GetPeersInfoHandler(swigCPtr); + } + swigCPtr = 0; + } + } + + protected void swigDirectorDisconnect() { + swigSetCMemOwn(false); + delete(); + } + + public void swigReleaseOwnership() { + swigSetCMemOwn(false); + wedpr_java_transportJNI.GetPeersInfoHandler_change_ownership(this, swigCPtr, false); + } + + public void swigTakeOwnership() { + swigSetCMemOwn(true); + wedpr_java_transportJNI.GetPeersInfoHandler_change_ownership(this, swigCPtr, true); + } + + public GetPeersInfoHandler() { + this(wedpr_java_transportJNI.new_GetPeersInfoHandler(), true); + wedpr_java_transportJNI.GetPeersInfoHandler_director_connect(this, swigCPtr, true, true); + } + + public void onPeersInfo(Error e, String peersInfo) { + wedpr_java_transportJNI.GetPeersInfoHandler_onPeersInfo( + swigCPtr, this, Error.getCPtr(e), e, peersInfo); + } +} diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/IFront.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/IFront.java index bc5aac7c..1fcf80c1 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/IFront.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/IFront.java @@ -166,6 +166,11 @@ public Message peek(String topic) { return (cPtr == 0) ? null : new Message(cPtr, true); } + public void asyncGetPeers(GetPeersInfoHandler getPeersCallback) { + wedpr_java_transportJNI.IFront_asyncGetPeers( + swigCPtr, this, GetPeersInfoHandler.getCPtr(getPeersCallback), getPeersCallback); + } + /** * register the nodeInfo to the gateway
* @@ -184,6 +189,11 @@ public Error unRegisterNodeInfo() { return (cPtr == 0) ? null : new Error(cPtr, true); } + public SWIGTYPE_p_ppc__protocol__INodeInfo__Ptr nodeInfo() { + return new SWIGTYPE_p_ppc__protocol__INodeInfo__Ptr( + wedpr_java_transportJNI.IFront_nodeInfo(swigCPtr, this), false); + } + /** * register the topic
*
@@ -205,4 +215,12 @@ public Error unRegisterTopic(String topic) { long cPtr = wedpr_java_transportJNI.IFront_unRegisterTopic(swigCPtr, this, topic); return (cPtr == 0) ? null : new Error(cPtr, true); } + + public void registerComponent(String component) { + wedpr_java_transportJNI.IFront_registerComponent(swigCPtr, this, component); + } + + public void unRegisterComponent(String component) { + wedpr_java_transportJNI.IFront_unRegisterComponent(swigCPtr, this, component); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/SharedGetPeersInfoHandler.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/SharedGetPeersInfoHandler.java new file mode 100644 index 00000000..a97f2ab9 --- /dev/null +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/SharedGetPeersInfoHandler.java @@ -0,0 +1,54 @@ +/* ---------------------------------------------------------------------------- + * This file was automatically generated by SWIG (https://www.swig.org). + * Version 4.2.1 + * + * Do not make changes to this file unless you know what you are doing - modify + * the SWIG interface file instead. + * ----------------------------------------------------------------------------- */ + +package com.webank.wedpr.sdk.jni.generated; + +public class SharedGetPeersInfoHandler { + private transient long swigCPtr; + protected transient boolean swigCMemOwn; + + protected SharedGetPeersInfoHandler(long cPtr, boolean cMemoryOwn) { + swigCMemOwn = cMemoryOwn; + swigCPtr = cPtr; + } + + protected static long getCPtr(SharedGetPeersInfoHandler obj) { + return (obj == null) ? 0 : obj.swigCPtr; + } + + protected static long swigRelease(SharedGetPeersInfoHandler obj) { + long ptr = 0; + if (obj != null) { + if (!obj.swigCMemOwn) + throw new RuntimeException("Cannot release ownership as memory is not owned"); + ptr = obj.swigCPtr; + obj.swigCMemOwn = false; + obj.delete(); + } + return ptr; + } + + @SuppressWarnings({"deprecation", "removal"}) + protected void finalize() { + delete(); + } + + public synchronized void delete() { + if (swigCPtr != 0) { + if (swigCMemOwn) { + swigCMemOwn = false; + wedpr_java_transportJNI.delete_SharedGetPeersInfoHandler(swigCPtr); + } + swigCPtr = 0; + } + } + + public SharedGetPeersInfoHandler() { + this(wedpr_java_transportJNI.new_SharedGetPeersInfoHandler(), true); + } +} diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/wedpr_java_transportJNI.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/wedpr_java_transportJNI.java index 8418644a..311ec4fe 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/wedpr_java_transportJNI.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/wedpr_java_transportJNI.java @@ -41,6 +41,10 @@ public class wedpr_java_transportJNI { public static final native void delete_SharedIMessageHandler(long jarg1); + public static final native long new_SharedGetPeersInfoHandler(); + + public static final native void delete_SharedGetPeersInfoHandler(long jarg1); + public static final native long new_SharedGateway(); public static final native void delete_SharedGateway(long jarg1); @@ -647,6 +651,19 @@ public static final native void IMessageHandler_director_connect( public static final native void IMessageHandler_change_ownership( IMessageHandler obj, long cptr, boolean take_or_release); + public static final native long new_GetPeersInfoHandler(); + + public static final native void delete_GetPeersInfoHandler(long jarg1); + + public static final native void GetPeersInfoHandler_onPeersInfo( + long jarg1, GetPeersInfoHandler jarg1_, long jarg2, Error jarg2_, String jarg3); + + public static final native void GetPeersInfoHandler_director_connect( + GetPeersInfoHandler obj, long cptr, boolean mem_own, boolean weak_global); + + public static final native void GetPeersInfoHandler_change_ownership( + GetPeersInfoHandler obj, long cptr, boolean take_or_release); + public static final native void delete_IFront(long jarg1); public static final native void IFront_start(long jarg1, IFront jarg1_); @@ -712,14 +729,25 @@ public static final native long IFront_push__SWIG_1( public static final native long IFront_peek(long jarg1, IFront jarg1_, String jarg2); + public static final native void IFront_asyncGetPeers( + long jarg1, IFront jarg1_, long jarg2, GetPeersInfoHandler jarg2_); + public static final native long IFront_registerNodeInfo(long jarg1, IFront jarg1_, long jarg2); public static final native long IFront_unRegisterNodeInfo(long jarg1, IFront jarg1_); + public static final native long IFront_nodeInfo(long jarg1, IFront jarg1_); + public static final native long IFront_registerTopic(long jarg1, IFront jarg1_, String jarg2); public static final native long IFront_unRegisterTopic(long jarg1, IFront jarg1_, String jarg2); + public static final native void IFront_registerComponent( + long jarg1, IFront jarg1_, String jarg2); + + public static final native void IFront_unRegisterComponent( + long jarg1, IFront jarg1_, String jarg2); + public static final native void delete_IFrontBuilder(long jarg1); public static final native long IFrontBuilder_buildClient( @@ -783,6 +811,11 @@ public static void SwigDirector_IMessageHandler_onMessage( new SendResponseHandler(sendResponseHandler, true)); } + public static void SwigDirector_GetPeersInfoHandler_onPeersInfo( + GetPeersInfoHandler jself, long e, String peersInfo) { + jself.onPeersInfo((e == 0) ? null : new Error(e, true), peersInfo); + } + private static final native void swig_module_init(); static { diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/WeDPRTransport.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/WeDPRTransport.java index 5a716f6b..5e0545e4 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/WeDPRTransport.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/WeDPRTransport.java @@ -16,6 +16,7 @@ package com.webank.wedpr.sdk.jni.transport; import com.webank.wedpr.sdk.jni.common.WeDPRSDKException; +import com.webank.wedpr.sdk.jni.transport.handlers.GetPeersCallback; import com.webank.wedpr.sdk.jni.transport.handlers.MessageCallback; import com.webank.wedpr.sdk.jni.transport.handlers.MessageDispatcherCallback; import com.webank.wedpr.sdk.jni.transport.handlers.MessageErrorCallback; @@ -57,6 +58,22 @@ public interface WeDPRTransport { */ void unRegisterTopic(String topic) throws Exception; + /** + * register handlers according to component + * + * @param component the component of the message should handled by the given callback + * @param messageDispatcherCallback the message callback + */ + void registerComponentHandler( + String component, MessageDispatcherCallback messageDispatcherCallback); + + /** + * async get peers information + * + * @param handler the handler that handle the peersInfo + */ + void asyncGetPeers(GetPeersCallback handler); + //// the async interfaces /** * register the message handler diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/GetPeersCallback.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/GetPeersCallback.java new file mode 100644 index 00000000..17e783d5 --- /dev/null +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/GetPeersCallback.java @@ -0,0 +1,31 @@ +/* + * Copyright 2017-2025 [webank-wedpr] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package com.webank.wedpr.sdk.jni.transport.handlers; + +import com.webank.wedpr.sdk.jni.generated.Error; +import com.webank.wedpr.sdk.jni.generated.GetPeersInfoHandler; + +public abstract class GetPeersCallback extends GetPeersInfoHandler { + + public abstract void onPeersInfo(Error e, String peersInfo); + + // release the ownership to c++, in case of it's released by the jvm + @Override + protected void finalize() { + swigReleaseOwnership(); + delete(); + } +} diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java index 46420d94..d2d497b6 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java @@ -24,6 +24,7 @@ import com.webank.wedpr.sdk.jni.transport.IMessageBuilder; import com.webank.wedpr.sdk.jni.transport.TransportConfig; import com.webank.wedpr.sdk.jni.transport.WeDPRTransport; +import com.webank.wedpr.sdk.jni.transport.handlers.GetPeersCallback; import com.webank.wedpr.sdk.jni.transport.handlers.MessageCallback; import com.webank.wedpr.sdk.jni.transport.handlers.MessageDispatcherCallback; import com.webank.wedpr.sdk.jni.transport.handlers.MessageErrorCallback; @@ -70,8 +71,8 @@ public void stop() { * @throws Exception failed case */ @Override - public void registerComponent(String component) throws Exception { - // Note: the front must exist after Transport created + public void registerComponent(String component) { + this.transport.getFront().registerComponent(component); } /** @@ -81,7 +82,9 @@ public void registerComponent(String component) throws Exception { * @throws Exception failed case */ @Override - public void unRegisterComponent(String component) throws Exception {} + public void unRegisterComponent(String component) { + this.transport.getFront().unRegisterComponent(component); + } /** * register the topic @@ -91,7 +94,8 @@ public void unRegisterComponent(String component) throws Exception {} */ @Override public void registerTopic(String topic) throws Exception { - this.transport.getFront().registerTopic(topic); + Error result = this.transport.getFront().registerTopic(topic); + Common.checkResult("registerTopic", result); } /** @@ -102,7 +106,30 @@ public void registerTopic(String topic) throws Exception { */ @Override public void unRegisterTopic(String topic) throws Exception { - this.transport.getFront().unRegisterTopic(topic); + Error result = this.transport.getFront().unRegisterTopic(topic); + Common.checkResult("unRegisterTopic", result); + } + + /** + * register handlers according to component + * + * @param component the component of the message should handled by the given callback + * @param messageDispatcherCallback the message callback + */ + @Override + public void registerComponentHandler( + String component, MessageDispatcherCallback messageDispatcherCallback) { + this.transport.getFront().register_msg_handler(component, messageDispatcherCallback); + } + + /** + * async get peers information + * + * @param handler the handler that handle the peersInfo + */ + @Override + public void asyncGetPeers(GetPeersCallback handler) { + this.transport.getFront().asyncGetPeers(handler); } @Override diff --git a/cpp/wedpr-transport/sdk-wrapper/java/src/wedpr_java_transportJAVA_wrap.cxx b/cpp/wedpr-transport/sdk-wrapper/java/src/wedpr_java_transportJAVA_wrap.cxx index e103bfd8..c5bc735c 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/src/wedpr_java_transportJAVA_wrap.cxx +++ b/cpp/wedpr-transport/sdk-wrapper/java/src/wedpr_java_transportJAVA_wrap.cxx @@ -781,7 +781,7 @@ namespace Swig { namespace Swig { namespace { jclass jclass_wedpr_java_transportJNI = NULL; - jmethodID director_method_ids[3]; + jmethodID director_method_ids[4]; } } @@ -1202,6 +1202,66 @@ void SwigDirector_IMessageHandler::swig_connect_director(JNIEnv *jenv, jobject j } +SwigDirector_GetPeersInfoHandler::SwigDirector_GetPeersInfoHandler(JNIEnv *jenv) : ppc::front::GetPeersInfoHandler(), Swig::Director(jenv) { +} + +SwigDirector_GetPeersInfoHandler::~SwigDirector_GetPeersInfoHandler() { + swig_disconnect_director_self("swigDirectorDisconnect"); +} + + +void SwigDirector_GetPeersInfoHandler::onPeersInfo(bcos::Error::Ptr e,std::string const &peersInfo) { + JNIEnvWrapper swigjnienv(this) ; + JNIEnv * jenv = swigjnienv.getJNIEnv() ; + jobject swigjobj = (jobject) NULL ; + jlong je ; + jstring jpeersInfo = 0 ; + + if (!swig_override[0]) { + SWIG_JavaThrowException(JNIEnvWrapper(this).getJNIEnv(), SWIG_JavaDirectorPureVirtual, "Attempted to invoke pure virtual method ppc::front::GetPeersInfoHandler::onPeersInfo."); + return; + } + swigjobj = swig_get_self(jenv); + if (swigjobj && jenv->IsSameObject(swigjobj, NULL) == JNI_FALSE) { + je = 0; + if (e) { + *((std::shared_ptr< bcos::Error > **)&je) = new std::shared_ptr< bcos::Error >(e); + } + jpeersInfo = jenv->NewStringUTF((&peersInfo)->c_str()); + Swig::LocalRefGuard peersInfo_refguard(jenv, jpeersInfo); + jenv->CallStaticVoidMethod(Swig::jclass_wedpr_java_transportJNI, Swig::director_method_ids[3], swigjobj, je, jpeersInfo); + jthrowable swigerror = jenv->ExceptionOccurred(); + if (swigerror) { + Swig::DirectorException::raise(jenv, swigerror); + } + + } else { + SWIG_JavaThrowException(jenv, SWIG_JavaNullPointerException, "null upcall object in ppc::front::GetPeersInfoHandler::onPeersInfo "); + } + if (swigjobj) jenv->DeleteLocalRef(swigjobj); +} + +void SwigDirector_GetPeersInfoHandler::swig_connect_director(JNIEnv *jenv, jobject jself, jclass jcls, bool swig_mem_own, bool weak_global) { + static jclass baseclass = swig_new_global_ref(jenv, "com/webank/wedpr/sdk/jni/generated/GetPeersInfoHandler"); + if (!baseclass) return; + static SwigDirectorMethod methods[] = { + SwigDirectorMethod(jenv, baseclass, "onPeersInfo", "(Lcom/webank/wedpr/sdk/jni/generated/Error;Ljava/lang/String;)V") + }; + + if (swig_set_self(jenv, jself, swig_mem_own, weak_global)) { + bool derived = (jenv->IsSameObject(baseclass, jcls) ? false : true); + for (int i = 0; i < 1; ++i) { + swig_override[i] = false; + if (derived) { + jmethodID methid = jenv->GetMethodID(jcls, methods[i].name, methods[i].desc); + swig_override[i] = methods[i].methid && (methid != methods[i].methid); + jenv->ExceptionClear(); + } + } + } +} + + #ifdef __cplusplus extern "C" { @@ -1399,6 +1459,30 @@ SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1tra } +SWIGEXPORT jlong JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_new_1SharedGetPeersInfoHandler(JNIEnv *jenv, jclass jcls) { + jlong jresult = 0 ; + std::shared_ptr< ppc::front::GetPeersInfoHandler > *result = 0 ; + + (void)jenv; + (void)jcls; + result = (std::shared_ptr< ppc::front::GetPeersInfoHandler > *)new std::shared_ptr< ppc::front::GetPeersInfoHandler >(); + *(std::shared_ptr< ppc::front::GetPeersInfoHandler > **)&jresult = (result && *result) ? new std::shared_ptr< ppc::front::GetPeersInfoHandler >(*result) : 0; + if (1) delete result; + return jresult; +} + + +SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_delete_1SharedGetPeersInfoHandler(JNIEnv *jenv, jclass jcls, jlong jarg1) { + std::shared_ptr< ppc::front::GetPeersInfoHandler > *arg1 = (std::shared_ptr< ppc::front::GetPeersInfoHandler > *) 0 ; + std::shared_ptr< ppc::front::GetPeersInfoHandler > tempnull1 ; + + (void)jenv; + (void)jcls; + arg1 = jarg1 ? *(std::shared_ptr< ppc::front::GetPeersInfoHandler > **)&jarg1 : &tempnull1; + delete arg1; +} + + SWIGEXPORT jlong JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_new_1SharedGateway(JNIEnv *jenv, jclass jcls) { jlong jresult = 0 ; std::shared_ptr< ppc::gateway::IGateway > *result = 0 ; @@ -5912,6 +5996,84 @@ SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1tra } +SWIGEXPORT jlong JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_new_1GetPeersInfoHandler(JNIEnv *jenv, jclass jcls) { + jlong jresult = 0 ; + ppc::front::GetPeersInfoHandler *result = 0 ; + + (void)jenv; + (void)jcls; + result = (ppc::front::GetPeersInfoHandler *)new SwigDirector_GetPeersInfoHandler(jenv); + + *(std::shared_ptr< ppc::front::GetPeersInfoHandler > **)&jresult = result ? new std::shared_ptr< ppc::front::GetPeersInfoHandler >(result SWIG_NO_NULL_DELETER_1) : 0; + + return jresult; +} + + +SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_delete_1GetPeersInfoHandler(JNIEnv *jenv, jclass jcls, jlong jarg1) { + ppc::front::GetPeersInfoHandler *arg1 = (ppc::front::GetPeersInfoHandler *) 0 ; + std::shared_ptr< ppc::front::GetPeersInfoHandler > *smartarg1 = 0 ; + + (void)jenv; + (void)jcls; + + smartarg1 = *(std::shared_ptr< ppc::front::GetPeersInfoHandler > **)&jarg1; + arg1 = (ppc::front::GetPeersInfoHandler *)(smartarg1 ? smartarg1->get() : 0); + (void)arg1; delete smartarg1; +} + + +SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GetPeersInfoHandler_1onPeersInfo(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jlong jarg2, jobject jarg2_, jstring jarg3) { + ppc::front::GetPeersInfoHandler *arg1 = (ppc::front::GetPeersInfoHandler *) 0 ; + bcos::Error::Ptr arg2 ; + std::string *arg3 = 0 ; + std::shared_ptr< ppc::front::GetPeersInfoHandler > *smartarg1 = 0 ; + bcos::Error::Ptr *argp2 ; + + (void)jenv; + (void)jcls; + (void)jarg1_; + (void)jarg2_; + + smartarg1 = *(std::shared_ptr< ppc::front::GetPeersInfoHandler > **)&jarg1; + arg1 = (ppc::front::GetPeersInfoHandler *)(smartarg1 ? smartarg1->get() : 0); + argp2 = *(bcos::Error::Ptr **)&jarg2; + if (argp2) arg2 = *argp2; + if(!jarg3) { + SWIG_JavaThrowException(jenv, SWIG_JavaNullPointerException, "null string"); + return ; + } + const char *arg3_pstr = (const char *)jenv->GetStringUTFChars(jarg3, 0); + if (!arg3_pstr) return ; + std::string arg3_str(arg3_pstr); + arg3 = &arg3_str; + jenv->ReleaseStringUTFChars(jarg3, arg3_pstr); + (arg1)->onPeersInfo(arg2,(std::string const &)*arg3); +} + + +SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GetPeersInfoHandler_1director_1connect(JNIEnv *jenv, jclass jcls, jobject jself, jlong objarg, jboolean jswig_mem_own, jboolean jweak_global) { + std::shared_ptr< ppc::front::GetPeersInfoHandler > *obj = *((std::shared_ptr< ppc::front::GetPeersInfoHandler > **)&objarg); + (void)jcls; + // Keep a local instance of the smart pointer around while we are using the raw pointer + // Avoids using smart pointer specific API. + SwigDirector_GetPeersInfoHandler *director = static_cast(obj->operator->()); + director->swig_connect_director(jenv, jself, jenv->GetObjectClass(jself), (jswig_mem_own == JNI_TRUE), (jweak_global == JNI_TRUE)); +} + + +SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GetPeersInfoHandler_1change_1ownership(JNIEnv *jenv, jclass jcls, jobject jself, jlong objarg, jboolean jtake_or_release) { + std::shared_ptr< ppc::front::GetPeersInfoHandler > *obj = *((std::shared_ptr< ppc::front::GetPeersInfoHandler > **)&objarg); + // Keep a local instance of the smart pointer around while we are using the raw pointer + // Avoids using smart pointer specific API. + SwigDirector_GetPeersInfoHandler *director = dynamic_cast(obj->operator->()); + (void)jcls; + if (director) { + director->swig_java_change_ownership(jenv, jself, jtake_or_release ? true : false); + } +} + + SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_delete_1IFront(JNIEnv *jenv, jclass jcls, jlong jarg1) { ppc::front::IFront *arg1 = (ppc::front::IFront *) 0 ; std::shared_ptr< ppc::front::IFront > *smartarg1 = 0 ; @@ -6339,6 +6501,25 @@ SWIGEXPORT jlong JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1tr } +SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_IFront_1asyncGetPeers(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jlong jarg2, jobject jarg2_) { + ppc::front::IFront *arg1 = (ppc::front::IFront *) 0 ; + ppc::front::GetPeersInfoHandler::Ptr arg2 ; + std::shared_ptr< ppc::front::IFront > *smartarg1 = 0 ; + ppc::front::GetPeersInfoHandler::Ptr *argp2 ; + + (void)jenv; + (void)jcls; + (void)jarg1_; + (void)jarg2_; + + smartarg1 = *(std::shared_ptr< ppc::front::IFront > **)&jarg1; + arg1 = (ppc::front::IFront *)(smartarg1 ? smartarg1->get() : 0); + argp2 = *(ppc::front::GetPeersInfoHandler::Ptr **)&jarg2; + if (argp2) arg2 = *argp2; + (arg1)->asyncGetPeers(arg2); +} + + SWIGEXPORT jlong JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_IFront_1registerNodeInfo(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jlong jarg2) { jlong jresult = 0 ; ppc::front::IFront *arg1 = (ppc::front::IFront *) 0 ; @@ -6381,6 +6562,24 @@ SWIGEXPORT jlong JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1tr } +SWIGEXPORT jlong JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_IFront_1nodeInfo(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_) { + jlong jresult = 0 ; + ppc::front::IFront *arg1 = (ppc::front::IFront *) 0 ; + std::shared_ptr< ppc::front::IFront > *smartarg1 = 0 ; + ppc::protocol::INodeInfo::Ptr *result = 0 ; + + (void)jenv; + (void)jcls; + (void)jarg1_; + + smartarg1 = *(std::shared_ptr< ppc::front::IFront > **)&jarg1; + arg1 = (ppc::front::IFront *)(smartarg1 ? smartarg1->get() : 0); + result = (ppc::protocol::INodeInfo::Ptr *) &(arg1)->nodeInfo(); + *(ppc::protocol::INodeInfo::Ptr **)&jresult = result; + return jresult; +} + + SWIGEXPORT jlong JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_IFront_1registerTopic(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jstring jarg2) { jlong jresult = 0 ; ppc::front::IFront *arg1 = (ppc::front::IFront *) 0 ; @@ -6437,6 +6636,54 @@ SWIGEXPORT jlong JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1tr } +SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_IFront_1registerComponent(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jstring jarg2) { + ppc::front::IFront *arg1 = (ppc::front::IFront *) 0 ; + std::string *arg2 = 0 ; + std::shared_ptr< ppc::front::IFront > *smartarg1 = 0 ; + + (void)jenv; + (void)jcls; + (void)jarg1_; + + smartarg1 = *(std::shared_ptr< ppc::front::IFront > **)&jarg1; + arg1 = (ppc::front::IFront *)(smartarg1 ? smartarg1->get() : 0); + if(!jarg2) { + SWIG_JavaThrowException(jenv, SWIG_JavaNullPointerException, "null string"); + return ; + } + const char *arg2_pstr = (const char *)jenv->GetStringUTFChars(jarg2, 0); + if (!arg2_pstr) return ; + std::string arg2_str(arg2_pstr); + arg2 = &arg2_str; + jenv->ReleaseStringUTFChars(jarg2, arg2_pstr); + (arg1)->registerComponent((std::string const &)*arg2); +} + + +SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_IFront_1unRegisterComponent(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jstring jarg2) { + ppc::front::IFront *arg1 = (ppc::front::IFront *) 0 ; + std::string *arg2 = 0 ; + std::shared_ptr< ppc::front::IFront > *smartarg1 = 0 ; + + (void)jenv; + (void)jcls; + (void)jarg1_; + + smartarg1 = *(std::shared_ptr< ppc::front::IFront > **)&jarg1; + arg1 = (ppc::front::IFront *)(smartarg1 ? smartarg1->get() : 0); + if(!jarg2) { + SWIG_JavaThrowException(jenv, SWIG_JavaNullPointerException, "null string"); + return ; + } + const char *arg2_pstr = (const char *)jenv->GetStringUTFChars(jarg2, 0); + if (!arg2_pstr) return ; + std::string arg2_str(arg2_pstr); + arg2 = &arg2_str; + jenv->ReleaseStringUTFChars(jarg2, arg2_pstr); + (arg1)->unRegisterComponent((std::string const &)*arg2); +} + + SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_delete_1IFrontBuilder(JNIEnv *jenv, jclass jcls, jlong jarg1) { ppc::front::IFrontBuilder *arg1 = (ppc::front::IFrontBuilder *) 0 ; @@ -6790,7 +7037,7 @@ SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1tra static struct { const char *method; const char *signature; - } methods[3] = { + } methods[4] = { { "SwigDirector_ErrorCallback_onError", "(Lcom/webank/wedpr/sdk/jni/generated/ErrorCallback;J)V" }, @@ -6799,6 +7046,9 @@ SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1tra }, { "SwigDirector_IMessageHandler_onMessage", "(Lcom/webank/wedpr/sdk/jni/generated/IMessageHandler;JJJ)V" + }, + { + "SwigDirector_GetPeersInfoHandler_onPeersInfo", "(Lcom/webank/wedpr/sdk/jni/generated/GetPeersInfoHandler;JLjava/lang/String;)V" } }; Swig::jclass_wedpr_java_transportJNI = (jclass) jenv->NewGlobalRef(jcls); diff --git a/cpp/wedpr-transport/sdk-wrapper/java/src/wedpr_java_transportJAVA_wrap.h b/cpp/wedpr-transport/sdk-wrapper/java/src/wedpr_java_transportJAVA_wrap.h index 89b6596b..2a61714c 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/src/wedpr_java_transportJAVA_wrap.h +++ b/cpp/wedpr-transport/sdk-wrapper/java/src/wedpr_java_transportJAVA_wrap.h @@ -54,5 +54,20 @@ class SwigDirector_IMessageHandler : public ppc::front::IMessageHandler, public Swig::BoolArray<1> swig_override; }; +class SwigDirector_GetPeersInfoHandler : public ppc::front::GetPeersInfoHandler, public Swig::Director { + +public: + void swig_connect_director(JNIEnv *jenv, jobject jself, jclass jcls, bool swig_mem_own, bool weak_global); + SwigDirector_GetPeersInfoHandler(JNIEnv *jenv); + virtual ~SwigDirector_GetPeersInfoHandler(); + virtual void onPeersInfo(bcos::Error::Ptr e,std::string const &peersInfo); +public: + bool swig_overrides(int n) { + return (n < 1 ? swig_override[n] : false); + } +protected: + Swig::BoolArray<1> swig_override; +}; + #endif diff --git a/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i b/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i index 2c656f17..212ffdd9 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i +++ b/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i @@ -41,6 +41,7 @@ PRIMITIVE_TYPEMAP(unsigned long int, long long); %shared_ptr(ppc::front::ErrorCallback); %shared_ptr(ppc::front::MessageDispatcherHandler); %shared_ptr(ppc::front::IMessageHandler); +%shared_ptr(ppc::front::GetPeersInfoHandler); %shared_ptr(ppc::gateway::IGateway); %shared_ptr(bcos::Error); @@ -133,6 +134,7 @@ namespace bcos{ %template(SharedErrorCallback) std::shared_ptr; %template(SharedMessageDispatcherHandler) std::shared_ptr; %template(SharedIMessageHandler) std::shared_ptr; +%template(SharedGetPeersInfoHandler) std::shared_ptr; %template(SharedGateway) std::shared_ptr; @@ -153,6 +155,7 @@ namespace bcos{ %feature("director") ppc::front::ErrorCallback; %feature("director") ppc::front::MessageDispatcherHandler; %feature("director") ppc::front::IMessageHandler; +%feature("director") ppc::front::GetPeersInfoHandler; // Note: the field data should equal to the fieldMap of class or the function %include various.i diff --git a/cpp/wedpr-transport/sdk/src/ProTransportImpl.cpp b/cpp/wedpr-transport/sdk/src/ProTransportImpl.cpp index b2a50563..3f401309 100644 --- a/cpp/wedpr-transport/sdk/src/ProTransportImpl.cpp +++ b/cpp/wedpr-transport/sdk/src/ProTransportImpl.cpp @@ -77,7 +77,7 @@ void ProTransportImpl::keepAlive() { try { - m_gateway->registerNodeInfo(m_config->generateNodeInfo()); + m_gateway->registerNodeInfo(m_front->nodeInfo()); } catch (std::exception const& e) {