Skip to content

Commit

Permalink
update java-sdk wrapper (#32)
Browse files Browse the repository at this point in the history
* update java-sdk wrapper

* optimize compile
  • Loading branch information
cyjseagull authored Sep 13, 2024
1 parent f5919af commit 03747f7
Show file tree
Hide file tree
Showing 27 changed files with 501 additions and 90 deletions.
9 changes: 8 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ set(CEM_SOURCE "")
if(BUILD_CEM)
set(CEM_SOURCE "wedpr-computing/ppc-cem")
endif()

set(TRANSPORT_SDK_SOURCE_LIST
wedpr-protocol
wedpr-transport/ppc-front
wedpr-transport/sdk wedpr-transport/sdk-wrapper)

set(ALL_SOURCE_LIST
${SDK_SOURCE_LIST}
wedpr-crypto/ppc-crypto
Expand All @@ -92,7 +98,6 @@ if(BUILD_WEDPR_TOOLKIT)
include(swig)
message(STATUS "Getting SWIG for Windows: ...DONE")
endif()
add_subdirectory(wedpr-transport/sdk-wrapper)
endif()

if(BUILD_ALL)
Expand All @@ -101,6 +106,8 @@ elseif(BUILD_UDF)
add_sources("${UDF_SOURCE_LIST}")
elseif(BUILD_SDK)
add_sources("${SDK_SOURCE_LIST}")
elseif(BUILD_WEDPR_TOOLKIT)
add_sources("${TRANSPORT_SDK_SOURCE_LIST}")
endif()
########### set the sources end ###########

Expand Down
10 changes: 6 additions & 4 deletions cpp/cmake/Dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ find_package(unofficial-sodium CONFIG REQUIRED)


##### the full-dependencies #####
if(BUILD_ALL)
find_package(TBB REQUIRED)
if(BUILD_ALL OR BUILD_WEDPR_TOOLKIT)
find_package(jsoncpp REQUIRED)

find_package(TBB REQUIRED)
find_package(gRPC REQUIRED)
find_package(${BCOS_BOOSTSSL_TARGET} REQUIRED)
endif()

if(BUILD_ALL)
# tcmalloc
include(ProjectTCMalloc)

find_package(SEAL REQUIRED)
find_package(Kuku REQUIRED)
find_package(gRPC REQUIRED)

# APSI: Note: APSI depends on seal 4.0 and Kuku 2.1
include(ProjectAPSI)
Expand Down
8 changes: 8 additions & 0 deletions cpp/cmake/Options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,18 @@ macro(configure_project)
set(VISIBILITY_FLAG "")
set(BUILD_ALL OFF)
endif()
if (BUILD_WEDPR_TOOLKIT)
set(VISIBILITY_FLAG "")
set(BUILD_ALL OFF)
endif()
if (BUILD_ALL)
# install all dependencies
list(APPEND VCPKG_MANIFEST_FEATURES "all")
endif()
if (BUILD_WEDPR_TOOLKIT)
# install wedpr dependencies
list(APPEND VCPKG_MANIFEST_FEATURES "toolkit")
endif()
if(ENABLE_SSE)
# enable sse for libhdfs3
list(APPEND VCPKG_MANIFEST_FEATURES "sse")
Expand Down
5 changes: 3 additions & 2 deletions cpp/cmake/TargetSettings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,6 @@ set(WEDPR_TRANSPORT_SDK_TARGET wedpr-transport-sdk)
set(WEDPR_PYTHON_TRANSPORT "wedpr_python_transport")
set(WEDPR_PYTHON_TRANSPORT_DIR ${PROJECT_BINARY_DIR}/python/${WEDPR_PYTHON_TRANSPORT})

set(WEDPR_JAVA_TRANSPORT "wedpr_java_transport")
set(WEDPR_JAVA_TRANSPORT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/java/${WEDPR_JAVA_TRANSPORT})
set(WEDPR_JAVA_TRANSPORT "wedpr_java_transport_jni")
set(WEDPR_JAVA_TRANSPORT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/java/generated/${WEDPR_JAVA_TRANSPORT})
set(WEDPR_JAVA_TRANSPORT_LIB_DIR ${CMAKE_CURRENT_SOURCE_DIR}/wedpr-transport/sdk-wrapper/java/bindings/src/main/resources/META-INF/native)
4 changes: 2 additions & 2 deletions cpp/ppc-framework/Helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
* @date 2022-10-20
*/
#pragma once
#include <string>
#include <sstream>
#include <string>
namespace ppc
{
constexpr static int MAX_PORT = 65535;
Expand All @@ -42,7 +42,7 @@ inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept
template <typename T>
inline std::string_view printNodeID(T const& nodeID)
{
size_t offset = nodeID.size() >= 8 ? 8 : nodeID.size();
size_t offset = nodeID.size() >= 15 ? 15 : nodeID.size();
return std::string_view((const char*)nodeID.data(), offset);
}
} // namespace ppc
2 changes: 2 additions & 0 deletions cpp/ppc-framework/front/FrontConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class FrontConfig
virtual std::vector<std::string> const& getComponents() const { return m_components; }
void setComponents(std::vector<std::string> const& components) { m_components = components; }

void addComponent(std::string const& component) { m_components.emplace_back(component); }

std::vector<std::string>& mutableComponents() { return m_components; }

protected:
Expand Down
140 changes: 138 additions & 2 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,59 @@ class IFrontClient
virtual void onReceiveMessage(
ppc::protocol::Message::Ptr const& _msg, ppc::protocol::ReceiveMsgFunc _callback) = 0;
};

///////// the callback definition for sdk wrapper /////////
class ErrorCallback
{
public:
using Ptr = std::shared_ptr<ErrorCallback>;
ErrorCallback() = default;
virtual ~ErrorCallback() {}

virtual void onError(bcos::Error::Ptr error) = 0;
};

class MessageDispatcherHandler
{
public:
using Ptr = std::shared_ptr<MessageDispatcherHandler>;
MessageDispatcherHandler() = default;
virtual ~MessageDispatcherHandler() {}

virtual void onMessage(ppc::protocol::Message::Ptr msg) = 0;
};

class SendResponseHandler
{
public:
using Ptr = std::shared_ptr<SendResponseHandler>;
SendResponseHandler(ppc::protocol::SendResponseFunction responseFunc)
: m_responseFunc(responseFunc)
{}
virtual ~SendResponseHandler() {}

virtual void sendResponse(std::shared_ptr<bcos::bytes>&& payload)
{
m_responseFunc(std::move(payload));
}

private:
ppc::protocol::SendResponseFunction m_responseFunc;
};

class IMessageHandler
{
public:
using Ptr = std::shared_ptr<IMessageHandler>;
IMessageHandler() = default;
virtual ~IMessageHandler() {}

virtual void onMessage(bcos::Error::Ptr e, ppc::protocol::Message::Ptr msg,
SendResponseHandler sendResponseHandler) = 0;
};

///////// the callback definition for sdk wrapper /////////

class IFront : virtual public IFrontClient
{
public:
Expand Down Expand Up @@ -71,8 +124,23 @@ class IFront : virtual public IFrontClient
virtual void registerTopicHandler(
std::string const& topic, ppc::protocol::MessageDispatcherCallback callback) = 0;

/////// to simplify SDK wrapper ////
virtual void registerTopicHandler(
std::string const& topic, MessageDispatcherHandler::Ptr callback)
{
registerTopicHandler(topic, populateMessageDispatcherCallback(callback));
}

virtual void registerMessageHandler(
std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback) = 0;

/////// to simplify SDK wrapper ////
virtual void registerMessageHandler(
std::string const& componentType, MessageDispatcherHandler::Ptr callback)
{
registerMessageHandler(componentType, populateMessageDispatcherCallback(callback));
}

/**
* @brief async send message
*
Expand All @@ -88,19 +156,53 @@ class IFront : virtual public IFrontClient
* @param timeout timeout
* @param callback callback
*/
virtual void asyncSendMessage(ppc::protocol::RouteType routeType,
virtual void asyncSendMessage(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout, ppc::protocol::ReceiveMsgFunc errorCallback,
ppc::protocol::MessageCallback callback) = 0;

/////// to simplify SDK wrapper ////

// !!! Note: the 'payload' type(char*) should not been changed, since it used to pass-in java
// byte[] data
virtual void asyncSendMessage(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, char* payload,
uint64_t payloadSize, int seq, long timeout, ErrorCallback::Ptr errorCallback,
IMessageHandler::Ptr msgHandler)
{
// TODO: optimize here
bcos::bytes copyedPayload(payload, payload + payloadSize);
asyncSendMessage(routeType, routeInfo, std::move(copyedPayload), seq, timeout,
populateErrorCallback(errorCallback), populateMsgCallback(msgHandler));
}

virtual void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID,
bcos::bytes&& payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback) = 0;

/////// to simplify SDK wrapper ////

// !!! Note: the 'payload ' type(char*) should not been changed, since it used to pass-in java
// byte[] data
virtual void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID,
bcos::bytes&& payload, int seq, ErrorCallback::Ptr errorCallback)
{
asyncSendResponse(
dstNode, traceID, std::move(payload), seq, populateErrorCallback(errorCallback));
}

// the sync interface for async_send_message
virtual bcos::Error::Ptr push(ppc::protocol::RouteType routeType,
virtual bcos::Error::Ptr push(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout) = 0;

// TODO: optmize here
virtual bcos::Error::Ptr push(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, char* payload,
uint64_t payloadSize, int seq, long timeout)
{
bcos::bytes copyedPayload(payload, payload + payloadSize);
return push(routeType, routeInfo, std::move(copyedPayload), seq, timeout);
}
virtual ppc::protocol::Message::Ptr pop(std::string const& topic, long timeoutMs) = 0;
virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0;

Expand Down Expand Up @@ -131,6 +233,40 @@ class IFront : virtual public IFrontClient
* @param topic the topic to unregister
*/
virtual bcos::Error::Ptr unRegisterTopic(std::string const& topic) = 0;


private:
ppc::protocol::ReceiveMsgFunc populateErrorCallback(ErrorCallback::Ptr errorCallback)
{
if (errorCallback == nullptr)
{
return nullptr;
}
return [errorCallback](bcos::Error::Ptr error) { errorCallback->onError(error); };
}

ppc::protocol::MessageDispatcherCallback populateMessageDispatcherCallback(
MessageDispatcherHandler::Ptr handler)
{
if (handler == nullptr)
{
return nullptr;
}
return [handler](ppc::protocol::Message::Ptr msg) { handler->onMessage(msg); };
}

ppc::protocol::MessageCallback populateMsgCallback(IMessageHandler::Ptr msgHandler)
{
if (msgHandler == nullptr)
{
return nullptr;
}
return [msgHandler](bcos::Error::Ptr e, ppc::protocol::Message::Ptr msg,
ppc::protocol::SendResponseFunction resFunc) {
SendResponseHandler sendResponseHandler(resFunc);
msgHandler->onMessage(e, msg, sendResponseHandler);
};
}
};

class IFrontBuilder
Expand Down
38 changes: 38 additions & 0 deletions cpp/ppc-framework/protocol/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "MessagePayload.h"
#include "RouteType.h"
#include "ppc-framework/Helper.h"
#include "ppc-framework/libwrapper/Buffer.h"
#include <bcos-boostssl/interfaces/MessageFace.h>
#include <bcos-utilities/Common.h>
#include <bcos-utilities/DataConvertUtility.h>
Expand All @@ -46,12 +47,34 @@ class MessageOptionalHeader

// the source nodeID that send the message
virtual bcos::bytes const& srcNode() const { return m_srcNode; }
/// for swig-wrapper(pass the binary data)
OutputBuffer srcNodeBuffer() const
{
// Note: this will be copied to java through jni
return OutputBuffer{(unsigned char*)m_srcNode.data(), m_srcNode.size()};
}

virtual void setSrcNode(bcos::bytes const& srcNode) { m_srcNode = srcNode; }

// !!! Note: the first paramater type should not been changed, for it's used for pass-in java
// byte[] into c bytes
virtual void setSrcNode(char* data, uint64_t length) { m_srcNode.assign(data, data + length); }

// the target nodeID that should receive the message
virtual bcos::bytes const& dstNode() const { return m_dstNode; }

// for swig-wrapper(pass the binary to java)
OutputBuffer dstNodeBuffer() const
{
// Note: this will be copied to java through jni
return OutputBuffer{(unsigned char*)m_dstNode.data(), m_dstNode.size()};
}
virtual void setDstNode(bcos::bytes const& dstNode) { m_dstNode = dstNode; }

// !!! Note: the first paramater type(char*) should not been changed, for it's used for pass-in
// java byte[] into c bytes
virtual void setDstNode(char* data, uint64_t length) { m_dstNode.assign(data, data + length); }

// the target agency that need receive the message
virtual std::string const& dstInst() const { return m_dstInst; }
virtual void setDstInst(std::string const& dstInst) { m_dstInst = dstInst; }
Expand Down Expand Up @@ -184,6 +207,16 @@ class Message : virtual public bcos::boostssl::MessageFace
}

std::shared_ptr<bcos::bytes> payload() const override { return m_payload; }
// for swig wrapper
OutputBuffer payloadBuffer() const
{
if (!m_payload)
{
return OutputBuffer{nullptr, 0};
}
return OutputBuffer{(unsigned char*)m_payload->data(), m_payload->size()};
}

void setPayload(std::shared_ptr<bcos::bytes> _payload) override
{
m_payload = std::move(_payload);
Expand All @@ -196,6 +229,11 @@ class Message : virtual public bcos::boostssl::MessageFace

MessagePayload::Ptr const& frontMessage() const { return m_frontMessage; }

// Note: swig wrapper require define all methods
virtual bool encode(bcos::bytes& _buffer) = 0;
// encode and return the {header, payload}
virtual bool encode(bcos::boostssl::EncodedMsg& _encodedMsg) = 0;
virtual int64_t decode(bcos::bytesConstRef _buffer) = 0;

protected:
MessageHeader::Ptr m_header;
Expand Down
6 changes: 6 additions & 0 deletions cpp/ppc-framework/protocol/MessagePayload.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* @date 2024-08-22
*/
#pragma once
#include "ppc-framework/libwrapper/Buffer.h"
#include <bcos-utilities/Common.h>
#include <memory>

Expand All @@ -42,6 +43,11 @@ class MessagePayload
virtual void setVersion(uint8_t version) { m_version = version; }
// data
virtual bcos::bytes const& data() const { return m_data; }
// for swig wrapper here
virtual OutputBuffer dataBuffer() const
{
return OutputBuffer{(unsigned char*)m_data.data(), m_data.size()};
}
virtual void setData(bcos::bytes&& data) { m_data = std::move(data); }
virtual void setData(bcos::bytes const& data) { m_data = data; }
// the seq
Expand Down
Loading

0 comments on commit 03747f7

Please sign in to comment.