Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update java-sdk wrapper #32

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ set(CEM_SOURCE "")
if(BUILD_CEM)
set(CEM_SOURCE "wedpr-computing/ppc-cem")
endif()

set(TRANSPORT_SDK_SOURCE_LIST
wedpr-protocol
wedpr-transport/ppc-front
wedpr-transport/sdk wedpr-transport/sdk-wrapper)

set(ALL_SOURCE_LIST
${SDK_SOURCE_LIST}
wedpr-crypto/ppc-crypto
Expand All @@ -92,7 +98,6 @@ if(BUILD_WEDPR_TOOLKIT)
include(swig)
message(STATUS "Getting SWIG for Windows: ...DONE")
endif()
add_subdirectory(wedpr-transport/sdk-wrapper)
endif()

if(BUILD_ALL)
Expand All @@ -101,6 +106,8 @@ elseif(BUILD_UDF)
add_sources("${UDF_SOURCE_LIST}")
elseif(BUILD_SDK)
add_sources("${SDK_SOURCE_LIST}")
elseif(BUILD_WEDPR_TOOLKIT)
add_sources("${TRANSPORT_SDK_SOURCE_LIST}")
endif()
########### set the sources end ###########

Expand Down
10 changes: 6 additions & 4 deletions cpp/cmake/Dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ find_package(unofficial-sodium CONFIG REQUIRED)


##### the full-dependencies #####
if(BUILD_ALL)
find_package(TBB REQUIRED)
if(BUILD_ALL OR BUILD_WEDPR_TOOLKIT)
find_package(jsoncpp REQUIRED)

find_package(TBB REQUIRED)
find_package(gRPC REQUIRED)
find_package(${BCOS_BOOSTSSL_TARGET} REQUIRED)
endif()

if(BUILD_ALL)
# tcmalloc
include(ProjectTCMalloc)

find_package(SEAL REQUIRED)
find_package(Kuku REQUIRED)
find_package(gRPC REQUIRED)

# APSI: Note: APSI depends on seal 4.0 and Kuku 2.1
include(ProjectAPSI)
Expand Down
8 changes: 8 additions & 0 deletions cpp/cmake/Options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,18 @@ macro(configure_project)
set(VISIBILITY_FLAG "")
set(BUILD_ALL OFF)
endif()
if (BUILD_WEDPR_TOOLKIT)
set(VISIBILITY_FLAG "")
set(BUILD_ALL OFF)
endif()
if (BUILD_ALL)
# install all dependencies
list(APPEND VCPKG_MANIFEST_FEATURES "all")
endif()
if (BUILD_WEDPR_TOOLKIT)
# install wedpr dependencies
list(APPEND VCPKG_MANIFEST_FEATURES "toolkit")
endif()
if(ENABLE_SSE)
# enable sse for libhdfs3
list(APPEND VCPKG_MANIFEST_FEATURES "sse")
Expand Down
5 changes: 3 additions & 2 deletions cpp/cmake/TargetSettings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,6 @@ set(WEDPR_TRANSPORT_SDK_TARGET wedpr-transport-sdk)
set(WEDPR_PYTHON_TRANSPORT "wedpr_python_transport")
set(WEDPR_PYTHON_TRANSPORT_DIR ${PROJECT_BINARY_DIR}/python/${WEDPR_PYTHON_TRANSPORT})

set(WEDPR_JAVA_TRANSPORT "wedpr_java_transport")
set(WEDPR_JAVA_TRANSPORT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/java/${WEDPR_JAVA_TRANSPORT})
set(WEDPR_JAVA_TRANSPORT "wedpr_java_transport_jni")
set(WEDPR_JAVA_TRANSPORT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/java/generated/${WEDPR_JAVA_TRANSPORT})
set(WEDPR_JAVA_TRANSPORT_LIB_DIR ${CMAKE_CURRENT_SOURCE_DIR}/wedpr-transport/sdk-wrapper/java/bindings/src/main/resources/META-INF/native)
4 changes: 2 additions & 2 deletions cpp/ppc-framework/Helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
* @date 2022-10-20
*/
#pragma once
#include <string>
#include <sstream>
#include <string>
namespace ppc
{
constexpr static int MAX_PORT = 65535;
Expand All @@ -42,7 +42,7 @@ inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept
template <typename T>
inline std::string_view printNodeID(T const& nodeID)
{
size_t offset = nodeID.size() >= 8 ? 8 : nodeID.size();
size_t offset = nodeID.size() >= 15 ? 15 : nodeID.size();
return std::string_view((const char*)nodeID.data(), offset);
}
} // namespace ppc
2 changes: 2 additions & 0 deletions cpp/ppc-framework/front/FrontConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class FrontConfig
virtual std::vector<std::string> const& getComponents() const { return m_components; }
void setComponents(std::vector<std::string> const& components) { m_components = components; }

void addComponent(std::string const& component) { m_components.emplace_back(component); }

std::vector<std::string>& mutableComponents() { return m_components; }

protected:
Expand Down
140 changes: 138 additions & 2 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,59 @@ class IFrontClient
virtual void onReceiveMessage(
ppc::protocol::Message::Ptr const& _msg, ppc::protocol::ReceiveMsgFunc _callback) = 0;
};

///////// the callback definition for sdk wrapper /////////
class ErrorCallback
{
public:
using Ptr = std::shared_ptr<ErrorCallback>;
ErrorCallback() = default;
virtual ~ErrorCallback() {}

virtual void onError(bcos::Error::Ptr error) = 0;
};

class MessageDispatcherHandler
{
public:
using Ptr = std::shared_ptr<MessageDispatcherHandler>;
MessageDispatcherHandler() = default;
virtual ~MessageDispatcherHandler() {}

virtual void onMessage(ppc::protocol::Message::Ptr msg) = 0;
};

class SendResponseHandler
{
public:
using Ptr = std::shared_ptr<SendResponseHandler>;
SendResponseHandler(ppc::protocol::SendResponseFunction responseFunc)
: m_responseFunc(responseFunc)
{}
virtual ~SendResponseHandler() {}

virtual void sendResponse(std::shared_ptr<bcos::bytes>&& payload)
{
m_responseFunc(std::move(payload));
}

private:
ppc::protocol::SendResponseFunction m_responseFunc;
};

class IMessageHandler
{
public:
using Ptr = std::shared_ptr<IMessageHandler>;
IMessageHandler() = default;
virtual ~IMessageHandler() {}

virtual void onMessage(bcos::Error::Ptr e, ppc::protocol::Message::Ptr msg,
SendResponseHandler sendResponseHandler) = 0;
};

///////// the callback definition for sdk wrapper /////////

class IFront : virtual public IFrontClient
{
public:
Expand Down Expand Up @@ -71,8 +124,23 @@ class IFront : virtual public IFrontClient
virtual void registerTopicHandler(
std::string const& topic, ppc::protocol::MessageDispatcherCallback callback) = 0;

/////// to simplify SDK wrapper ////
virtual void registerTopicHandler(
std::string const& topic, MessageDispatcherHandler::Ptr callback)
{
registerTopicHandler(topic, populateMessageDispatcherCallback(callback));
}

virtual void registerMessageHandler(
std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback) = 0;

/////// to simplify SDK wrapper ////
virtual void registerMessageHandler(
std::string const& componentType, MessageDispatcherHandler::Ptr callback)
{
registerMessageHandler(componentType, populateMessageDispatcherCallback(callback));
}

/**
* @brief async send message
*
Expand All @@ -88,19 +156,53 @@ class IFront : virtual public IFrontClient
* @param timeout timeout
* @param callback callback
*/
virtual void asyncSendMessage(ppc::protocol::RouteType routeType,
virtual void asyncSendMessage(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout, ppc::protocol::ReceiveMsgFunc errorCallback,
ppc::protocol::MessageCallback callback) = 0;

/////// to simplify SDK wrapper ////

// !!! Note: the 'payload' type(char*) should not been changed, since it used to pass-in java
// byte[] data
virtual void asyncSendMessage(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, char* payload,
uint64_t payloadSize, int seq, long timeout, ErrorCallback::Ptr errorCallback,
IMessageHandler::Ptr msgHandler)
{
// TODO: optimize here
bcos::bytes copyedPayload(payload, payload + payloadSize);
asyncSendMessage(routeType, routeInfo, std::move(copyedPayload), seq, timeout,
populateErrorCallback(errorCallback), populateMsgCallback(msgHandler));
}

virtual void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID,
bcos::bytes&& payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback) = 0;

/////// to simplify SDK wrapper ////

// !!! Note: the 'payload ' type(char*) should not been changed, since it used to pass-in java
// byte[] data
virtual void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID,
bcos::bytes&& payload, int seq, ErrorCallback::Ptr errorCallback)
{
asyncSendResponse(
dstNode, traceID, std::move(payload), seq, populateErrorCallback(errorCallback));
}

// the sync interface for async_send_message
virtual bcos::Error::Ptr push(ppc::protocol::RouteType routeType,
virtual bcos::Error::Ptr push(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout) = 0;

// TODO: optmize here
virtual bcos::Error::Ptr push(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, char* payload,
uint64_t payloadSize, int seq, long timeout)
{
bcos::bytes copyedPayload(payload, payload + payloadSize);
return push(routeType, routeInfo, std::move(copyedPayload), seq, timeout);
}
virtual ppc::protocol::Message::Ptr pop(std::string const& topic, long timeoutMs) = 0;
virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0;

Expand Down Expand Up @@ -131,6 +233,40 @@ class IFront : virtual public IFrontClient
* @param topic the topic to unregister
*/
virtual bcos::Error::Ptr unRegisterTopic(std::string const& topic) = 0;


private:
ppc::protocol::ReceiveMsgFunc populateErrorCallback(ErrorCallback::Ptr errorCallback)
{
if (errorCallback == nullptr)
{
return nullptr;
}
return [errorCallback](bcos::Error::Ptr error) { errorCallback->onError(error); };
}

ppc::protocol::MessageDispatcherCallback populateMessageDispatcherCallback(
MessageDispatcherHandler::Ptr handler)
{
if (handler == nullptr)
{
return nullptr;
}
return [handler](ppc::protocol::Message::Ptr msg) { handler->onMessage(msg); };
}

ppc::protocol::MessageCallback populateMsgCallback(IMessageHandler::Ptr msgHandler)
{
if (msgHandler == nullptr)
{
return nullptr;
}
return [msgHandler](bcos::Error::Ptr e, ppc::protocol::Message::Ptr msg,
ppc::protocol::SendResponseFunction resFunc) {
SendResponseHandler sendResponseHandler(resFunc);
msgHandler->onMessage(e, msg, sendResponseHandler);
};
}
};

class IFrontBuilder
Expand Down
38 changes: 38 additions & 0 deletions cpp/ppc-framework/protocol/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "MessagePayload.h"
#include "RouteType.h"
#include "ppc-framework/Helper.h"
#include "ppc-framework/libwrapper/Buffer.h"
#include <bcos-boostssl/interfaces/MessageFace.h>
#include <bcos-utilities/Common.h>
#include <bcos-utilities/DataConvertUtility.h>
Expand All @@ -46,12 +47,34 @@ class MessageOptionalHeader

// the source nodeID that send the message
virtual bcos::bytes const& srcNode() const { return m_srcNode; }
/// for swig-wrapper(pass the binary data)
OutputBuffer srcNodeBuffer() const
{
// Note: this will be copied to java through jni
return OutputBuffer{(unsigned char*)m_srcNode.data(), m_srcNode.size()};
}

virtual void setSrcNode(bcos::bytes const& srcNode) { m_srcNode = srcNode; }

// !!! Note: the first paramater type should not been changed, for it's used for pass-in java
// byte[] into c bytes
virtual void setSrcNode(char* data, uint64_t length) { m_srcNode.assign(data, data + length); }

// the target nodeID that should receive the message
virtual bcos::bytes const& dstNode() const { return m_dstNode; }

// for swig-wrapper(pass the binary to java)
OutputBuffer dstNodeBuffer() const
{
// Note: this will be copied to java through jni
return OutputBuffer{(unsigned char*)m_dstNode.data(), m_dstNode.size()};
}
virtual void setDstNode(bcos::bytes const& dstNode) { m_dstNode = dstNode; }

// !!! Note: the first paramater type(char*) should not been changed, for it's used for pass-in
// java byte[] into c bytes
virtual void setDstNode(char* data, uint64_t length) { m_dstNode.assign(data, data + length); }

// the target agency that need receive the message
virtual std::string const& dstInst() const { return m_dstInst; }
virtual void setDstInst(std::string const& dstInst) { m_dstInst = dstInst; }
Expand Down Expand Up @@ -184,6 +207,16 @@ class Message : virtual public bcos::boostssl::MessageFace
}

std::shared_ptr<bcos::bytes> payload() const override { return m_payload; }
// for swig wrapper
OutputBuffer payloadBuffer() const
{
if (!m_payload)
{
return OutputBuffer{nullptr, 0};
}
return OutputBuffer{(unsigned char*)m_payload->data(), m_payload->size()};
}

void setPayload(std::shared_ptr<bcos::bytes> _payload) override
{
m_payload = std::move(_payload);
Expand All @@ -196,6 +229,11 @@ class Message : virtual public bcos::boostssl::MessageFace

MessagePayload::Ptr const& frontMessage() const { return m_frontMessage; }

// Note: swig wrapper require define all methods
virtual bool encode(bcos::bytes& _buffer) = 0;
// encode and return the {header, payload}
virtual bool encode(bcos::boostssl::EncodedMsg& _encodedMsg) = 0;
virtual int64_t decode(bcos::bytesConstRef _buffer) = 0;

protected:
MessageHeader::Ptr m_header;
Expand Down
6 changes: 6 additions & 0 deletions cpp/ppc-framework/protocol/MessagePayload.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* @date 2024-08-22
*/
#pragma once
#include "ppc-framework/libwrapper/Buffer.h"
#include <bcos-utilities/Common.h>
#include <memory>

Expand All @@ -42,6 +43,11 @@ class MessagePayload
virtual void setVersion(uint8_t version) { m_version = version; }
// data
virtual bcos::bytes const& data() const { return m_data; }
// for swig wrapper here
virtual OutputBuffer dataBuffer() const
{
return OutputBuffer{(unsigned char*)m_data.data(), m_data.size()};
}
virtual void setData(bcos::bytes&& data) { m_data = std::move(data); }
virtual void setData(bcos::bytes const& data) { m_data = data; }
// the seq
Expand Down
Loading
Loading