Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cpp demo for transport #36

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/wedpr-initializer/Initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "ppc-psi/src/bs-ecdh-psi/BsEcdhPSIFactory.h"
#include "ppc-psi/src/cm2020-psi/CM2020PSIFactory.h"
#include "protocol/src/PPCMessage.h"
#include "wedpr-transport/sdk/TransportBuilder.h"
#include "wedpr-transport/sdk/src/TransportBuilder.h"
#if 0
//TODO: optimize here
#include "ppc-psi/src/ecdh-conn-psi/EcdhConnPSIFactory.h"
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-initializer/Initializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "ppc-framework/rpc/RpcInterface.h"
#include "ppc-framework/rpc/RpcTypeDef.h"
#include "ppc-psi/src/bs-ecdh-psi/BsEcdhPSIImpl.h"
#include "wedpr-transport/sdk/Transport.h"
#include "wedpr-transport/sdk/src/Transport.h"
#include <bcos-boostssl/httpserver/Common.h>
#include <bcos-utilities/Timer.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void CallbackManager::onReceiveMessage(std::string const& topic, Message::Ptr ms
}
if (!callback)
{
FRONT_LOG(DEBUG) << LOG_DESC("onReceiveMessage: not find the handler, put into the buffer")
FRONT_LOG(TRACE) << LOG_DESC("onReceiveMessage: not find the handler, put into the buffer")
<< LOG_KV("topic", topic);
addMsgCache(topic, msg);
return;
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-transport/sdk-wrapper/java/bindings/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ dependencies {

archivesBaseName = 'wedpr-java-transport-jni'
group = 'com.webank.wedpr'
version = '1.0.0'
version = '1.0.0-SNAPSHOT'

jar {
exclude '**/*.xml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public void onMessage(IMessage message) {
+ nodeID
+ " receiveMessage, detail: "
+ message.toString()
+ ", ");
+ ", payload: "
+ new String(message.getPayload())
+ "#######");
}
}

Expand All @@ -58,63 +60,84 @@ public void onError(Error error) {
+ " MessageErrorCallback, result: "
+ error.errorMessage()
+ ", code:"
+ error.errorCode());
+ error.errorCode()
+ "######");
}
}

public static void main(String[] args) throws Exception {
String nodeID = "testNode";
if (args.length > 1) {
if (args.length > 0) {
nodeID = args[0];
}
TransportConfig transportConfig = new TransportConfig(2, nodeID);
String hostIp = "127.0.0.1";
if (args.length > 2) {
if (args.length > 1) {
hostIp = args[1];
}
int listenPort = 9020;
if (args.length > 3) {
if (args.length > 2) {
listenPort = Integer.valueOf(args[2]);
}
String listenIp = "0.0.0.0";
TransportEndPoint endPoint = new TransportEndPoint(hostIp, listenIp, listenPort);
transportConfig.setSelfEndPoint(endPoint);
String grpcTarget = "ipv4:127.0.0.1:40600,127.0.0.1:40601";
if (args.length > 4) {
if (args.length > 3) {
grpcTarget = args[3];
}
transportConfig.setGatewayTargets(grpcTarget);
String dstNode = "agency2Node";
if (args.length > 4) {
dstNode = args[4];
}
System.out.println("####### transportConfig: " + transportConfig.toString());
System.out.println("####### dstNode: " + dstNode);
// build the gatewayTarget
WeDPRTransport transport = TransportImpl.build(transportConfig);

System.out.println("####### before start the transport");
transport.start();
System.out.println("####### start the transport success");

// send Message to the gateway
String topic = "testTopic";
transport.registerTopicHandler(topic, new MessageDispatcherCallbackImpl(nodeID));
System.out.println("##### register topic success");
String dstNode = "agency2Node";
if (args.length > 5) {
dstNode = args[5];
}

byte[] dstNodeBytes = dstNode.getBytes();
// every 2s send a message
Integer i = 0;
String syncTopic = "sync_" + topic;
while (true) {
String payLoad = "testPayload" + i;
transport.asyncSendMessageByNodeID(
topic,
dstNodeBytes,
payLoad.getBytes(),
0,
10000,
new MessageErrorCallback(nodeID),
null);
i++;
try {
String payLoad = "testPayload" + i;
// send Message by nodeID
transport.asyncSendMessageByNodeID(
topic,
dstNodeBytes,
payLoad.getBytes(),
0,
10000,
new MessageErrorCallback(nodeID),
null);

// push
String syncPayload = "sync_" + payLoad;
transport.pushByNodeID(syncTopic, dstNodeBytes, 0, syncPayload.getBytes(), 2000);
// pop
IMessage msg = transport.pop(syncTopic, 2000);
System.out.println(
"##### receive msg from "
+ syncTopic
+ ", detail: "
+ msg.toString()
+ ", payload: "
+ new String(msg.getPayload())
+ "####");
i++;
} catch (Exception e) {
System.out.println("#### exception: " + e.getMessage());
}
Thread.sleep(2000);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ PRIMITIVE_TYPEMAP(unsigned long int, long long);
#include <vector>
#include <iostream>
#include <stdint.h>
#include "wedpr-transport/sdk/TransportBuilder.h"
#include "wedpr-transport/sdk/Transport.h"
#include "wedpr-transport/sdk/src/TransportBuilder.h"
#include "wedpr-transport/sdk/src/Transport.h"
#include "ppc-framework/libwrapper/Buffer.h"
#include "ppc-framework/front/IFront.h"
#include "ppc-framework/protocol/RouteType.h"
Expand Down Expand Up @@ -197,6 +197,5 @@ namespace bcos{

%include "ppc-framework/front/IFront.h"

%include "wedpr-transport/sdk/TransportBuilder.h"
%include "wedpr-transport/sdk/Transport.h"

%include "wedpr-transport/sdk/src/TransportBuilder.h"
%include "wedpr-transport/sdk/src/Transport.h"
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
%{
#define SWIG_FILE_WITH_INIT
#include <stdint.h>
#include "wedpr-transport/sdk/TransportBuilder.h"
#include "wedpr-transport/sdk/Transport.h"
#include "wedpr-transport/sdk/src/TransportBuilder.h"
#include "wedpr-transport/sdk/src/Transport.h"
#include "ppc-framework/front/IFront.h"
#include "ppc-framework/protocol/RouteType.h"
#include "ppc-framework/front/FrontConfig.h"
Expand All @@ -44,6 +44,6 @@ namespace ppc::sdk{
%template(SharedRouteInfo) std::shared_ptr<ppc::protocol::MessageOptionalHeader>;
%template(SharedTransport) std::shared_ptr<ppc::sdk::Transport>;

%include "wedpr-transport/sdk/TransportBuilder.h"
%include "wedpr-transport/sdk/Transport.h"
%include "wedpr-transport/sdk/src/TransportBuilder.h"
%include "wedpr-transport/sdk/src/Transport.h"
%include "ppc-framework/front/IFront.h"
16 changes: 8 additions & 8 deletions cpp/wedpr-transport/sdk/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
cmake_minimum_required(VERSION 3.14)
project(ppc-transport-sdk VERSION ${VERSION})
file(GLOB_RECURSE SRCS *.cpp)
add_library(${WEDPR_TRANSPORT_SDK_TARGET} ${SRCS})
target_link_libraries(${WEDPR_TRANSPORT_SDK_TARGET} PUBLIC
${FRONT_TARGET} ${PB_PROTOCOL_TARGET} ${SERVICE_CLIENT_TARGET} ${SERVICE_SERVER_TARGET} ${CPU_FEATURES_LIB})
cmake_minimum_required(VERSION 3.14)

add_subdirectory(src)

if (DEMO)
add_subdirectory(demo)
enable_testing()
endif ()
7 changes: 7 additions & 0 deletions cpp/wedpr-transport/sdk/demo/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
# cmake settings
set(BINARY_NAME transport_sdk_demo)
add_executable(${BINARY_NAME} transport_client.cpp)
target_link_libraries(${BINARY_NAME} ${WEDPR_TRANSPORT_SDK_TARGET})

unset(CMAKE_RUNTIME_OUTPUT_DIRECTORY)
132 changes: 132 additions & 0 deletions cpp/wedpr-transport/sdk/demo/transport_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (C) 2022 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file transport_bench.cpp
* @desc: bench for transport
* @author: yujiechen
* @date 2024-09-13
*/
#include "ppc-framework/protocol/RouteType.h"
#include "wedpr-transport/sdk/src/TransportBuilder.h"
#include <thread>

using namespace ppc::front;
using namespace ppc::sdk;
using namespace ppc::protocol;

void Usage(std::string const& _appName)
{
std::cout << _appName << " [hostIp] [listenPort] [gatewayTargets] [nodeID] [dstNode]"
<< std::endl;
std::cout << "example:" << std::endl;
std::cout << _appName
<< " 127.0.0.1 9020 ipv4:127.0.0.1:40600,127.0.0.1:40601 agency0Node agency1Node"
<< std::endl;
std::cout << _appName
<< " 127.0.0.1 9021 ipv4:127.0.0.1:40620,127.0.0.1:40621 agency1Node agency0Node"
<< std::endl;
}

int main(int argc, char* argv[])
{
if (argc < 6)
{
Usage(argv[0]);
return -1;
}
// the hostIp
std::string hostIp = argv[1];
// the clientPort
int port = atoi(argv[2]);
std::string listenIp = "0.0.0.0";
// the gatewayTargets
std::string gatewayTargets = argv[3];
// the nodeID
std::string nodeID = argv[4];
// the dstNode
std::string dstNode = argv[5];

auto transportBuilder = std::make_shared<TransportBuilder>();
auto frontConfig = transportBuilder->buildConfig(2, nodeID);
frontConfig->setGatewayGrpcTarget(gatewayTargets);
EndPoint endPoint(hostIp, port);
frontConfig->setSelfEndPoint(endPoint);
auto transport = transportBuilder->buildProTransport(frontConfig);

// start the transport
std::cout << "#### start the front, detail: " << printFrontDesc(frontConfig);
transport->start();
auto routeInfo = transport->routeInfoBuilder()->build();
std::string topic = "sync_testDemo";
routeInfo->setDstNode(bcos::bytes(dstNode.begin(), dstNode.end()));
routeInfo->setTopic(topic);
transport->getFront()->registerTopicHandler(topic, [](Message::Ptr msg) {
std::cout << "=== async receive message: " << printMessage(msg) << "====" << std::endl;
});
std::string syncTopic = "sync__" + topic;
auto syncRouteInfo = transport->routeInfoBuilder()->build();
syncRouteInfo->setDstNode(bcos::bytes(dstNode.begin(), dstNode.end()));
syncRouteInfo->setTopic(syncTopic);
// sendMessage test
long i = 0;
while (true)
{
try
{
std::string payload = "payload+++" + std::to_string(i);
bcos::bytes payloadBytes = bcos::bytes(payload.begin(), payload.end());
// async test
transport->getFront()->asyncSendMessage((uint16_t)RouteType::ROUTE_THROUGH_NODEID,
routeInfo, std::move(bcos::bytes(payload.begin(), payload.end())), 0, 10000,
[](bcos::Error::Ptr error) {
if (error && error->errorCode() != 0)
{
std::cout << "!**** send message failed for: " << error->errorMessage()
<< "***!" << std::endl;
}
},
nullptr);

// push
auto error = transport->getFront()->push((uint16_t)RouteType::ROUTE_THROUGH_NODEID,
syncRouteInfo, std::move(payloadBytes), 0, 10000);
if (!error && error->errorCode() != 0)
{
std::cout << "!**** send message failed for: " << error->errorMessage() << "***!"
<< std::endl;
}
// pop
auto msg = transport->getFront()->pop(syncTopic, 10000);
if (msg == nullptr)
{
std::cout << "try to receive message timeout" << std::endl;
}
else
{
std::cout << "=== sync receive message: " << printMessage(msg)
<< "====" << std::endl;
}
i++;
}
catch (std::exception const& e)
{
std::cout << "!**** exception: " << boost::diagnostic_information(e) << "****!"
<< std::endl;
}
// wait for 2s
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
};
return 0;
}
8 changes: 8 additions & 0 deletions cpp/wedpr-transport/sdk/src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
cmake_minimum_required(VERSION 3.14)
project(ppc-transport-sdk VERSION ${VERSION})

file(GLOB_RECURSE SRCS *.cpp)

add_library(${WEDPR_TRANSPORT_SDK_TARGET} ${SRCS})
target_link_libraries(${WEDPR_TRANSPORT_SDK_TARGET} PUBLIC
${FRONT_TARGET} ${PB_PROTOCOL_TARGET} ${SERVICE_CLIENT_TARGET} ${SERVICE_SERVER_TARGET} ${CPU_FEATURES_LIB})
File renamed without changes.
Loading