diff --git a/cpp/wedpr-initializer/Initializer.cpp b/cpp/wedpr-initializer/Initializer.cpp index 26b120d9..d1e1acec 100644 --- a/cpp/wedpr-initializer/Initializer.cpp +++ b/cpp/wedpr-initializer/Initializer.cpp @@ -27,7 +27,7 @@ #include "ppc-psi/src/bs-ecdh-psi/BsEcdhPSIFactory.h" #include "ppc-psi/src/cm2020-psi/CM2020PSIFactory.h" #include "protocol/src/PPCMessage.h" -#include "wedpr-transport/sdk/TransportBuilder.h" +#include "wedpr-transport/sdk/src/TransportBuilder.h" #if 0 //TODO: optimize here #include "ppc-psi/src/ecdh-conn-psi/EcdhConnPSIFactory.h" diff --git a/cpp/wedpr-initializer/Initializer.h b/cpp/wedpr-initializer/Initializer.h index 4a49702b..348d1040 100644 --- a/cpp/wedpr-initializer/Initializer.h +++ b/cpp/wedpr-initializer/Initializer.h @@ -24,7 +24,7 @@ #include "ppc-framework/rpc/RpcInterface.h" #include "ppc-framework/rpc/RpcTypeDef.h" #include "ppc-psi/src/bs-ecdh-psi/BsEcdhPSIImpl.h" -#include "wedpr-transport/sdk/Transport.h" +#include "wedpr-transport/sdk/src/Transport.h" #include #include diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.cpp index 3f500292..efddec1c 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.cpp @@ -167,7 +167,7 @@ void CallbackManager::onReceiveMessage(std::string const& topic, Message::Ptr ms } if (!callback) { - FRONT_LOG(DEBUG) << LOG_DESC("onReceiveMessage: not find the handler, put into the buffer") + FRONT_LOG(TRACE) << LOG_DESC("onReceiveMessage: not find the handler, put into the buffer") << LOG_KV("topic", topic); addMsgCache(topic, msg); return; diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/build.gradle b/cpp/wedpr-transport/sdk-wrapper/java/bindings/build.gradle index bbceb950..42409146 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/build.gradle +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/build.gradle @@ -88,7 +88,7 @@ dependencies { archivesBaseName = 'wedpr-java-transport-jni' group = 'com.webank.wedpr' -version = '1.0.0' +version = '1.0.0-SNAPSHOT' jar { exclude '**/*.xml' diff --git a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java index b753e076..757d597a 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java +++ b/cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/demo/TransportDemo.java @@ -39,7 +39,9 @@ public void onMessage(IMessage message) { + nodeID + " receiveMessage, detail: " + message.toString() - + ", "); + + ", payload: " + + new String(message.getPayload()) + + "#######"); } } @@ -58,37 +60,42 @@ public void onError(Error error) { + " MessageErrorCallback, result: " + error.errorMessage() + ", code:" - + error.errorCode()); + + error.errorCode() + + "######"); } } public static void main(String[] args) throws Exception { String nodeID = "testNode"; - if (args.length > 1) { + if (args.length > 0) { nodeID = args[0]; } TransportConfig transportConfig = new TransportConfig(2, nodeID); String hostIp = "127.0.0.1"; - if (args.length > 2) { + if (args.length > 1) { hostIp = args[1]; } int listenPort = 9020; - if (args.length > 3) { + if (args.length > 2) { listenPort = Integer.valueOf(args[2]); } String listenIp = "0.0.0.0"; TransportEndPoint endPoint = new TransportEndPoint(hostIp, listenIp, listenPort); transportConfig.setSelfEndPoint(endPoint); String grpcTarget = "ipv4:127.0.0.1:40600,127.0.0.1:40601"; - if (args.length > 4) { + if (args.length > 3) { grpcTarget = args[3]; } transportConfig.setGatewayTargets(grpcTarget); + String dstNode = "agency2Node"; + if (args.length > 4) { + dstNode = args[4]; + } System.out.println("####### transportConfig: " + transportConfig.toString()); + System.out.println("####### dstNode: " + dstNode); // build the gatewayTarget WeDPRTransport transport = TransportImpl.build(transportConfig); - System.out.println("####### before start the transport"); transport.start(); System.out.println("####### start the transport success"); @@ -96,25 +103,41 @@ public static void main(String[] args) throws Exception { String topic = "testTopic"; transport.registerTopicHandler(topic, new MessageDispatcherCallbackImpl(nodeID)); System.out.println("##### register topic success"); - String dstNode = "agency2Node"; - if (args.length > 5) { - dstNode = args[5]; - } byte[] dstNodeBytes = dstNode.getBytes(); // every 2s send a message Integer i = 0; + String syncTopic = "sync_" + topic; while (true) { - String payLoad = "testPayload" + i; - transport.asyncSendMessageByNodeID( - topic, - dstNodeBytes, - payLoad.getBytes(), - 0, - 10000, - new MessageErrorCallback(nodeID), - null); - i++; + try { + String payLoad = "testPayload" + i; + // send Message by nodeID + transport.asyncSendMessageByNodeID( + topic, + dstNodeBytes, + payLoad.getBytes(), + 0, + 10000, + new MessageErrorCallback(nodeID), + null); + + // push + String syncPayload = "sync_" + payLoad; + transport.pushByNodeID(syncTopic, dstNodeBytes, 0, syncPayload.getBytes(), 2000); + // pop + IMessage msg = transport.pop(syncTopic, 2000); + System.out.println( + "##### receive msg from " + + syncTopic + + ", detail: " + + msg.toString() + + ", payload: " + + new String(msg.getPayload()) + + "####"); + i++; + } catch (Exception e) { + System.out.println("#### exception: " + e.getMessage()); + } Thread.sleep(2000); } } diff --git a/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i b/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i index 35dfdd9b..c5cf2638 100644 --- a/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i +++ b/cpp/wedpr-transport/sdk-wrapper/java/swig/wedpr_java_transport.i @@ -62,8 +62,8 @@ PRIMITIVE_TYPEMAP(unsigned long int, long long); #include #include #include -#include "wedpr-transport/sdk/TransportBuilder.h" -#include "wedpr-transport/sdk/Transport.h" +#include "wedpr-transport/sdk/src/TransportBuilder.h" +#include "wedpr-transport/sdk/src/Transport.h" #include "ppc-framework/libwrapper/Buffer.h" #include "ppc-framework/front/IFront.h" #include "ppc-framework/protocol/RouteType.h" @@ -197,6 +197,5 @@ namespace bcos{ %include "ppc-framework/front/IFront.h" -%include "wedpr-transport/sdk/TransportBuilder.h" -%include "wedpr-transport/sdk/Transport.h" - +%include "wedpr-transport/sdk/src/TransportBuilder.h" +%include "wedpr-transport/sdk/src/Transport.h" \ No newline at end of file diff --git a/cpp/wedpr-transport/sdk-wrapper/python/swig/wedpr_python_transport.i b/cpp/wedpr-transport/sdk-wrapper/python/swig/wedpr_python_transport.i index ef073d01..16515cba 100644 --- a/cpp/wedpr-transport/sdk-wrapper/python/swig/wedpr_python_transport.i +++ b/cpp/wedpr-transport/sdk-wrapper/python/swig/wedpr_python_transport.i @@ -16,8 +16,8 @@ %{ #define SWIG_FILE_WITH_INIT #include -#include "wedpr-transport/sdk/TransportBuilder.h" -#include "wedpr-transport/sdk/Transport.h" +#include "wedpr-transport/sdk/src/TransportBuilder.h" +#include "wedpr-transport/sdk/src/Transport.h" #include "ppc-framework/front/IFront.h" #include "ppc-framework/protocol/RouteType.h" #include "ppc-framework/front/FrontConfig.h" @@ -44,6 +44,6 @@ namespace ppc::sdk{ %template(SharedRouteInfo) std::shared_ptr; %template(SharedTransport) std::shared_ptr; -%include "wedpr-transport/sdk/TransportBuilder.h" -%include "wedpr-transport/sdk/Transport.h" +%include "wedpr-transport/sdk/src/TransportBuilder.h" +%include "wedpr-transport/sdk/src/Transport.h" %include "ppc-framework/front/IFront.h" \ No newline at end of file diff --git a/cpp/wedpr-transport/sdk/CMakeLists.txt b/cpp/wedpr-transport/sdk/CMakeLists.txt index 1231ca68..4606f2dd 100644 --- a/cpp/wedpr-transport/sdk/CMakeLists.txt +++ b/cpp/wedpr-transport/sdk/CMakeLists.txt @@ -1,8 +1,8 @@ -cmake_minimum_required(VERSION 3.14) -project(ppc-transport-sdk VERSION ${VERSION}) - -file(GLOB_RECURSE SRCS *.cpp) - -add_library(${WEDPR_TRANSPORT_SDK_TARGET} ${SRCS}) -target_link_libraries(${WEDPR_TRANSPORT_SDK_TARGET} PUBLIC - ${FRONT_TARGET} ${PB_PROTOCOL_TARGET} ${SERVICE_CLIENT_TARGET} ${SERVICE_SERVER_TARGET} ${CPU_FEATURES_LIB}) \ No newline at end of file +cmake_minimum_required(VERSION 3.14) + +add_subdirectory(src) + +if (DEMO) + add_subdirectory(demo) + enable_testing() +endif () \ No newline at end of file diff --git a/cpp/wedpr-transport/sdk/demo/CMakeLists.txt b/cpp/wedpr-transport/sdk/demo/CMakeLists.txt new file mode 100644 index 00000000..4ee3354c --- /dev/null +++ b/cpp/wedpr-transport/sdk/demo/CMakeLists.txt @@ -0,0 +1,7 @@ +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) +# cmake settings +set(BINARY_NAME transport_sdk_demo) +add_executable(${BINARY_NAME} transport_client.cpp) +target_link_libraries(${BINARY_NAME} ${WEDPR_TRANSPORT_SDK_TARGET}) + +unset(CMAKE_RUNTIME_OUTPUT_DIRECTORY) \ No newline at end of file diff --git a/cpp/wedpr-transport/sdk/demo/transport_client.cpp b/cpp/wedpr-transport/sdk/demo/transport_client.cpp new file mode 100644 index 00000000..b9e59d30 --- /dev/null +++ b/cpp/wedpr-transport/sdk/demo/transport_client.cpp @@ -0,0 +1,132 @@ +/* + * Copyright (C) 2022 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file transport_bench.cpp + * @desc: bench for transport + * @author: yujiechen + * @date 2024-09-13 + */ +#include "ppc-framework/protocol/RouteType.h" +#include "wedpr-transport/sdk/src/TransportBuilder.h" +#include + +using namespace ppc::front; +using namespace ppc::sdk; +using namespace ppc::protocol; + +void Usage(std::string const& _appName) +{ + std::cout << _appName << " [hostIp] [listenPort] [gatewayTargets] [nodeID] [dstNode]" + << std::endl; + std::cout << "example:" << std::endl; + std::cout << _appName + << " 127.0.0.1 9020 ipv4:127.0.0.1:40600,127.0.0.1:40601 agency0Node agency1Node" + << std::endl; + std::cout << _appName + << " 127.0.0.1 9021 ipv4:127.0.0.1:40620,127.0.0.1:40621 agency1Node agency0Node" + << std::endl; +} + +int main(int argc, char* argv[]) +{ + if (argc < 6) + { + Usage(argv[0]); + return -1; + } + // the hostIp + std::string hostIp = argv[1]; + // the clientPort + int port = atoi(argv[2]); + std::string listenIp = "0.0.0.0"; + // the gatewayTargets + std::string gatewayTargets = argv[3]; + // the nodeID + std::string nodeID = argv[4]; + // the dstNode + std::string dstNode = argv[5]; + + auto transportBuilder = std::make_shared(); + auto frontConfig = transportBuilder->buildConfig(2, nodeID); + frontConfig->setGatewayGrpcTarget(gatewayTargets); + EndPoint endPoint(hostIp, port); + frontConfig->setSelfEndPoint(endPoint); + auto transport = transportBuilder->buildProTransport(frontConfig); + + // start the transport + std::cout << "#### start the front, detail: " << printFrontDesc(frontConfig); + transport->start(); + auto routeInfo = transport->routeInfoBuilder()->build(); + std::string topic = "sync_testDemo"; + routeInfo->setDstNode(bcos::bytes(dstNode.begin(), dstNode.end())); + routeInfo->setTopic(topic); + transport->getFront()->registerTopicHandler(topic, [](Message::Ptr msg) { + std::cout << "=== async receive message: " << printMessage(msg) << "====" << std::endl; + }); + std::string syncTopic = "sync__" + topic; + auto syncRouteInfo = transport->routeInfoBuilder()->build(); + syncRouteInfo->setDstNode(bcos::bytes(dstNode.begin(), dstNode.end())); + syncRouteInfo->setTopic(syncTopic); + // sendMessage test + long i = 0; + while (true) + { + try + { + std::string payload = "payload+++" + std::to_string(i); + bcos::bytes payloadBytes = bcos::bytes(payload.begin(), payload.end()); + // async test + transport->getFront()->asyncSendMessage((uint16_t)RouteType::ROUTE_THROUGH_NODEID, + routeInfo, std::move(bcos::bytes(payload.begin(), payload.end())), 0, 10000, + [](bcos::Error::Ptr error) { + if (error && error->errorCode() != 0) + { + std::cout << "!**** send message failed for: " << error->errorMessage() + << "***!" << std::endl; + } + }, + nullptr); + + // push + auto error = transport->getFront()->push((uint16_t)RouteType::ROUTE_THROUGH_NODEID, + syncRouteInfo, std::move(payloadBytes), 0, 10000); + if (!error && error->errorCode() != 0) + { + std::cout << "!**** send message failed for: " << error->errorMessage() << "***!" + << std::endl; + } + // pop + auto msg = transport->getFront()->pop(syncTopic, 10000); + if (msg == nullptr) + { + std::cout << "try to receive message timeout" << std::endl; + } + else + { + std::cout << "=== sync receive message: " << printMessage(msg) + << "====" << std::endl; + } + i++; + } + catch (std::exception const& e) + { + std::cout << "!**** exception: " << boost::diagnostic_information(e) << "****!" + << std::endl; + } + // wait for 2s + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + }; + return 0; +} diff --git a/cpp/wedpr-transport/sdk/src/CMakeLists.txt b/cpp/wedpr-transport/sdk/src/CMakeLists.txt new file mode 100644 index 00000000..1231ca68 --- /dev/null +++ b/cpp/wedpr-transport/sdk/src/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 3.14) +project(ppc-transport-sdk VERSION ${VERSION}) + +file(GLOB_RECURSE SRCS *.cpp) + +add_library(${WEDPR_TRANSPORT_SDK_TARGET} ${SRCS}) +target_link_libraries(${WEDPR_TRANSPORT_SDK_TARGET} PUBLIC + ${FRONT_TARGET} ${PB_PROTOCOL_TARGET} ${SERVICE_CLIENT_TARGET} ${SERVICE_SERVER_TARGET} ${CPU_FEATURES_LIB}) \ No newline at end of file diff --git a/cpp/wedpr-transport/sdk/Common.h b/cpp/wedpr-transport/sdk/src/Common.h similarity index 100% rename from cpp/wedpr-transport/sdk/Common.h rename to cpp/wedpr-transport/sdk/src/Common.h diff --git a/cpp/wedpr-transport/sdk/ProTransportImpl.cpp b/cpp/wedpr-transport/sdk/src/ProTransportImpl.cpp similarity index 100% rename from cpp/wedpr-transport/sdk/ProTransportImpl.cpp rename to cpp/wedpr-transport/sdk/src/ProTransportImpl.cpp diff --git a/cpp/wedpr-transport/sdk/ProTransportImpl.h b/cpp/wedpr-transport/sdk/src/ProTransportImpl.h similarity index 100% rename from cpp/wedpr-transport/sdk/ProTransportImpl.h rename to cpp/wedpr-transport/sdk/src/ProTransportImpl.h diff --git a/cpp/wedpr-transport/sdk/Transport.cpp b/cpp/wedpr-transport/sdk/src/Transport.cpp similarity index 100% rename from cpp/wedpr-transport/sdk/Transport.cpp rename to cpp/wedpr-transport/sdk/src/Transport.cpp diff --git a/cpp/wedpr-transport/sdk/Transport.h b/cpp/wedpr-transport/sdk/src/Transport.h similarity index 100% rename from cpp/wedpr-transport/sdk/Transport.h rename to cpp/wedpr-transport/sdk/src/Transport.h diff --git a/cpp/wedpr-transport/sdk/TransportBuilder.cpp b/cpp/wedpr-transport/sdk/src/TransportBuilder.cpp similarity index 100% rename from cpp/wedpr-transport/sdk/TransportBuilder.cpp rename to cpp/wedpr-transport/sdk/src/TransportBuilder.cpp diff --git a/cpp/wedpr-transport/sdk/TransportBuilder.h b/cpp/wedpr-transport/sdk/src/TransportBuilder.h similarity index 100% rename from cpp/wedpr-transport/sdk/TransportBuilder.h rename to cpp/wedpr-transport/sdk/src/TransportBuilder.h diff --git a/cpp/wedpr-transport/sdk/TransportImpl.h b/cpp/wedpr-transport/sdk/src/TransportImpl.h similarity index 100% rename from cpp/wedpr-transport/sdk/TransportImpl.h rename to cpp/wedpr-transport/sdk/src/TransportImpl.h