Skip to content

Commit

Permalink
update java-sdk wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 12, 2024
1 parent f5919af commit ce325a5
Show file tree
Hide file tree
Showing 16 changed files with 352 additions and 73 deletions.
5 changes: 3 additions & 2 deletions cpp/cmake/TargetSettings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,6 @@ set(WEDPR_TRANSPORT_SDK_TARGET wedpr-transport-sdk)
set(WEDPR_PYTHON_TRANSPORT "wedpr_python_transport")
set(WEDPR_PYTHON_TRANSPORT_DIR ${PROJECT_BINARY_DIR}/python/${WEDPR_PYTHON_TRANSPORT})

set(WEDPR_JAVA_TRANSPORT "wedpr_java_transport")
set(WEDPR_JAVA_TRANSPORT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/java/${WEDPR_JAVA_TRANSPORT})
set(WEDPR_JAVA_TRANSPORT "wedpr_java_transport_jni")
set(WEDPR_JAVA_TRANSPORT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/java/generated/${WEDPR_JAVA_TRANSPORT})
set(WEDPR_JAVA_TRANSPORT_LIB_DIR ${CMAKE_CURRENT_SOURCE_DIR}/wedpr-transport/sdk-wrapper/java/bindings/src/main/resources/META-INF/native)
2 changes: 2 additions & 0 deletions cpp/ppc-framework/front/FrontConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class FrontConfig
virtual std::vector<std::string> const& getComponents() const { return m_components; }
void setComponents(std::vector<std::string> const& components) { m_components = components; }

void addComponent(std::string const& component) { m_components.emplace_back(component); }

std::vector<std::string>& mutableComponents() { return m_components; }

protected:
Expand Down
119 changes: 117 additions & 2 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,59 @@ class IFrontClient
virtual void onReceiveMessage(
ppc::protocol::Message::Ptr const& _msg, ppc::protocol::ReceiveMsgFunc _callback) = 0;
};

///////// the callback definition for sdk wrapper /////////
class ErrorCallback
{
public:
using Ptr = std::shared_ptr<ErrorCallback>;
ErrorCallback() = default;
virtual ~ErrorCallback() {}

virtual void onError(bcos::Error::Ptr error) = 0;
};

class MessageDispatcherHandler
{
public:
using Ptr = std::shared_ptr<MessageDispatcherHandler>;
MessageDispatcherHandler() = default;
virtual ~MessageDispatcherHandler() {}

virtual void onMessage(ppc::protocol::Message::Ptr msg) = 0;
};

class SendResponseHandler
{
public:
using Ptr = std::shared_ptr<SendResponseHandler>;
SendResponseHandler(ppc::protocol::SendResponseFunction responseFunc)
: m_responseFunc(responseFunc)
{}
virtual ~SendResponseHandler() {}

virtual void sendResponse(std::shared_ptr<bcos::bytes>&& payload)
{
m_responseFunc(std::move(payload));
}

private:
ppc::protocol::SendResponseFunction m_responseFunc;
};

class IMessageHandler
{
public:
using Ptr = std::shared_ptr<IMessageHandler>;
IMessageHandler() = default;
virtual ~IMessageHandler() {}

virtual void onMessage(bcos::Error::Ptr e, ppc::protocol::Message::Ptr msg,
SendResponseHandler sendResponseHandler) = 0;
};

///////// the callback definition for sdk wrapper /////////

class IFront : virtual public IFrontClient
{
public:
Expand Down Expand Up @@ -71,8 +124,21 @@ class IFront : virtual public IFrontClient
virtual void registerTopicHandler(
std::string const& topic, ppc::protocol::MessageDispatcherCallback callback) = 0;

virtual void registerTopicHandler(
std::string const& topic, MessageDispatcherHandler::Ptr callback)
{
registerTopicHandler(topic, populateMessageDispatcherCallback(callback));
}

virtual void registerMessageHandler(
std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback) = 0;

virtual void registerMessageHandler(
std::string const& componentType, MessageDispatcherHandler::Ptr callback)
{
registerMessageHandler(componentType, populateMessageDispatcherCallback(callback));
}

/**
* @brief async send message
*
Expand All @@ -88,16 +154,31 @@ class IFront : virtual public IFrontClient
* @param timeout timeout
* @param callback callback
*/
virtual void asyncSendMessage(ppc::protocol::RouteType routeType,
virtual void asyncSendMessage(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout, ppc::protocol::ReceiveMsgFunc errorCallback,
ppc::protocol::MessageCallback callback) = 0;

virtual void asyncSendMessage(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout, ErrorCallback::Ptr errorCallback, IMessageHandler::Ptr msgHandler)
{
asyncSendMessage(routeType, routeInfo, std::move(payload), seq, timeout,
populateErrorCallback(errorCallback), populateMsgCallback(msgHandler));
}

virtual void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID,
bcos::bytes&& payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback) = 0;

virtual void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID,
bcos::bytes&& payload, int seq, ErrorCallback::Ptr errorCallback)
{
asyncSendResponse(
dstNode, traceID, std::move(payload), seq, populateErrorCallback(errorCallback));
}

// the sync interface for async_send_message
virtual bcos::Error::Ptr push(ppc::protocol::RouteType routeType,
virtual bcos::Error::Ptr push(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout) = 0;

Expand Down Expand Up @@ -131,6 +212,40 @@ class IFront : virtual public IFrontClient
* @param topic the topic to unregister
*/
virtual bcos::Error::Ptr unRegisterTopic(std::string const& topic) = 0;


private:
ppc::protocol::ReceiveMsgFunc populateErrorCallback(ErrorCallback::Ptr errorCallback)
{
if (errorCallback == nullptr)
{
return nullptr;
}
return [errorCallback](bcos::Error::Ptr error) { errorCallback->onError(error); };
}

ppc::protocol::MessageDispatcherCallback populateMessageDispatcherCallback(
MessageDispatcherHandler::Ptr handler)
{
if (handler == nullptr)
{
return nullptr;
}
return [handler](ppc::protocol::Message::Ptr msg) { handler->onMessage(msg); };
}

ppc::protocol::MessageCallback populateMsgCallback(IMessageHandler::Ptr msgHandler)
{
if (msgHandler == nullptr)
{
return nullptr;
}
return [msgHandler](bcos::Error::Ptr e, ppc::protocol::Message::Ptr msg,
ppc::protocol::SendResponseFunction resFunc) {
SendResponseHandler sendResponseHandler(resFunc);
msgHandler->onMessage(e, msg, sendResponseHandler);
};
}
};

class IFrontBuilder
Expand Down
5 changes: 5 additions & 0 deletions cpp/ppc-framework/protocol/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ class Message : virtual public bcos::boostssl::MessageFace

MessagePayload::Ptr const& frontMessage() const { return m_frontMessage; }

// Note: swig wrapper require define all methods
virtual bool encode(bcos::bytes& _buffer) = 0;
// encode and return the {header, payload}
virtual bool encode(bcos::boostssl::EncodedMsg& _encodedMsg) = 0;
virtual int64_t decode(bcos::bytesConstRef _buffer) = 0;

protected:
MessageHeader::Ptr m_header;
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace
};
}
// ROUTE_THROUGH_TOPIC will hold the topic
m_front->asyncSendMessage(RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data),
m_front->asyncSendMessage((uint16_t)RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data),
_message->seq(), _timeout, _callback, msgCallback);
}

Expand Down
9 changes: 5 additions & 4 deletions cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void FrontImpl::asyncSendResponse(bcos::bytes const& dstNode, std::string const&
* @param timeout timeout
* @param callback callback
*/
void FrontImpl::asyncSendMessage(RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo,
void FrontImpl::asyncSendMessage(uint16_t routeType, MessageOptionalHeader::Ptr const& routeInfo,
bcos::bytes&& payload, int seq, long timeout, ReceiveMsgFunc errorCallback,
MessageCallback callback)
{
Expand All @@ -151,8 +151,9 @@ void FrontImpl::asyncSendMessage(RouteType routeType, MessageOptionalHeader::Ptr
m_callbackManager->addCallback(traceID, timeout, callback);
auto self = weak_from_this();
// send the message to the gateway
asyncSendMessageToGateway(false, std::move(frontMessage), routeType, traceID, routeInfo,
timeout, [self, traceID, routeInfo, errorCallback](bcos::Error::Ptr error) {
asyncSendMessageToGateway(false, std::move(frontMessage), (ppc::protocol::RouteType)routeType,
traceID, routeInfo, timeout,
[self, traceID, routeInfo, errorCallback](bcos::Error::Ptr error) {
auto front = self.lock();
if (!front)
{
Expand Down Expand Up @@ -265,7 +266,7 @@ void FrontImpl::onReceiveMessage(Message::Ptr const& msg, ReceiveMsgFunc callbac
}

// the sync interface for asyncSendMessage
bcos::Error::Ptr FrontImpl::push(RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo,
bcos::Error::Ptr FrontImpl::push(uint16_t routeType, MessageOptionalHeader::Ptr const& routeInfo,
bcos::bytes&& payload, int seq, long timeout)
{
auto promise = std::make_shared<std::promise<bcos::Error::Ptr>>();
Expand Down
4 changes: 2 additions & 2 deletions cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_
*/
void stop() override;

bcos::Error::Ptr push(ppc::protocol::RouteType routeType,
bcos::Error::Ptr push(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout) override;
/**
Expand All @@ -70,7 +70,7 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_
* @param timeout timeout
* @param callback callback
*/
void asyncSendMessage(ppc::protocol::RouteType routeType,
void asyncSendMessage(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout, ppc::protocol::ReceiveMsgFunc errorCallback,
ppc::protocol::MessageCallback callback) override;
Expand Down
9 changes: 6 additions & 3 deletions cpp/wedpr-transport/sdk-wrapper/java/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
file(GLOB_RECURSE SRCS *.i)
set_source_files_properties(${SRCS} PROPERTIES CPLUSPLUS ON)

set(WEDPR_TRANSPORT_PACKAGE "com.webank.wedpr.sdk.jni.transport")
set(WEDPR_TRANSPORT_PACKAGE "com.webank.wedpr.sdk.jni.generated")

set_property(SOURCE swig/wedpr_java_transport.i PROPERTY COMPILE_OPTIONS
-package ${WEDPR_TRANSPORT_PACKAGE})

file(MAKE_DIRECTORY ${WEDPR_JAVA_TRANSPORT_DIR})
file(MAKE_DIRECTORY ${WEDPR_JAVA_TRANSPORT_LIB_DIR})

swig_add_library(${WEDPR_JAVA_TRANSPORT}
TYPE MODULE
LANGUAGE java
OUTPUT_DIR ${WEDPR_JAVA_TRANSPORT_DIR}
SOURCES ${SRCS})

SET(LIBRARY_OUTPUT_PATH ${WEDPR_JAVA_TRANSPORT_DIR})
message("LIBRARY_OUTPUT_PATH: ${WEDPR_JAVA_TRANSPORT_DIR}")
SET(LIBRARY_OUTPUT_PATH ${WEDPR_JAVA_TRANSPORT_LIB_DIR})
message("LIBRARY_OUTPUT_PATH: ${WEDPR_JAVA_TRANSPORT_LIB_DIR}")

set_target_properties(${WEDPR_JAVA_TRANSPORT} PROPERTIES
SWIG_USE_TARGET_INCLUDE_DIRECTORIES ON
Expand Down
61 changes: 34 additions & 27 deletions cpp/wedpr-transport/sdk-wrapper/java/bindings/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,24 @@ plugins {
id 'idea'
id 'java-library'
id 'maven-publish'
//id 'org.ajoberstar.grgit' version '4.1.1'
//id "de.undercouch.download" version "4.1.2"
// id 'com.github.sherter.google-java-format' version '0.9'
id 'org.ajoberstar.grgit' version '4.1.1'
id "de.undercouch.download" version "4.1.2"
id 'com.github.sherter.google-java-format' version '0.9'
}

// Additional attribute definition
ext {
if (!project.hasProperty("ossrhUsername")) {
ossrhUsername="xxx"
}

if (!project.hasProperty("ossrhPassword")) {
ossrhPassword="xxx"
}
lombokVersion = "1.18.32"
junitVersion = "4.13.2"
log4jVersion = "2.23.0"
jmhVersion = "1.36"
}

println("Notice: current gradle version is " + gradle.gradleVersion)
Expand All @@ -23,15 +38,15 @@ repositories {
mavenLocal()
}

// googleJavaFormat {
// toolVersion = '1.7'
// options style: 'AOSP'
// source = sourceSets*.allJava
// include '**/*.java'
// exclude '**/*Test.java'
// exclude '**/Test*.java'
// exclude '**/Mock*.java'
// }
googleJavaFormat {
toolVersion = '1.7'
options style: 'AOSP'
source = sourceSets*.allJava
include '**/*.java'
exclude '**/*Test.java'
exclude '**/Test*.java'
exclude '**/Mock*.java'
}

sourceSets {
main {
Expand Down Expand Up @@ -61,28 +76,20 @@ sourceSets {
dependencies {
api ('org.slf4j:slf4j-api:1.7.36')

jmhImplementation 'org.openjdk.jmh:jmh-core:1.36'
jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.36'
implementation("org.projectlombok:lombok:${lombokVersion}")
jmhImplementation ("org.openjdk.jmh:jmh-core:${jmhVersion}")
jmhAnnotationProcessor ("org.openjdk.jmh:jmh-generator-annprocess:${jmhVersion}")

testImplementation ("org.slf4j:slf4j-log4j12:${log4jVersion}")
testImplementation ("junit:junit:${junitVersion}")

testImplementation ('org.slf4j:slf4j-log4j12:1.7.36')
testImplementation ('junit:junit:4.13.2')
annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
}

archivesBaseName = 'wedpr-java-transport-jni'
group = 'com.webank.wedpr'
version = '1.0.0'

// Additional attribute definition
ext {
if (!project.hasProperty("ossrhUsername")) {
ossrhUsername="xxx"
}

if (!project.hasProperty("ossrhPassword")) {
ossrhPassword="xxx"
}
}

jar {
exclude '**/*.xml'
exclude '**/*.properties'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ public final class JniLibLoader {
// 1. initialize workdir

// -Dcom.webank.wedpr.workdir
String workdir = System.getProperty("com.webank.wedpr.workdir");
String workdir = System.getProperty("com.webank.wedpr.sdk.jni.workdir");
if (workdir != null) {
try {
File f = new File(workdir);
f.mkdirs();
WORKDIR = f;
logger.info("initialize workdir, -Dcom.webank.wedpr.workdir: {}", WORKDIR);
logger.info("initialize workdir, -Dcom.webank.wedpr.sdk.jni.workdir: {}", WORKDIR);
} catch (Exception e) {
logger.debug("initialize workdir, cannot mkdir workdir: {}, e: ", workdir, e);
workdir = null;
Expand Down Expand Up @@ -101,9 +101,9 @@ public static String getLibName(String baseName) {
} else if (osName.contains(MAC)) {
String arch = getArch();
if ("arm".equals(arch)) {
return "lib" + baseName + "-aarch64" + ".dylib";
return "lib" + baseName + "-aarch64" + ".jnilib";
}
return "lib" + baseName + ".dylib";
return "lib" + baseName + ".jnilib";
} else {
throw new RuntimeException("unrecognized OS: " + osName);
}
Expand Down
Loading

0 comments on commit ce325a5

Please sign in to comment.