From 6f7562d5efc6308d410b6c1300351290e804d4a8 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Fri, 11 Oct 2024 11:58:34 +0800 Subject: [PATCH] send message by component support not specify the dstInst --- .../gateway/router/PeerRouterTable.cpp | 45 +++++++++++++++---- .../gateway/router/PeerRouterTable.h | 3 ++ .../wedpr/sdk/jni/demo/TransportDemo.java | 3 +- .../sdk/jni/transport/impl/TransportImpl.java | 2 +- 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp index 9685e82c..3564f832 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp @@ -190,19 +190,47 @@ GatewayNodeInfos PeerRouterTable::selectRouterByAgency(Message::Ptr const& msg) return it->second; } +// Note: selectRouterByComponent support not specified the dstInst GatewayNodeInfos PeerRouterTable::selectRouterByComponent(Message::Ptr const& msg) const { GatewayNodeInfos result; - bcos::ReadGuard l(x_mutex); - auto it = m_agency2GatewayInfos.find(msg->header()->optionalField()->dstInst()); - // no router found - if (it == m_agency2GatewayInfos.end()) + auto dstInst = msg->header()->optionalField()->dstInst(); + std::vector selectedRouterInfos; { - return result; + bcos::ReadGuard l(x_mutex); + if (dstInst.size() > 0) + { + // specified the dstInst + auto it = m_agency2GatewayInfos.find(dstInst); + // no router found + if (it == m_agency2GatewayInfos.end()) + { + return result; + } + selectedRouterInfos.emplace_back(it->second); + } + else + { + // the dstInst not specified, query from all agencies + for (auto const& it : m_agency2GatewayInfos) + { + selectedRouterInfos.emplace_back(it.second); + } + } } - auto const& gatewayInfos = it->second; + for (auto const& it : selectedRouterInfos) + { + selectRouterByComponent(result, msg, it); + } + return result; +} + + +void PeerRouterTable::selectRouterByComponent(GatewayNodeInfos& choosedGateway, + Message::Ptr const& msg, GatewayNodeInfos const& singleAgencyGatewayInfos) const +{ // foreach all gateways to find the component - for (auto const& it : gatewayInfos) + for (auto const& it : singleAgencyGatewayInfos) { auto const& nodeListInfo = it->nodeList(); for (auto const& nodeInfo : nodeListInfo) @@ -210,12 +238,11 @@ GatewayNodeInfos PeerRouterTable::selectRouterByComponent(Message::Ptr const& ms if (nodeInfo.second->components().count( msg->header()->optionalField()->componentType())) { - result.insert(it); + choosedGateway.insert(it); break; } } } - return result; } void PeerRouterTable::asyncBroadcastMessage(ppc::protocol::Message::Ptr const& msg) const diff --git a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h index fdb7bc6c..e0a82c80 100644 --- a/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h +++ b/cpp/wedpr-transport/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h @@ -54,6 +54,9 @@ class PeerRouterTable private: virtual GatewayNodeInfos selectRouterByNodeID(ppc::protocol::Message::Ptr const& msg) const; virtual GatewayNodeInfos selectRouterByComponent(ppc::protocol::Message::Ptr const& msg) const; + void selectRouterByComponent(GatewayNodeInfos& choosedGateway, + ppc::protocol::Message::Ptr const& msg, + GatewayNodeInfos const& singleAgencyGatewayInfos) const; virtual GatewayNodeInfos selectRouterByAgency(ppc::protocol::Message::Ptr const& msg) const; void removeP2PNodeIDFromNodeIDInfos(GatewayNodeInfo::Ptr const& gatewayInfo); void insertGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo); diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java index 6c1c2c02..6bd5713d 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java @@ -52,7 +52,8 @@ public void onMessage(IMessage message) { + ", payload: " + new String(message.getPayload()) + "#######"); - + // the case access payload multiple times + String payload = new String(message.getPayload()); String response = "#### hello, response!"; System.out.println( "#### sendResponse: " diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java index ce254431..17679b24 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java @@ -303,7 +303,7 @@ public void asyncSendResponse( int seq, MessageErrorCallback errorCallback) { if (dstNode == null) { - throw new WeDPRSDKException("asyncSendMessageByTopic failed for the dstNode is empty"); + throw new WeDPRSDKException("asyncSendResponse failed for the dstNode is empty"); } this.transport .getFront()