Skip to content

Commit

Permalink
add cpp demo for transport
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 13, 2024
1 parent 97726a1 commit 9b504f7
Show file tree
Hide file tree
Showing 19 changed files with 211 additions and 42 deletions.
2 changes: 1 addition & 1 deletion cpp/wedpr-initializer/Initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "ppc-psi/src/bs-ecdh-psi/BsEcdhPSIFactory.h"
#include "ppc-psi/src/cm2020-psi/CM2020PSIFactory.h"
#include "protocol/src/PPCMessage.h"
#include "wedpr-transport/sdk/TransportBuilder.h"
#include "wedpr-transport/sdk/src/TransportBuilder.h"
#if 0
//TODO: optimize here
#include "ppc-psi/src/ecdh-conn-psi/EcdhConnPSIFactory.h"
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-initializer/Initializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "ppc-framework/rpc/RpcInterface.h"
#include "ppc-framework/rpc/RpcTypeDef.h"
#include "ppc-psi/src/bs-ecdh-psi/BsEcdhPSIImpl.h"
#include "wedpr-transport/sdk/Transport.h"
#include "wedpr-transport/sdk/src/Transport.h"
#include <bcos-boostssl/httpserver/Common.h>
#include <bcos-utilities/Timer.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void CallbackManager::onReceiveMessage(std::string const& topic, Message::Ptr ms
}
if (!callback)
{
FRONT_LOG(DEBUG) << LOG_DESC("onReceiveMessage: not find the handler, put into the buffer")
FRONT_LOG(TRACE) << LOG_DESC("onReceiveMessage: not find the handler, put into the buffer")
<< LOG_KV("topic", topic);
addMsgCache(topic, msg);
return;
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-transport/sdk-wrapper/java/bindings/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ dependencies {

archivesBaseName = 'wedpr-java-transport-jni'
group = 'com.webank.wedpr'
version = '1.0.0'
version = '1.0.0-SNAPSHOT'

jar {
exclude '**/*.xml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public void onMessage(IMessage message) {
+ nodeID
+ " receiveMessage, detail: "
+ message.toString()
+ ", ");
+ ", payload: "
+ new String(message.getPayload())
+ "#######");
}
}

Expand All @@ -58,63 +60,84 @@ public void onError(Error error) {
+ " MessageErrorCallback, result: "
+ error.errorMessage()
+ ", code:"
+ error.errorCode());
+ error.errorCode()
+ "######");
}
}

public static void main(String[] args) throws Exception {
String nodeID = "testNode";
if (args.length > 1) {
if (args.length > 0) {
nodeID = args[0];
}
TransportConfig transportConfig = new TransportConfig(2, nodeID);
String hostIp = "127.0.0.1";
if (args.length > 2) {
if (args.length > 1) {
hostIp = args[1];
}
int listenPort = 9020;
if (args.length > 3) {
if (args.length > 2) {
listenPort = Integer.valueOf(args[2]);
}
String listenIp = "0.0.0.0";
TransportEndPoint endPoint = new TransportEndPoint(hostIp, listenIp, listenPort);
transportConfig.setSelfEndPoint(endPoint);
String grpcTarget = "ipv4:127.0.0.1:40600,127.0.0.1:40601";
if (args.length > 4) {
if (args.length > 3) {
grpcTarget = args[3];
}
transportConfig.setGatewayTargets(grpcTarget);
String dstNode = "agency2Node";
if (args.length > 4) {
dstNode = args[4];
}
System.out.println("####### transportConfig: " + transportConfig.toString());
System.out.println("####### dstNode: " + dstNode);
// build the gatewayTarget
WeDPRTransport transport = TransportImpl.build(transportConfig);

System.out.println("####### before start the transport");
transport.start();
System.out.println("####### start the transport success");

// send Message to the gateway
String topic = "testTopic";
transport.registerTopicHandler(topic, new MessageDispatcherCallbackImpl(nodeID));
System.out.println("##### register topic success");
String dstNode = "agency2Node";
if (args.length > 5) {
dstNode = args[5];
}

byte[] dstNodeBytes = dstNode.getBytes();
// every 2s send a message
Integer i = 0;
String syncTopic = "sync_" + topic;
while (true) {
String payLoad = "testPayload" + i;
transport.asyncSendMessageByNodeID(
topic,
dstNodeBytes,
payLoad.getBytes(),
0,
10000,
new MessageErrorCallback(nodeID),
null);
i++;
try {
String payLoad = "testPayload" + i;
// send Message by nodeID
transport.asyncSendMessageByNodeID(
topic,
dstNodeBytes,
payLoad.getBytes(),
0,
10000,
new MessageErrorCallback(nodeID),
null);

// push
String syncPayload = "sync_" + payLoad;
transport.pushByNodeID(syncTopic, dstNodeBytes, 0, syncPayload.getBytes(), 2000);
// pop
IMessage msg = transport.pop(syncTopic, 2000);
System.out.println(
"##### receive msg from "
+ syncTopic
+ ", detail: "
+ msg.toString()
+ ", payload: "
+ new String(msg.getPayload())
+ "####");
i++;
} catch (Exception e) {
System.out.println("#### exception: " + e.getMessage());
}
Thread.sleep(2000);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ PRIMITIVE_TYPEMAP(unsigned long int, long long);
#include <vector>
#include <iostream>
#include <stdint.h>
#include "wedpr-transport/sdk/TransportBuilder.h"
#include "wedpr-transport/sdk/Transport.h"
#include "wedpr-transport/sdk/src/TransportBuilder.h"
#include "wedpr-transport/sdk/src/Transport.h"
#include "ppc-framework/libwrapper/Buffer.h"
#include "ppc-framework/front/IFront.h"
#include "ppc-framework/protocol/RouteType.h"
Expand Down Expand Up @@ -197,6 +197,5 @@ namespace bcos{

%include "ppc-framework/front/IFront.h"

%include "wedpr-transport/sdk/TransportBuilder.h"
%include "wedpr-transport/sdk/Transport.h"

%include "wedpr-transport/sdk/src/TransportBuilder.h"
%include "wedpr-transport/sdk/src/Transport.h"
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
%{
#define SWIG_FILE_WITH_INIT
#include <stdint.h>
#include "wedpr-transport/sdk/TransportBuilder.h"
#include "wedpr-transport/sdk/Transport.h"
#include "wedpr-transport/sdk/src/TransportBuilder.h"
#include "wedpr-transport/sdk/src/Transport.h"
#include "ppc-framework/front/IFront.h"
#include "ppc-framework/protocol/RouteType.h"
#include "ppc-framework/front/FrontConfig.h"
Expand All @@ -44,6 +44,6 @@ namespace ppc::sdk{
%template(SharedRouteInfo) std::shared_ptr<ppc::protocol::MessageOptionalHeader>;
%template(SharedTransport) std::shared_ptr<ppc::sdk::Transport>;

%include "wedpr-transport/sdk/TransportBuilder.h"
%include "wedpr-transport/sdk/Transport.h"
%include "wedpr-transport/sdk/src/TransportBuilder.h"
%include "wedpr-transport/sdk/src/Transport.h"
%include "ppc-framework/front/IFront.h"
16 changes: 8 additions & 8 deletions cpp/wedpr-transport/sdk/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
cmake_minimum_required(VERSION 3.14)
project(ppc-transport-sdk VERSION ${VERSION})

file(GLOB_RECURSE SRCS *.cpp)

add_library(${WEDPR_TRANSPORT_SDK_TARGET} ${SRCS})
target_link_libraries(${WEDPR_TRANSPORT_SDK_TARGET} PUBLIC
${FRONT_TARGET} ${PB_PROTOCOL_TARGET} ${SERVICE_CLIENT_TARGET} ${SERVICE_SERVER_TARGET} ${CPU_FEATURES_LIB})
cmake_minimum_required(VERSION 3.14)

add_subdirectory(src)

if (DEMO)
add_subdirectory(demo)
enable_testing()
endif ()
7 changes: 7 additions & 0 deletions cpp/wedpr-transport/sdk/demo/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
# cmake settings
set(BINARY_NAME transport_sdk_demo)
add_executable(${BINARY_NAME} transport_client.cpp)
target_link_libraries(${BINARY_NAME} ${WEDPR_TRANSPORT_SDK_TARGET})

unset(CMAKE_RUNTIME_OUTPUT_DIRECTORY)
132 changes: 132 additions & 0 deletions cpp/wedpr-transport/sdk/demo/transport_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (C) 2022 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file transport_bench.cpp
* @desc: bench for transport
* @author: yujiechen
* @date 2024-09-13
*/
#include "ppc-framework/protocol/RouteType.h"
#include "wedpr-transport/sdk/src/TransportBuilder.h"
#include <thread>

using namespace ppc::front;
using namespace ppc::sdk;
using namespace ppc::protocol;

void Usage(std::string const& _appName)
{
std::cout << _appName << " [hostIp] [listenPort] [gatewayTargets] [nodeID] [dstNode]"
<< std::endl;
std::cout << "example:" << std::endl;
std::cout << _appName
<< " 127.0.0.1 9020 ipv4:127.0.0.1:40600,127.0.0.1:40601 agency0Node agency1Node"
<< std::endl;
std::cout << _appName
<< " 127.0.0.1 9021 ipv4:127.0.0.1:40620,127.0.0.1:40621 agency1Node agency0Node"
<< std::endl;
}

int main(int argc, char* argv[])
{
if (argc < 6)
{
Usage(argv[0]);
return -1;
}
// the hostIp
std::string hostIp = argv[1];
// the clientPort
int port = atoi(argv[2]);
std::string listenIp = "0.0.0.0";
// the gatewayTargets
std::string gatewayTargets = argv[3];
// the nodeID
std::string nodeID = argv[4];
// the dstNode
std::string dstNode = argv[5];

auto transportBuilder = std::make_shared<TransportBuilder>();
auto frontConfig = transportBuilder->buildConfig(2, nodeID);
frontConfig->setGatewayGrpcTarget(gatewayTargets);
EndPoint endPoint(hostIp, port);
frontConfig->setSelfEndPoint(endPoint);
auto transport = transportBuilder->buildProTransport(frontConfig);

// start the transport
std::cout << "#### start the front, detail: " << printFrontDesc(frontConfig);
transport->start();
auto routeInfo = transport->routeInfoBuilder()->build();
std::string topic = "sync_testDemo";
routeInfo->setDstNode(bcos::bytes(dstNode.begin(), dstNode.end()));
routeInfo->setTopic(topic);
transport->getFront()->registerTopicHandler(topic, [](Message::Ptr msg) {
std::cout << "=== async receive message: " << printMessage(msg) << "====" << std::endl;
});
std::string syncTopic = "sync__" + topic;
auto syncRouteInfo = transport->routeInfoBuilder()->build();
syncRouteInfo->setDstNode(bcos::bytes(dstNode.begin(), dstNode.end()));
syncRouteInfo->setTopic(syncTopic);
// sendMessage test
long i = 0;
while (true)
{
try
{
std::string payload = "payload+++" + std::to_string(i);
bcos::bytes payloadBytes = bcos::bytes(payload.begin(), payload.end());
// async test
transport->getFront()->asyncSendMessage((uint16_t)RouteType::ROUTE_THROUGH_NODEID,
routeInfo, std::move(bcos::bytes(payload.begin(), payload.end())), 0, 10000,
[](bcos::Error::Ptr error) {
if (error && error->errorCode() != 0)
{
std::cout << "!**** send message failed for: " << error->errorMessage()
<< "***!" << std::endl;
}
},
nullptr);

// push
auto error = transport->getFront()->push((uint16_t)RouteType::ROUTE_THROUGH_NODEID,
syncRouteInfo, std::move(payloadBytes), 0, 10000);
if (!error && error->errorCode() != 0)
{
std::cout << "!**** send message failed for: " << error->errorMessage() << "***!"
<< std::endl;
}
// pop
auto msg = transport->getFront()->pop(syncTopic, 10000);
if (msg == nullptr)
{
std::cout << "try to receive message timeout" << std::endl;
}
else
{
std::cout << "=== sync receive message: " << printMessage(msg)
<< "====" << std::endl;
}
i++;
}
catch (std::exception const& e)
{
std::cout << "!**** exception: " << boost::diagnostic_information(e) << "****!"
<< std::endl;
}
// wait for 2s
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
};
return 0;
}
8 changes: 8 additions & 0 deletions cpp/wedpr-transport/sdk/src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
cmake_minimum_required(VERSION 3.14)
project(ppc-transport-sdk VERSION ${VERSION})

file(GLOB_RECURSE SRCS *.cpp)

add_library(${WEDPR_TRANSPORT_SDK_TARGET} ${SRCS})
target_link_libraries(${WEDPR_TRANSPORT_SDK_TARGET} PUBLIC
${FRONT_TARGET} ${PB_PROTOCOL_TARGET} ${SERVICE_CLIENT_TARGET} ${SERVICE_SERVER_TARGET} ${CPU_FEATURES_LIB})
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 9b504f7

Please sign in to comment.