Skip to content

Commit

Permalink
fix transport sdk crashed caused by jvm release the c-managed object
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 13, 2024
1 parent 4de5c76 commit 126e295
Show file tree
Hide file tree
Showing 20 changed files with 159 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import com.webank.wedpr.sdk.jni.transport.TransportConfig;
import com.webank.wedpr.sdk.jni.transport.WeDPRTransport;
import com.webank.wedpr.sdk.jni.transport.handlers.MessageDispatcherCallback;
import com.webank.wedpr.sdk.jni.transport.handlers.MessageErrorCallback;
import com.webank.wedpr.sdk.jni.transport.impl.TransportImpl;
import com.webank.wedpr.sdk.jni.transport.model.TransportEndPoint;

public class TransportDemo {
public static class MessageDispatcherCallbackImpl extends MessageDispatcherCallback {
private final String nodeID;

// java -cp 'conf/:lib/*:apps/*' com.webank.wedpr.sdk.jni.demo.TransportDemo agency0Node
// "127.0.0.1" 9020 "ipv4:127.0.0.1:40600,127.0.0.1:40601" "agency1Node"
// java -cp 'conf/:lib/*:apps/*' com.webank.wedpr.sdk.jni.demo.TransportDemo agency1Node
// "127.0.0.1" 9021 "ipv4:127.0.0.1:40620,127.0.0.1:40621" "agency0Node"
public MessageDispatcherCallbackImpl(String nodeID) {
this.nodeID = nodeID;
}
Expand All @@ -45,10 +50,10 @@ public void onMessage(IMessage message) {
}
}

public static class MessageErrorCallback extends ErrorCallback {
public static class MessageErrorCallbackImpl extends MessageErrorCallback {
private final String nodeID;

public MessageErrorCallback(String nodeID) {
public MessageErrorCallbackImpl(String nodeID) {
this.nodeID = nodeID;
}

Expand Down Expand Up @@ -101,7 +106,9 @@ public static void main(String[] args) throws Exception {

// send Message to the gateway
String topic = "testTopic";
transport.registerTopicHandler(topic, new MessageDispatcherCallbackImpl(nodeID));
MessageDispatcherCallback messageDispatcherCallback =
new MessageDispatcherCallbackImpl(nodeID);
transport.registerTopicHandler(topic, messageDispatcherCallback);
System.out.println("##### register topic success");

byte[] dstNodeBytes = dstNode.getBytes();
Expand All @@ -118,7 +125,7 @@ public static void main(String[] args) throws Exception {
payLoad.getBytes(),
0,
10000,
new MessageErrorCallback(nodeID),
new MessageErrorCallbackImpl(nodeID),
null);

// push
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,8 @@ public SWIGTYPE_p_std__vectorT_std__string_t mutableComponents() {
return new SWIGTYPE_p_std__vectorT_std__string_t(
wedpr_java_transportJNI.FrontConfig_mutableComponents(swigCPtr, this), false);
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,8 @@ public long decode(SWIGTYPE_p_bcos__bytesConstRef _buffer) {
return wedpr_java_transportJNI.Message_decode(
swigCPtr, this, SWIGTYPE_p_bcos__bytesConstRef.getCPtr(_buffer));
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ public Message build(
payload);
return (cPtr == 0) ? null : new Message(cPtr, true);
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,8 @@ public void setRouteType(SWIGTYPE_p_ppc__protocol__RouteType type) {
public boolean hasOptionalField() {
return wedpr_java_transportJNI.MessageHeader_hasOptionalField(swigCPtr, this);
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,8 @@ public MessageOptionalHeader build(MessageOptionalHeader optionalHeader) {
optionalHeader);
return (cPtr == 0) ? null : new MessageOptionalHeader(cPtr, true);
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,8 @@ public String srcInst() {
public void setSrcInst(String srcInst) {
wedpr_java_transportJNI.MessageOptionalHeader_setSrcInst(swigCPtr, this, srcInst);
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ public MessageOptionalHeader build() {
wedpr_java_transportJNI.MessageOptionalHeaderBuilder_build__SWIG_1(swigCPtr, this);
return (cPtr == 0) ? null : new MessageOptionalHeader(cPtr, true);
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,8 @@ public void setRespPacket() {
public boolean isRespPacket() {
return wedpr_java_transportJNI.MessagePayload_isRespPacket(swigCPtr, this);
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ public MessagePayload build(SWIGTYPE_p_bcos__bytesConstRef buffer) {
swigCPtr, this, SWIGTYPE_p_bcos__bytesConstRef.getCPtr(buffer));
return (cPtr == 0) ? null : new MessagePayload(cPtr, true);
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ public FrontConfig getConfig() {
long cPtr = wedpr_java_transportJNI.Transport_getConfig(swigCPtr, this);
return (cPtr == 0) ? null : new FrontConfig(cPtr, true);
}

public void disOwnMemory() {
swigSetCMemOwn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,21 @@
package com.webank.wedpr.sdk.jni.transport;

import com.webank.wedpr.sdk.jni.generated.Message;
import com.webank.wedpr.sdk.jni.generated.MessageOptionalHeader;
import com.webank.wedpr.sdk.jni.generated.MessageOptionalHeaderBuilder;
import com.webank.wedpr.sdk.jni.transport.impl.MessageImpl;

public class IMessageBuilder {
public static IMessage build(Message msg) {
return new MessageImpl(msg);
}

public static MessageOptionalHeader buildRouteInfo(
MessageOptionalHeaderBuilder routeInfoBuilder, String topic) {
// return the ownership to cpp, since it is created by cpp
MessageOptionalHeader routeInfo = routeInfoBuilder.build();
routeInfo.setTopic(topic);
routeInfo.disOwnMemory();
return routeInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public static TransportBuilder getTransportBuilder() {

public TransportConfig(Integer threadPoolSize, String nodeID) {
this.frontConfig = transportBuilder.buildConfig(threadPoolSize, nodeID);
this.frontConfig.disOwnMemory();
// set default grpcConfig
setGrpcConfig(new TransportGrpcConfig());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package com.webank.wedpr.sdk.jni.transport;

import com.webank.wedpr.sdk.jni.common.WeDPRSDKException;
import com.webank.wedpr.sdk.jni.generated.ErrorCallback;
import com.webank.wedpr.sdk.jni.transport.handlers.MessageCallback;
import com.webank.wedpr.sdk.jni.transport.handlers.MessageDispatcherCallback;
import com.webank.wedpr.sdk.jni.transport.handlers.MessageErrorCallback;

public interface WeDPRTransport {
// start the transport
Expand Down Expand Up @@ -82,7 +82,7 @@ void asyncSendMessageByNodeID(
byte[] payload,
int seq,
int timeout,
ErrorCallback errorCallback,
MessageErrorCallback errorCallback,
MessageCallback msgCallback);

/**
Expand All @@ -101,7 +101,7 @@ void asyncSendMessageByAgency(
byte[] payload,
int seq,
int timeout,
ErrorCallback errorCallback,
MessageErrorCallback errorCallback,
MessageCallback msgCallback);

void asyncSendMessageByComponent(
Expand All @@ -111,7 +111,7 @@ void asyncSendMessageByComponent(
byte[] payload,
int seq,
int timeout,
ErrorCallback errorCallback,
MessageErrorCallback errorCallback,
MessageCallback msgCallback);

/**
Expand All @@ -129,7 +129,7 @@ void asyncSendMessageByTopic(
byte[] payload,
int seq,
int timeout,
ErrorCallback errorCallback,
MessageErrorCallback errorCallback,
MessageCallback msgCallback);

/** @param topic the topic to remove */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public abstract class MessageCallback extends IMessageHandler {
public abstract void onMessage(
Error error, IMessage message, SendResponseHandler sendResponseHandler);

// release the ownership to c++, in case of it's released by the jvm
protected void finalize() {
swigReleaseOwnership();
delete();
}

@Override
public void onMessage(Error e, Message msg, SendResponseHandler sendResponseHandler) {
onMessage(e, IMessageBuilder.build(msg), sendResponseHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
public abstract class MessageDispatcherCallback extends MessageDispatcherHandler {
public abstract void onMessage(IMessage message);

// TODO: check this will cause memory leak or not
// release the ownership to c++, in case of it's released by the jvm
protected void finalize() {
swigReleaseOwnership();
delete();
}

@Override
public void onMessage(Message msg) {
onMessage(IMessageBuilder.build(msg));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package com.webank.wedpr.sdk.jni.transport.handlers;

import com.webank.wedpr.sdk.jni.generated.ErrorCallback;

public abstract class MessageErrorCallback extends ErrorCallback {

// release the ownership to c++, in case of it's released by the jvm
@Override
protected void finalize() {
swigReleaseOwnership();
delete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class MessageHeaderImpl implements IMessageHeader {

protected MessageHeaderImpl(MessageHeader messageHeader) {
this.messageHeader = messageHeader;
this.messageHeader.optionalField().disOwnMemory();
this.messageHeader.disOwnMemory();
}

@Override
Expand Down Expand Up @@ -124,6 +126,7 @@ public String toString() {
// v1, v2
public MessageImpl(Message rawMessage) {
this.rawMessage = rawMessage;
this.rawMessage.disOwnMemory();
this.messageHeader = new MessageHeaderImpl(rawMessage.header());
}

Expand Down
Loading

0 comments on commit 126e295

Please sign in to comment.