From ff282242f8d5adf5e54eef202d8080b5a3e0d3a8 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Fri, 13 Sep 2024 19:22:55 +0800 Subject: [PATCH] fix transport sdk crashed caused by jvm release the c-managed object --- .../wedpr/sdk/jni/demo/TransportDemo.java | 15 +++++-- .../wedpr/sdk/jni/generated/FrontConfig.java | 4 ++ .../wedpr/sdk/jni/generated/Message.java | 4 ++ .../sdk/jni/generated/MessageBuilder.java | 4 ++ .../sdk/jni/generated/MessageHeader.java | 4 ++ .../jni/generated/MessageHeaderBuilder.java | 4 ++ .../jni/generated/MessageOptionalHeader.java | 4 ++ .../MessageOptionalHeaderBuilder.java | 4 ++ .../sdk/jni/generated/MessagePayload.java | 4 ++ .../jni/generated/MessagePayloadBuilder.java | 4 ++ .../wedpr/sdk/jni/generated/Transport.java | 4 ++ .../sdk/jni/transport/IMessageBuilder.java | 11 +++++ .../sdk/jni/transport/TransportConfig.java | 1 + .../sdk/jni/transport/WeDPRTransport.java | 10 ++--- .../transport/handlers/MessageCallback.java | 6 +++ .../handlers/MessageDispatcherCallback.java | 6 +++ .../handlers/MessageErrorCallback.java | 28 +++++++++++++ .../sdk/jni/transport/impl/MessageImpl.java | 3 ++ .../sdk/jni/transport/impl/TransportImpl.java | 40 ++++++++++--------- .../java/swig/wedpr_java_transport.i | 26 ++++++++++++ 20 files changed, 158 insertions(+), 28 deletions(-) create mode 100644 cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageErrorCallback.java diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java index 757d597a..7c26c4e0 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java @@ -21,6 +21,7 @@ import com.webank.wedpr.sdk.jni.transport.TransportConfig; import com.webank.wedpr.sdk.jni.transport.WeDPRTransport; import com.webank.wedpr.sdk.jni.transport.handlers.MessageDispatcherCallback; +import com.webank.wedpr.sdk.jni.transport.handlers.MessageErrorCallback; import com.webank.wedpr.sdk.jni.transport.impl.TransportImpl; import com.webank.wedpr.sdk.jni.transport.model.TransportEndPoint; @@ -28,6 +29,10 @@ public class TransportDemo { public static class MessageDispatcherCallbackImpl extends MessageDispatcherCallback { private final String nodeID; + // java -cp 'conf/:lib/*:apps/*' com.webank.wedpr.sdk.jni.demo.TransportDemo agency0Node + // "127.0.0.1" 9020 "ipv4:127.0.0.1:40600,127.0.0.1:40601" "agency1Node" + // java -cp 'conf/:lib/*:apps/*' com.webank.wedpr.sdk.jni.demo.TransportDemo agency1Node + // "127.0.0.1" 9021 "ipv4:127.0.0.1:40620,127.0.0.1:40621" "agency0Node" public MessageDispatcherCallbackImpl(String nodeID) { this.nodeID = nodeID; } @@ -45,10 +50,10 @@ public void onMessage(IMessage message) { } } - public static class MessageErrorCallback extends ErrorCallback { + public static class MessageErrorCallbackImpl extends MessageErrorCallback { private final String nodeID; - public MessageErrorCallback(String nodeID) { + public MessageErrorCallbackImpl(String nodeID) { this.nodeID = nodeID; } @@ -101,7 +106,9 @@ public static void main(String[] args) throws Exception { // send Message to the gateway String topic = "testTopic"; - transport.registerTopicHandler(topic, new MessageDispatcherCallbackImpl(nodeID)); + MessageDispatcherCallback messageDispatcherCallback = + new MessageDispatcherCallbackImpl(nodeID); + transport.registerTopicHandler(topic, messageDispatcherCallback); System.out.println("##### register topic success"); byte[] dstNodeBytes = dstNode.getBytes(); @@ -118,7 +125,7 @@ public static void main(String[] args) throws Exception { payLoad.getBytes(), 0, 10000, - new MessageErrorCallback(nodeID), + new MessageErrorCallbackImpl(nodeID), null); // push diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/FrontConfig.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/FrontConfig.java index c7de68ff..32fb72fc 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/FrontConfig.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/FrontConfig.java @@ -112,4 +112,8 @@ public SWIGTYPE_p_std__vectorT_std__string_t mutableComponents() { return new SWIGTYPE_p_std__vectorT_std__string_t( wedpr_java_transportJNI.FrontConfig_mutableComponents(swigCPtr, this), false); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/Message.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/Message.java index 68091c7c..32ddfb70 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/Message.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/Message.java @@ -133,4 +133,8 @@ public long decode(SWIGTYPE_p_bcos__bytesConstRef _buffer) { return wedpr_java_transportJNI.Message_decode( swigCPtr, this, SWIGTYPE_p_bcos__bytesConstRef.getCPtr(_buffer)); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageBuilder.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageBuilder.java index 769d0dae..d2f7ecaf 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageBuilder.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageBuilder.java @@ -67,4 +67,8 @@ public Message build( payload); return (cPtr == 0) ? null : new Message(cPtr, true); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageHeader.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageHeader.java index f8c215a4..12dbd933 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageHeader.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageHeader.java @@ -150,4 +150,8 @@ public void setRouteType(SWIGTYPE_p_ppc__protocol__RouteType type) { public boolean hasOptionalField() { return wedpr_java_transportJNI.MessageHeader_hasOptionalField(swigCPtr, this); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageHeaderBuilder.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageHeaderBuilder.java index 4b3db731..71d876b4 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageHeaderBuilder.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageHeaderBuilder.java @@ -61,4 +61,8 @@ public MessageOptionalHeader build(MessageOptionalHeader optionalHeader) { optionalHeader); return (cPtr == 0) ? null : new MessageOptionalHeader(cPtr, true); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageOptionalHeader.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageOptionalHeader.java index 8ad6a360..2e09fca4 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageOptionalHeader.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageOptionalHeader.java @@ -126,4 +126,8 @@ public String srcInst() { public void setSrcInst(String srcInst) { wedpr_java_transportJNI.MessageOptionalHeader_setSrcInst(swigCPtr, this, srcInst); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageOptionalHeaderBuilder.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageOptionalHeaderBuilder.java index 16187d11..0078ee10 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageOptionalHeaderBuilder.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessageOptionalHeaderBuilder.java @@ -55,4 +55,8 @@ public MessageOptionalHeader build() { wedpr_java_transportJNI.MessageOptionalHeaderBuilder_build__SWIG_1(swigCPtr, this); return (cPtr == 0) ? null : new MessageOptionalHeader(cPtr, true); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessagePayload.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessagePayload.java index 8c62f8aa..082e6f3d 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessagePayload.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessagePayload.java @@ -106,4 +106,8 @@ public void setRespPacket() { public boolean isRespPacket() { return wedpr_java_transportJNI.MessagePayload_isRespPacket(swigCPtr, this); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessagePayloadBuilder.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessagePayloadBuilder.java index c4d1f674..064a2c95 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessagePayloadBuilder.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/MessagePayloadBuilder.java @@ -51,4 +51,8 @@ public MessagePayload build(SWIGTYPE_p_bcos__bytesConstRef buffer) { swigCPtr, this, SWIGTYPE_p_bcos__bytesConstRef.getCPtr(buffer)); return (cPtr == 0) ? null : new MessagePayload(cPtr, true); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/Transport.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/Transport.java index 285e1c55..170e3992 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/Transport.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/Transport.java @@ -86,4 +86,8 @@ public FrontConfig getConfig() { long cPtr = wedpr_java_transportJNI.Transport_getConfig(swigCPtr, this); return (cPtr == 0) ? null : new FrontConfig(cPtr, true); } + + public void disOwnMemory() { + swigSetCMemOwn(false); + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/IMessageBuilder.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/IMessageBuilder.java index c97bef67..6b61f213 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/IMessageBuilder.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/IMessageBuilder.java @@ -16,10 +16,21 @@ package com.webank.wedpr.sdk.jni.transport; import com.webank.wedpr.sdk.jni.generated.Message; +import com.webank.wedpr.sdk.jni.generated.MessageOptionalHeader; +import com.webank.wedpr.sdk.jni.generated.MessageOptionalHeaderBuilder; import com.webank.wedpr.sdk.jni.transport.impl.MessageImpl; public class IMessageBuilder { public static IMessage build(Message msg) { return new MessageImpl(msg); } + + public static MessageOptionalHeader buildRouteInfo( + MessageOptionalHeaderBuilder routeInfoBuilder, String topic) { + // return the ownership to cpp, since it is created by cpp + MessageOptionalHeader routeInfo = routeInfoBuilder.build(); + routeInfo.setTopic(topic); + routeInfo.disOwnMemory(); + return routeInfo; + } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/TransportConfig.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/TransportConfig.java index e56d78bc..68f8232a 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/TransportConfig.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/TransportConfig.java @@ -53,6 +53,7 @@ public static TransportBuilder getTransportBuilder() { public TransportConfig(Integer threadPoolSize, String nodeID) { this.frontConfig = transportBuilder.buildConfig(threadPoolSize, nodeID); + this.frontConfig.disOwnMemory(); // set default grpcConfig setGrpcConfig(new TransportGrpcConfig()); } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/WeDPRTransport.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/WeDPRTransport.java index f8431244..97146fde 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/WeDPRTransport.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/WeDPRTransport.java @@ -16,9 +16,9 @@ package com.webank.wedpr.sdk.jni.transport; import com.webank.wedpr.sdk.jni.common.WeDPRSDKException; -import com.webank.wedpr.sdk.jni.generated.ErrorCallback; import com.webank.wedpr.sdk.jni.transport.handlers.MessageCallback; import com.webank.wedpr.sdk.jni.transport.handlers.MessageDispatcherCallback; +import com.webank.wedpr.sdk.jni.transport.handlers.MessageErrorCallback; public interface WeDPRTransport { // start the transport @@ -82,7 +82,7 @@ void asyncSendMessageByNodeID( byte[] payload, int seq, int timeout, - ErrorCallback errorCallback, + MessageErrorCallback errorCallback, MessageCallback msgCallback); /** @@ -101,7 +101,7 @@ void asyncSendMessageByAgency( byte[] payload, int seq, int timeout, - ErrorCallback errorCallback, + MessageErrorCallback errorCallback, MessageCallback msgCallback); void asyncSendMessageByComponent( @@ -111,7 +111,7 @@ void asyncSendMessageByComponent( byte[] payload, int seq, int timeout, - ErrorCallback errorCallback, + MessageErrorCallback errorCallback, MessageCallback msgCallback); /** @@ -129,7 +129,7 @@ void asyncSendMessageByTopic( byte[] payload, int seq, int timeout, - ErrorCallback errorCallback, + MessageErrorCallback errorCallback, MessageCallback msgCallback); /** @param topic the topic to remove */ diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageCallback.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageCallback.java index 20473f7c..9b2b67d0 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageCallback.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageCallback.java @@ -26,6 +26,12 @@ public abstract class MessageCallback extends IMessageHandler { public abstract void onMessage( Error error, IMessage message, SendResponseHandler sendResponseHandler); + // release the ownership to c++, in case of it's released by the jvm + protected void finalize() { + swigReleaseOwnership(); + delete(); + } + @Override public void onMessage(Error e, Message msg, SendResponseHandler sendResponseHandler) { onMessage(e, IMessageBuilder.build(msg), sendResponseHandler); diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageDispatcherCallback.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageDispatcherCallback.java index 1f02bf5c..05e0b1e2 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageDispatcherCallback.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageDispatcherCallback.java @@ -23,6 +23,12 @@ public abstract class MessageDispatcherCallback extends MessageDispatcherHandler { public abstract void onMessage(IMessage message); + // release the ownership to c++, in case of it's released by the jvm + protected void finalize() { + swigReleaseOwnership(); + delete(); + } + @Override public void onMessage(Message msg) { onMessage(IMessageBuilder.build(msg)); diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageErrorCallback.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageErrorCallback.java new file mode 100644 index 00000000..0d85b06a --- /dev/null +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/handlers/MessageErrorCallback.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017-2025 [webank-wedpr] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package com.webank.wedpr.sdk.jni.transport.handlers; + +import com.webank.wedpr.sdk.jni.generated.ErrorCallback; + +public abstract class MessageErrorCallback extends ErrorCallback { + + // release the ownership to c++, in case of it's released by the jvm + @Override + protected void finalize() { + swigReleaseOwnership(); + delete(); + } +} diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/MessageImpl.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/MessageImpl.java index ebbaed1d..8edcc1b6 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/MessageImpl.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/MessageImpl.java @@ -25,6 +25,8 @@ public class MessageHeaderImpl implements IMessageHeader { protected MessageHeaderImpl(MessageHeader messageHeader) { this.messageHeader = messageHeader; + this.messageHeader.optionalField().disOwnMemory(); + this.messageHeader.disOwnMemory(); } @Override @@ -124,6 +126,7 @@ public String toString() { // v1, v2 public MessageImpl(Message rawMessage) { this.rawMessage = rawMessage; + this.rawMessage.disOwnMemory(); this.messageHeader = new MessageHeaderImpl(rawMessage.header()); } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java index 142cf225..149a5453 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/transport/impl/TransportImpl.java @@ -26,12 +26,13 @@ import com.webank.wedpr.sdk.jni.transport.WeDPRTransport; import com.webank.wedpr.sdk.jni.transport.handlers.MessageCallback; import com.webank.wedpr.sdk.jni.transport.handlers.MessageDispatcherCallback; +import com.webank.wedpr.sdk.jni.transport.handlers.MessageErrorCallback; import java.math.BigInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TransportImpl implements WeDPRTransport { - private static Logger logger = LoggerFactory.getLogger(TransportImpl.class); + private static final Logger logger = LoggerFactory.getLogger(TransportImpl.class); // the created transport private final Transport transport; private final TransportConfig transportConfig; @@ -46,6 +47,7 @@ public static WeDPRTransport build(TransportConfig transportConfig) { protected TransportImpl(Transport transport, TransportConfig transportConfig) { logger.info("Build Transport, config: {}", transportConfig.toString()); this.transport = transport; + this.transport.disOwnMemory(); this.transportConfig = transportConfig; } @@ -125,11 +127,11 @@ public void asyncSendMessageByNodeID( byte[] payload, int seq, int timeout, - ErrorCallback errorCallback, + MessageErrorCallback errorCallback, MessageCallback msgCallback) { - MessageOptionalHeader routeInfo = this.transport.routeInfoBuilder().build(); + MessageOptionalHeader routeInfo = + IMessageBuilder.buildRouteInfo(this.transport.routeInfoBuilder(), topic); routeInfo.setDstNode(dstNode, BigInteger.valueOf(dstNode.length)); - routeInfo.setTopic(topic); this.transport .getFront() .asyncSendMessage( @@ -160,10 +162,10 @@ public void asyncSendMessageByAgency( byte[] payload, int seq, int timeout, - ErrorCallback errorCallback, + MessageErrorCallback errorCallback, MessageCallback msgCallback) { - MessageOptionalHeader routeInfo = this.transport.routeInfoBuilder().build(); - routeInfo.setTopic(topic); + MessageOptionalHeader routeInfo = + IMessageBuilder.buildRouteInfo(this.transport.routeInfoBuilder(), topic); routeInfo.setDstInst(agency); this.transport .getFront() @@ -186,11 +188,11 @@ public void asyncSendMessageByComponent( byte[] payload, int seq, int timeout, - ErrorCallback errorCallback, + MessageErrorCallback errorCallback, MessageCallback msgCallback) { // set the routeInfo - MessageOptionalHeader routeInfo = this.transport.routeInfoBuilder().build(); - routeInfo.setTopic(topic); + MessageOptionalHeader routeInfo = + IMessageBuilder.buildRouteInfo(this.transport.routeInfoBuilder(), topic); routeInfo.setDstInst(dstInst); routeInfo.setComponentType(component); this.transport @@ -222,11 +224,11 @@ public void asyncSendMessageByTopic( byte[] payload, int seq, int timeout, - ErrorCallback errorCallback, + MessageErrorCallback errorCallback, MessageCallback msgCallback) { // set the routeInfo - MessageOptionalHeader routeInfo = this.transport.routeInfoBuilder().build(); - routeInfo.setTopic(topic); + MessageOptionalHeader routeInfo = + IMessageBuilder.buildRouteInfo(this.transport.routeInfoBuilder(), topic); routeInfo.setDstInst(dstInst); this.transport .getFront() @@ -252,8 +254,8 @@ public void removeTopic(String topic) throws WeDPRSDKException { @Override public void pushByNodeID(String topic, byte[] dstNodeID, int seq, byte[] payload, int timeout) throws WeDPRSDKException { - MessageOptionalHeader routeInfo = this.transport.routeInfoBuilder().build(); - routeInfo.setTopic(topic); + MessageOptionalHeader routeInfo = + IMessageBuilder.buildRouteInfo(this.transport.routeInfoBuilder(), topic); routeInfo.setDstNode(dstNodeID, BigInteger.valueOf(dstNodeID.length)); Error result = this.transport @@ -272,8 +274,8 @@ public void pushByNodeID(String topic, byte[] dstNodeID, int seq, byte[] payload public void pushByComponent( String topic, String dstInst, String component, int seq, byte[] payload, int timeout) throws WeDPRSDKException { - MessageOptionalHeader routeInfo = this.transport.routeInfoBuilder().build(); - routeInfo.setTopic(topic); + MessageOptionalHeader routeInfo = + IMessageBuilder.buildRouteInfo(this.transport.routeInfoBuilder(), topic); routeInfo.setDstInst(dstInst); routeInfo.setComponentType(component); Error result = @@ -292,8 +294,8 @@ public void pushByComponent( @Override public void pushByInst(String topic, String dstInst, int seq, byte[] payload, int timeout) throws WeDPRSDKException { - MessageOptionalHeader routeInfo = this.transport.routeInfoBuilder().build(); - routeInfo.setTopic(topic); + MessageOptionalHeader routeInfo = + IMessageBuilder.buildRouteInfo(this.transport.routeInfoBuilder(), topic); routeInfo.setDstInst(dstInst); Error result = this.transport diff --git a/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i b/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i index c5cf2638..0829bf22 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i +++ b/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i @@ -179,6 +179,32 @@ namespace bcos{ %typemap(javain) OutputBuffer "$javainput" %typemap(javaout) OutputBuffer { return $jnicall; } + + +// refer to: https://stackoverflow.com/questions/12103206/is-it-possible-to-add-text-to-an-existing-typemap-in-swig +%define WRAP(CLASS) +%extend CLASS { +%proxycode %{ + public void disOwnMemory() { + swigSetCMemOwn(false); + } +%} +} +%enddef + +// Note: these object is created from cpp, and maintained with shared_ptr, +// the java code should disOwnMemory in case of released multiple times +WRAP(ppc::front::FrontConfig) +WRAP(ppc::protocol::Message) +WRAP(ppc::protocol::MessageOptionalHeader) +WRAP(ppc::protocol::MessageHeader) +WRAP(ppc::protocol::MessagePayload) +WRAP(ppc::protocol::MessageBuilder) +WRAP(ppc::protocol::MessageHeaderBuilder) +WRAP(ppc::protocol::MessagePayloadBuilder) +WRAP(ppc::protocol::MessageOptionalHeaderBuilder) +WRAP(ppc::sdk::Transport) + /* ///// tests /// %inline {