From ef3a682f8a6a11f3b89d3585822c63903a1d1343 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Sat, 24 Aug 2024 00:35:57 +0800 Subject: [PATCH] add Message and router implement --- cpp/cmake/IncludeDirectories.cmake | 1 + cpp/ppc-framework/Common.h | 15 + cpp/ppc-framework/gateway/GatewayProtocol.h | 37 +++ cpp/ppc-framework/protocol/Message.h | 150 +++++---- cpp/ppc-framework/protocol/MessagePayload.h | 68 ++++ cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp | 299 ++++++++++++++++++ cpp/ppc-gateway/ppc-gateway/p2p/Service.h | 86 +++++ .../ppc-gateway/p2p/router/RouterManager.cpp | 189 +++++++++++ .../ppc-gateway/p2p/router/RouterManager.h | 72 +++++ .../p2p/router/RouterTableImpl.cpp | 273 ++++++++++++++++ .../ppc-gateway/p2p/router/RouterTableImpl.h | 132 ++++++++ .../p2p/router/RouterTableInterface.h | 92 ++++++ cpp/ppc-protocol/src/JsonTaskImpl.h | 23 +- cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp | 146 +++++++++ cpp/ppc-protocol/src/v1/MessageHeaderImpl.h | 82 +++++ cpp/ppc-protocol/src/v1/MessageImpl.cpp | 63 ++++ cpp/ppc-protocol/src/v1/MessageImpl.h | 84 +++++ .../src/v1/MessagePayloadImpl.cpp | 67 ++++ cpp/ppc-protocol/src/v1/MessagePayloadImpl.h | 54 ++++ .../ppc-tars-protocol/tars/RouterTable.tars | 13 + cpp/ppc-tools/src/codec/CodecUtility.h | 1 - cpp/ppc-utilities/Utilities.h | 18 +- cpp/vcpkg-configuration.json | 4 +- 23 files changed, 1884 insertions(+), 85 deletions(-) create mode 100644 cpp/ppc-framework/gateway/GatewayProtocol.h create mode 100644 cpp/ppc-framework/protocol/MessagePayload.h create mode 100644 cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp create mode 100644 cpp/ppc-gateway/ppc-gateway/p2p/Service.h create mode 100644 cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp create mode 100644 cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h create mode 100644 cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp create mode 100644 cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.h create mode 100644 cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableInterface.h create mode 100644 cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp create mode 100644 cpp/ppc-protocol/src/v1/MessageHeaderImpl.h create mode 100644 cpp/ppc-protocol/src/v1/MessageImpl.cpp create mode 100644 cpp/ppc-protocol/src/v1/MessageImpl.h create mode 100644 cpp/ppc-protocol/src/v1/MessagePayloadImpl.cpp create mode 100644 cpp/ppc-protocol/src/v1/MessagePayloadImpl.h create mode 100644 cpp/ppc-tars-protocol/ppc-tars-protocol/tars/RouterTable.tars diff --git a/cpp/cmake/IncludeDirectories.cmake b/cpp/cmake/IncludeDirectories.cmake index f63512f5..d6be1b25 100644 --- a/cpp/cmake/IncludeDirectories.cmake +++ b/cpp/cmake/IncludeDirectories.cmake @@ -1,5 +1,6 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}) include_directories(${CMAKE_CURRENT_BINARY_DIR}) +include_directories(${CMAKE_BINARY_DIR}/generated/) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/ppc-front) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/ppc-gateway) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/wedpr-component-sdk) diff --git a/cpp/ppc-framework/Common.h b/cpp/ppc-framework/Common.h index 6034c5b6..2ae44dfd 100644 --- a/cpp/ppc-framework/Common.h +++ b/cpp/ppc-framework/Common.h @@ -59,10 +59,25 @@ namespace ppc DERIVE_PPC_EXCEPTION(OpenFileFailed); DERIVE_PPC_EXCEPTION(DataSchemaNotSetted); DERIVE_PPC_EXCEPTION(UnsupportedDataSchema); +DERIVE_PPC_EXCEPTION(WeDPRException); constexpr static int MAX_PORT = 65535; constexpr static int DEFAULT_SECURITY_PARAM = 128; +constexpr static size_t RSA_PUBLIC_KEY_PREFIX = 18; +constexpr static size_t RSA_PUBLIC_KEY_TRUNC = 8; +constexpr static size_t RSA_PUBLIC_KEY_TRUNC_LENGTH = 26; + +inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept +{ + if (p2pId.length() < RSA_PUBLIC_KEY_TRUNC_LENGTH) + { + return p2pId; + } + return p2pId.substr(RSA_PUBLIC_KEY_PREFIX, RSA_PUBLIC_KEY_TRUNC); +} + + #if ENABLE_CPU_FEATURES #if X86 static const cpu_features::X86Features CPU_FEATURES = cpu_features::GetX86Info().features; diff --git a/cpp/ppc-framework/gateway/GatewayProtocol.h b/cpp/ppc-framework/gateway/GatewayProtocol.h new file mode 100644 index 00000000..c3c0a94d --- /dev/null +++ b/cpp/ppc-framework/gateway/GatewayProtocol.h @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file Message.h + * @author: yujiechen + * @date 2024-08-22 + */ +#pragma once +#include + +namespace ppc::gateway +{ +enum class GatewayPacketType : uint16_t +{ + P2PMessage = 0x00, + RouterTableSyncSeq = 0x10, + RouterTableResponse = 0x11, + RouterTableRequest = 0x12 +}; + +enum class GatewayMsgExtFlag : uint16_t +{ + Response = 0x1, +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-framework/protocol/Message.h b/cpp/ppc-framework/protocol/Message.h index 0aa7443a..40c5f614 100644 --- a/cpp/ppc-framework/protocol/Message.h +++ b/cpp/ppc-framework/protocol/Message.h @@ -18,9 +18,12 @@ * @date 2024-08-22 */ #pragma once +#include "../Common.h" #include #include +#include #include +#include namespace ppc::protocol { @@ -31,18 +34,24 @@ class MessageOptionalHeader MessageOptionalHeader() = default; virtual ~MessageOptionalHeader() = default; - virtual bcos::bytes encode() const = 0; + virtual void encode(bcos::bytes& buffer) const = 0; + virtual int64_t decode(bcos::bytesConstRef data, uint64_t const _offset) = 0; + // the componentType virtual uint8_t componentType() const { return m_componentType; } + virtual void setComponentType(uint8_t componentType) { m_componentType = componentType; } + // the source nodeID that send the message virtual bcos::bytes const& srcNode() const { return m_srcNode; } + virtual void setSrcNode(bcos::bytes const& srcNode) { m_srcNode = srcNode; } + // the target nodeID that should receive the message virtual bcos::bytes const& dstNode() const { return m_dstNode; } + virtual void setDstNode(bcos::bytes const& dstNode) { m_dstNode = dstNode; } + // the target agency that need receive the message virtual bcos::bytes const& dstInst() const { return m_dstInst; } - -protected: - virtual uint32_t decode(bcos::bytesConstRef data) = 0; + virtual void setDstInst(bcos::bytes const& dstInst) { m_dstInst = dstInst; } protected: // the componentType @@ -54,6 +63,7 @@ class MessageOptionalHeader // the target agency that need receive the message bcos::bytes m_dstInst; }; + class MessageHeader { public: @@ -61,41 +71,51 @@ class MessageHeader MessageHeader() = default; virtual ~MessageHeader() = default; - virtual bcos::bytes encode() const = 0; + virtual void encode(bcos::bytes& buffer) const = 0; + virtual int64_t decode(bcos::bytesConstRef data) = 0; // the msg version, used to support compatibility virtual uint8_t version() const { return m_version; } + virtual void setVersion(uint16_t version) { m_version = version; } // the traceID virtual std::string const& traceID() const { return m_traceID; } + virtual void setTraceID(std::string traceID) { m_traceID = traceID; } + // the srcGwNode - virtual bcos::bytes const& srcGwNode() const { return m_srcGwNode; } + virtual std::string const& srcGwNode() const { return m_srcGwNode; } + virtual void setSrcGwNode(std::string const& srcGwNode) { m_srcGwNode = srcGwNode; } + // the dstGwNode - virtual bcos::bytes const& dstGwNode() const { return m_dstGwNode; } + virtual std::string const& dstGwNode() const { return m_dstGwNode; } + virtual void setDstGwNode(std::string const& dstGwNode) { m_dstGwNode = dstGwNode; } + // the packetType virtual uint16_t packetType() const { return m_packetType; } + virtual void setPacketType(uint16_t packetType) { m_packetType = packetType; } // the ttl virtual int16_t ttl() const { return m_ttl; } + virtual void setTTL(uint16_t ttl) { m_ttl = ttl; } + // the ext(contains the router policy and response flag) virtual uint16_t ext() const { return m_ext; } - //// the optional field(used to route between components and nodes) - virtual MessageOptionalHeader::Ptr optionalFields() const { return m_optionalFields; } - - virtual uint32_t length() const { return m_length; } - - virtual void setVersion(uint16_t version) { m_version = version; } - virtual void setPacketType(uint16_t packetType) { m_packetType = packetType; } - // the seq is the traceID - virtual void setTraceID(std::string traceID) { m_traceID = traceID; } virtual void setExt(uint16_t ext) { m_ext = ext; } + //// the optional field(used to route between components and nodes) + virtual MessageOptionalHeader::Ptr optionalField() const { return m_optionalField; } + void setOptionalField(MessageOptionalHeader::Ptr optionalField) + { + m_optionalField = std::move(optionalField); + } - uint64_t packetLen() const { return m_packetLen; } - uint16_t headerLen() const { return m_headerLen; } + virtual uint16_t length() const { return m_length; } virtual bool isRespPacket() const = 0; virtual void setRespPacket() = 0; -protected: - virtual uint32_t decode(bcos::bytesConstRef data) = 0; + + // Note: only for log + std::string_view srcP2PNodeIDView() const { return printP2PIDElegantly(m_srcGwNode); } + // Note: only for log + std::string_view dstP2PNodeIDView() const { return printP2PIDElegantly(m_dstGwNode); } protected: // the msg version, used to support compatibility @@ -103,9 +123,9 @@ class MessageHeader // the traceID std::string m_traceID; // the srcGwNode - bcos::bytes m_srcGwNode; + std::string m_srcGwNode; // the dstGwNode - bcos::bytes m_dstGwNode; + std::string m_dstGwNode; // the packetType uint16_t m_packetType; // the ttl @@ -113,37 +133,8 @@ class MessageHeader // the ext(contains the router policy and response flag) uint16_t m_ext; //// the optional field(used to route between components and nodes) - MessageOptionalHeader::Ptr m_optionalFields; - uint64_t m_packetLen; - uint16_t m_headerLen; -}; - -class MessagePayload -{ -public: - using Ptr = std::shared_ptr; - MessagePayload() = default; - virtual ~MessagePayload() = default; - - virtual bcos::bytes encode() const = 0; - - // the version - virtual uint8_t version() const { return m_version; } - // the topic - virtual std::string const& topic() const { return m_topic; } - virtual bcos::bytes const& data() const { return m_data; } - virtual uint32_t length() const { return m_length; } - -protected: - virtual uint32_t decode(uint32_t startPos, bcos::bytesConstRef data) = 0; - -protected: - // the front payload version, used to support compatibility - uint8_t m_version; - // the topic - std::string m_topic; - bcos::bytes m_data; - uint32_t m_length; + MessageOptionalHeader::Ptr m_optionalField; + uint16_t mutable m_length; }; class Message : virtual public bcos::boostssl::MessageFace @@ -154,36 +145,48 @@ class Message : virtual public bcos::boostssl::MessageFace ~Message() override {} virtual MessageHeader::Ptr header() const { return m_header; } - virtual MessagePayLoad::Ptr payload() const { return m_payload; } + virtual void setHeader(MessageHeader::Ptr header) { m_header = std::move(header); } + /// the overloaed implementation === uint16_t version() const override { return m_header->version(); } - void setVersion(uint16_t version) override{m_header->setVersion(version)} uint16_t - packetType() const override - { - return m_header->packetType(); - } + void setVersion(uint16_t version) override { m_header->setVersion(version); } + uint16_t packetType() const override { return m_header->packetType(); } void setPacketType(uint16_t packetType) override { m_header->setPacketType(packetType); } std::string const& seq() const override { return m_header->traceID(); } void setSeq(std::string traceID) override { m_header->setTraceID(traceID); } uint16_t ext() const override { return m_header->ext(); } - void setExt(uint16_t ext) override { m_header->setExt(except); } + void setExt(uint16_t ext) override { m_header->setExt(ext); } bool isRespPacket() const override { return m_header->isRespPacket(); } void setRespPacket() override { m_header->setRespPacket(); } - virtual uint32_t length() const override { return m_header->packetLen(); } + virtual uint32_t length() const override + { + return m_header->length() + (m_payload ? m_payload->size() : 0); + } + std::shared_ptr payload() const override { return m_payload; } void setPayload(std::shared_ptr _payload) override { m_payload = std::move(_payload); } - protected: MessageHeader::Ptr m_header; std::shared_ptr m_payload; }; +class MessageHeaderBuilder +{ +public: + using Ptr = std::shared_ptr; + MessageHeaderBuilder() = default; + virtual ~MessageHeaderBuilder() = default; + + virtual MessageHeader::Ptr build(bcos::bytesConstRef _data) = 0; + virtual MessageHeader::Ptr build() = 0; +}; + class MessageBuilder { public: @@ -191,5 +194,28 @@ class MessageBuilder virtual ~MessageBuilder() = default; virtual Message::Ptr build() = 0; + virtual Message::Ptr build(bcos::bytesConstRef buffer) = 0; +}; + +inline std::string printMessage(Message::Ptr const& _msg) +{ + std::ostringstream stringstream; + stringstream << LOG_KV("from", _msg->header()->srcP2PNodeIDView()) + << LOG_KV("to", _msg->header()->dstP2PNodeIDView()) + << LOG_KV("ttl", _msg->header()->ttl()) + << LOG_KV("rsp", _msg->header()->isRespPacket()) + << LOG_KV("traceID", _msg->header()->traceID()) + << LOG_KV("packetType", _msg->header()->packetType()) + << LOG_KV("length", _msg->length()); + return stringstream.str(); +} + +inline std::string printWsMessage(bcos::boostssl::MessageFace::Ptr const& _msg) +{ + std::ostringstream stringstream; + stringstream << LOG_KV("rsp", _msg->isRespPacket()) << LOG_KV("traceID", _msg->seq()) + << LOG_KV("packetType", _msg->packetType()) << LOG_KV("length", _msg->length()); + return stringstream.str(); } + } // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/ppc-framework/protocol/MessagePayload.h b/cpp/ppc-framework/protocol/MessagePayload.h new file mode 100644 index 00000000..4329aae2 --- /dev/null +++ b/cpp/ppc-framework/protocol/MessagePayload.h @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessagePayload.h + * @author: yujiechen + * @date 2024-08-22 + */ +#pragma once +#include +#include + +namespace ppc::protocol +{ +class MessagePayload +{ +public: + using Ptr = std::shared_ptr; + MessagePayload() = default; + virtual ~MessagePayload() = default; + + virtual int64_t encode(bcos::bytes& buffer) const = 0; + virtual int64_t decode(bcos::bytesConstRef data) = 0; + + // the version + virtual uint8_t version() const { return m_version; } + virtual void setVersion(uint8_t version) { m_version = version; } + // the topic + virtual std::string const& topic() const { return m_topic; } + virtual void setTopic(std::string&& topic) { m_topic = std::move(topic); } + virtual void setTopic(std::string const& topic) { m_topic = topic; } + // data + virtual bcos::bytes const& data() const { return m_data; } + virtual void setData(bcos::bytes&& data) { m_data = std::move(data); } + virtual void setData(bcos::bytes const& data) { m_data = data; } + // the length + virtual int64_t length() const { return m_length; } + +protected: + // the front payload version, used to support compatibility + uint8_t m_version; + // the topic + std::string m_topic; + bcos::bytes m_data; + int64_t mutable m_length; +}; + +class MessagePayloadBuilder +{ +public: + using Ptr = std::shared_ptr; + MessagePayloadBuilder() = default; + virtual ~MessagePayloadBuilder() = default; + virtual MessagePayload::Ptr build() = 0; + virtual MessagePayload::Ptr build(bcos::bytesConstRef buffer) = 0; +}; +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp b/cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp new file mode 100644 index 00000000..469e5b27 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp @@ -0,0 +1,299 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file Service.cpp + * @author: yujiechen + * @date 2024-08-26 + */ + +#include "Service.h" +#include "bcos-boostssl/websocket/WsError.h" +#include "ppc-framework/Common.h" + +using namespace bcos; +using namespace ppc; +using namespace ppc::gateway; +using namespace ppc::protocol; +using namespace bcos::boostssl::ws; +using namespace bcos::boostssl; + +Service::Service(std::string const& _nodeID, RouterTableFactory::Ptr const& _routerTableFactory, + int unreachableDistance, std::string _moduleName) + : WsService(_moduleName) +{ + m_nodeID = _nodeID; + m_routerTableFactory = _routerTableFactory; + // create the local router + m_routerTable = m_routerTableFactory->createRouterTable(); + m_routerTable->setNodeID(m_nodeID); + m_routerTable->setUnreachableDistance(unreachableDistance); + + GATEWAY_LOG(INFO) << LOG_DESC("create P2PService") << LOG_KV("module", _moduleName); + WsService::registerConnectHandler( + boost::bind(&Service::onP2PConnect, this, boost::placeholders::_1)); + WsService::registerDisconnectHandler( + boost::bind(&Service::onP2PDisconnect, this, boost::placeholders::_1)); +} + + +void Service::onP2PConnect(WsSession::Ptr _session) +{ + GATEWAY_LOG(INFO) << LOG_DESC("onP2PConnect") << LOG_KV("p2pid", _session->nodeId()) + << LOG_KV("endpoint", _session->endPoint()); + + + RecursiveGuard l(x_nodeID2Session); + auto it = m_nodeID2Session.find(_session->nodeId()); + // the session already connected + if (it != m_nodeID2Session.end() && it->second->isConnected()) + { + GATEWAY_LOG(INFO) << LOG_DESC("onP2PConnect, drop the duplicated connection") + << LOG_KV("nodeID", _session->nodeId()) + << LOG_KV("endpoint", _session->endPoint()); + _session->drop(WsError::UserDisconnect); + updateNodeIDInfo(_session); + return; + } + // the node-self + if (_session->nodeId() == m_nodeID) + { + updateNodeIDInfo(_session); + GATEWAY_LOG(INFO) << LOG_DESC("onP2PConnect, drop the node-self connection") + << LOG_KV("nodeID", _session->nodeId()) + << LOG_KV("endpoint", _session->endPoint()); + _session->drop(WsError::UserDisconnect); + return; + } + // the new session + updateNodeIDInfo(_session); + if (it != m_nodeID2Session.end()) + { + it->second = _session; + } + else + { + m_nodeID2Session.insert(std::make_pair(_session->nodeId(), _session)); + } + GATEWAY_LOG(INFO) << LOG_DESC("onP2PConnect established") << LOG_KV("p2pid", _session->nodeId()) + << LOG_KV("endpoint", _session->endPoint()); +} + + +void Service::updateNodeIDInfo(WsSession::Ptr const& _session) +{ + bcos::WriteGuard l(x_configuredNode2ID); + std::string p2pNodeID = _session->nodeId(); + auto it = m_configuredNode2ID.find(_session->endPointInfo()); + if (it != m_configuredNode2ID.end()) + { + it->second = p2pNodeID; + GATEWAY_LOG(INFO) << LOG_DESC("updateNodeIDInfo: update the nodeID") + << LOG_KV("nodeid", p2pNodeID) + << LOG_KV("endpoint", _session->endPoint()); + } + else + { + GATEWAY_LOG(INFO) << LOG_DESC("updateNodeIDInfo can't find endpoint") + << LOG_KV("nodeid", p2pNodeID) + << LOG_KV("endpoint", _session->endPoint()); + } +} + +void Service::removeSessionInfo(WsSession::Ptr const& _session) +{ + RecursiveGuard l(x_nodeID2Session); + auto it = m_nodeID2Session.find(_session->nodeId()); + if (it != m_nodeID2Session.end()) + { + GATEWAY_LOG(INFO) << "onP2PDisconnectand remove from m_nodeID2Session" + << LOG_KV("p2pid", _session->nodeId()) + << LOG_KV("endpoint", _session->endPoint()); + + m_nodeID2Session.erase(it); + } +} +void Service::onP2PDisconnect(WsSession::Ptr _session) +{ + // remove the session information + removeSessionInfo(_session); + // update the session nodeID to empty + UpgradableGuard l(x_configuredNode2ID); + for (auto& it : m_configuredNode2ID) + { + if (it.second == _session->nodeId()) + { + UpgradeGuard ul(l); + it.second.clear(); + break; + } + } +} + +void Service::reconnect() +{ + // obtain the un-connected peers information + EndPointsPtr unconnectedPeers = std::make_shared>(); + { + bcos::ReadGuard l(x_configuredNode2ID); + for (auto const& it : m_configuredNode2ID) + { + if (it.second == nodeID()) + { + continue; + } + if (!it.second.empty() && isConnected(it.first)) + { + continue; + } + unconnectedPeers->insert(it.first); + } + } + setReconnectedPeers(unconnectedPeers); + WsService::reconnect(); +} + +WsSession::Ptr Service::getSessionByNodeID(std::string const& _nodeID) +{ + RecursiveGuard l(x_nodeID2Session); + auto it = m_nodeID2Session.find(_nodeID); + if (it == m_nodeID2Session.end()) + { + return nullptr; + } + return it->second; +} + +void Service::asyncSendMessageByNodeID( + std::string const& dstNodeID, MessageFace::Ptr msg, Options options, RespCallBack respFunc) +{ + auto p2pMsg = std::dynamic_pointer_cast(msg); + if (p2pMsg->header()->dstGwNode().empty()) + { + p2pMsg->header()->setDstGwNode(dstNodeID); + } + if (p2pMsg->header()->srcGwNode().empty()) + { + p2pMsg->header()->setSrcGwNode(m_nodeID); + } + return asyncSendMessageWithForward(dstNodeID, msg, options, respFunc); +} + +void Service::asyncSendMessageWithForward( + std::string const& dstNodeID, MessageFace::Ptr msg, Options options, RespCallBack respFunc) +{ + auto p2pMsg = std::dynamic_pointer_cast(msg); + // without nextHop: maybe network unreachable or with distance equal to 1 + auto nextHop = m_routerTable->getNextHop(dstNodeID); + if (nextHop.empty()) + { + return asyncSendMessage(dstNodeID, msg, options, respFunc); + } + // with nextHop, send the message to nextHop + GATEWAY_LOG(TRACE) << LOG_DESC("asyncSendMessageByNodeID") << printMessage(p2pMsg); + return asyncSendMessage(nextHop, msg, options, respFunc); +} + + +void Service::asyncSendMessage( + std::string const& dstNodeID, MessageFace::Ptr msg, Options options, RespCallBack respFunc) +{ + try + { + // ignore self + if (dstNodeID == m_nodeID) + { + return; + } + auto session = getSessionByNodeID(dstNodeID); + if (session) + { + WsSessions sessions = WsSessions(); + sessions.emplace_back(session); + return WsService::asyncSendMessage(sessions, msg, options, respFunc); + } + + if (respFunc) + { + Error::Ptr error = std::make_shared( + -1, "send message to " + dstNodeID + + " failed for no network established, msg: " + printWsMessage(msg)); + respFunc(std::move(error), nullptr, nullptr); + } + GATEWAY_LOG(WARNING) + << LOG_DESC("asyncSendMessageByNodeID failed for no network established, msg detail:") + << printWsMessage(msg); + } + catch (std::exception const& e) + { + GATEWAY_LOG(ERROR) << "asyncSendMessageByNodeID" << LOG_KV("dstNode", dstNodeID) + << LOG_KV("what", boost::diagnostic_information(e)); + if (respFunc) + { + respFunc(std::make_shared(-1, "send message to " + dstNodeID + " failed for " + + boost::diagnostic_information(e)), + nullptr, nullptr); + } + } +} + +void Service::onRecvMessage(MessageFace::Ptr _msg, std::shared_ptr _session) +{ + auto p2pMsg = std::dynamic_pointer_cast(_msg); + // find the dstNode + if (p2pMsg->header()->dstGwNode().empty() || p2pMsg->header()->dstGwNode() == m_nodeID) + { + GATEWAY_LOG(TRACE) << LOG_DESC("onRecvMessage, dispatch for find the dst node") + << printMessage(p2pMsg); + WsService::onRecvMessage(_msg, _session); + return; + } + // forward the message + if (p2pMsg->header()->ttl() >= m_routerTable->unreachableDistance()) + { + GATEWAY_LOG(WARNING) << LOG_DESC("onRecvMessage: ttl expired") << printMessage(p2pMsg); + return; + } + p2pMsg->header()->setTTL(p2pMsg->header()->ttl() + 1); + asyncSendMessageWithForward( + p2pMsg->header()->dstGwNode(), p2pMsg, bcos::boostssl::ws::Options(), nullptr); +} + + +void Service::asyncBroadcastMessage(bcos::boostssl::MessageFace::Ptr msg, Options options) +{ + auto reachableNodes = m_routerTable->getAllReachableNode(); + try + { + for (auto const& node : reachableNodes) + { + asyncSendMessageByNodeID(node, msg, options); + } + } + catch (std::exception& e) + { + GATEWAY_LOG(WARNING) << LOG_BADGE("asyncBroadcastMessage exception") + << LOG_KV("msg", printWsMessage(msg)) + << LOG_KV("error", boost::diagnostic_information(e)); + } +} + +void Service::asyncSendMessageByP2PNodeID(uint16_t type, std::string const& dstNodeID, + std::shared_ptr payload, Options options, RespCallBack callback) +{ + auto message = m_messageFactory->buildMessage(); + message->setPacketType(type); + message->setPayload(payload); + asyncSendMessageByNodeID(dstNodeID, message, options, callback); +} \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/Service.h b/cpp/ppc-gateway/ppc-gateway/p2p/Service.h new file mode 100644 index 00000000..a3c8f5ea --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/p2p/Service.h @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file Service.h + * @author: yujiechen + * @date 2024-08-26 + */ + +#pragma once +#include "../Common.h" +#include "ppc-framework/protocol/Message.h" +#include "router/RouterTableInterface.h" +#include +namespace ppc::gateway +{ +class Service : public bcos::boostssl::ws::WsService +{ +public: + using Ptr = std::shared_ptr; + Service(std::string const& _nodeID, RouterTableFactory::Ptr const& _routerTableFactory, + int unreachableDistance, std::string _moduleName = "DEFAULT"); + + virtual void asyncSendMessageByNodeID(std::string const& dstNodeID, + bcos::boostssl::MessageFace::Ptr msg, + bcos::boostssl::ws::Options options = bcos::boostssl::ws::Options(), + bcos::boostssl::ws::RespCallBack respFunc = bcos::boostssl::ws::RespCallBack()); + virtual void asyncSendMessageByP2PNodeID(uint16_t type, std::string const& dstNodeID, + std::shared_ptr payload, + bcos::boostssl::ws::Options options = bcos::boostssl::ws::Options(), + bcos::boostssl::ws::RespCallBack callback = bcos::boostssl::ws::RespCallBack()); + + virtual void asyncBroadcastMessage(bcos::boostssl::MessageFace::Ptr msg, + bcos::boostssl::ws::Options options = bcos::boostssl::ws::Options()); + + RouterTableFactory::Ptr const& routerTableFactory() const { return m_routerTableFactory; } + RouterTableInterface::Ptr const& routerTable() const { return m_routerTable; } + + std::string const& nodeID() const { return m_nodeID; } + +protected: + void onRecvMessage(bcos::boostssl::MessageFace::Ptr _msg, + bcos::boostssl::ws::WsSession::Ptr _session) override; + + virtual void onP2PConnect(bcos::boostssl::ws::WsSession::Ptr _session); + virtual void onP2PDisconnect(bcos::boostssl::ws::WsSession::Ptr _session); + + void reconnect() override; + + void updateNodeIDInfo(bcos::boostssl::ws::WsSession::Ptr const& _session); + void removeSessionInfo(bcos::boostssl::ws::WsSession::Ptr const& _session); + bcos::boostssl::ws::WsSession::Ptr getSessionByNodeID(std::string const& _nodeID); + + virtual void asyncSendMessageWithForward(std::string const& dstNodeID, + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::Options options, + bcos::boostssl::ws::RespCallBack respFunc); + + virtual void asyncSendMessage(std::string const& dstNodeID, + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::Options options, + bcos::boostssl::ws::RespCallBack respFunc); + +protected: + std::string m_nodeID; + // nodeID=>session + std::unordered_map m_nodeID2Session; + bcos::RecursiveMutex x_nodeID2Session; + + RouterTableFactory::Ptr m_routerTableFactory; + RouterTableInterface::Ptr m_routerTable; + + // configuredNode=>nodeID + std::map m_configuredNode2ID; + mutable bcos::SharedMutex x_configuredNode2ID; +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp new file mode 100644 index 00000000..00f3e7c7 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.cpp @@ -0,0 +1,189 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file RouterManager.cpp + * @author: yujiechen + * @date 2024-08-26 + */ +#include "RouterManager.h" +#include "ppc-framework/gateway/GatewayProtocol.h" +#include "ppc-framework/protocol/Message.h" +#include + +using namespace bcos; +using namespace bcos::boostssl::ws; +using namespace bcos::boostssl; +using namespace ppc::gateway; +using namespace ppc::protocol; + +RouterManager::RouterManager(Service::Ptr service) : m_service(std::move(service)) +{ + // process router packet related logic + m_service->registerMsgHandler((uint16_t)GatewayPacketType::RouterTableSyncSeq, + boost::bind(&RouterManager::onReceiveRouterSeq, this, boost::placeholders::_1, + boost::placeholders::_2)); + + m_service->registerMsgHandler((uint16_t)GatewayPacketType::RouterTableResponse, + boost::bind(&RouterManager::onReceivePeersRouterTable, this, boost::placeholders::_1, + boost::placeholders::_2)); + + m_service->registerMsgHandler((uint16_t)GatewayPacketType::RouterTableRequest, + boost::bind(&RouterManager::onReceiveRouterTableRequest, this, boost::placeholders::_1, + boost::placeholders::_2)); + m_routerTimer = std::make_shared(3000, "routerSeqSync"); + m_routerTimer->registerTimeoutHandler([this]() { broadcastRouterSeq(); }); +} + +void RouterManager::start() +{ + if (m_routerTimer) + { + m_routerTimer->start(); + } +} + +void RouterManager::stop() +{ + if (m_routerTimer) + { + m_routerTimer->stop(); + } +} + +void RouterManager::onReceiveRouterSeq(MessageFace::Ptr msg, WsSession::Ptr session) +{ + auto statusSeq = + boost::asio::detail::socket_ops::network_to_host_long(*((uint32_t*)msg->payload()->data())); + if (!tryToUpdateSeq(session->nodeId(), statusSeq)) + { + return; + } + GATEWAY_LOG(INFO) << LOG_BADGE("onReceiveRouterSeq") + << LOG_DESC("receive router seq and request router table") + << LOG_KV("peer", session->nodeId()) << LOG_KV("seq", statusSeq); + // request router table to peer + auto p2pMsg = std::dynamic_pointer_cast(msg); + auto dstP2PNodeID = (!p2pMsg->header()->srcGwNode().empty()) ? p2pMsg->header()->srcGwNode() : + session->nodeId(); + m_service->asyncSendMessageByP2PNodeID((uint16_t)GatewayPacketType::RouterTableRequest, + dstP2PNodeID, std::make_shared()); +} + +bool RouterManager::tryToUpdateSeq(std::string const& _p2pNodeID, uint32_t _seq) +{ + UpgradableGuard l(x_node2Seq); + auto it = m_node2Seq.find(_p2pNodeID); + if (it != m_node2Seq.end() && it->second >= _seq) + { + return false; + } + UpgradeGuard upgradeGuard(l); + m_node2Seq[_p2pNodeID] = _seq; + return true; +} + +// receive routerTable from peers +void RouterManager::onReceivePeersRouterTable(MessageFace::Ptr msg, WsSession::Ptr session) +{ + auto routerTable = m_service->routerTableFactory()->createRouterTable(ref(*(msg->payload()))); + + GATEWAY_LOG(INFO) << LOG_BADGE("onReceivePeersRouterTable") << LOG_KV("peer", session->nodeId()) + << LOG_KV("entrySize", routerTable->routerEntries().size()); + joinRouterTable(session->nodeId(), routerTable); +} + +// receive routerTable request from peer +void RouterManager::onReceiveRouterTableRequest(MessageFace::Ptr msg, WsSession::Ptr session) +{ + GATEWAY_LOG(INFO) << LOG_BADGE("onReceiveRouterTableRequest") + << LOG_KV("peer", session->nodeId()) + << LOG_KV("entrySize", m_service->routerTable()->routerEntries().size()); + + auto routerTableData = std::make_shared(); + m_service->routerTable()->encode(*routerTableData); + auto p2pMsg = std::dynamic_pointer_cast(msg); + auto dstP2PNodeID = (!p2pMsg->header()->srcGwNode().empty()) ? p2pMsg->header()->srcGwNode() : + session->nodeId(); + m_service->asyncSendMessageByP2PNodeID( + (uint16_t)GatewayPacketType::RouterTableResponse, dstP2PNodeID, routerTableData); +} + +void RouterManager::joinRouterTable( + std::string const& _generatedFrom, RouterTableInterface::Ptr _routerTable) +{ + std::set unreachableNodes; + bool updated = false; + auto const& entries = _routerTable->routerEntries(); + for (auto const& it : entries) + { + auto entry = it.second; + if (m_service->routerTable()->update(unreachableNodes, _generatedFrom, entry) && !updated) + { + updated = true; + } + } + + GATEWAY_LOG(INFO) << LOG_BADGE("joinRouterTable") << LOG_DESC("create router entry") + << LOG_KV("dst", _generatedFrom); + + auto entry = m_service->routerTableFactory()->createRouterEntry(); + entry->setDstNode(_generatedFrom); + entry->setDistance(0); + if (m_service->routerTable()->update(unreachableNodes, m_service->nodeID(), entry) && !updated) + { + updated = true; + } + if (!updated) + { + GATEWAY_LOG(DEBUG) << LOG_BADGE("joinRouterTable") << LOG_DESC("router table not updated") + << LOG_KV("dst", _generatedFrom); + return; + } + onP2PNodesUnreachable(unreachableNodes); + m_statusSeq++; + broadcastRouterSeq(); +} + + +// called when the nodes become unreachable +void RouterManager::onP2PNodesUnreachable(std::set const& _p2pNodeIDs) +{ + std::vector> handlers; + { + ReadGuard readGuard(x_unreachableHandlers); + handlers = m_unreachableHandlers; + } + // TODO: async here + for (auto const& node : _p2pNodeIDs) + { + for (auto const& it : m_unreachableHandlers) + { + it(node); + } + } +} + +void RouterManager::broadcastRouterSeq() +{ + m_routerTimer->restart(); + + auto seq = m_statusSeq.load(); + auto statusSeq = boost::asio::detail::socket_ops::host_to_network_long(seq); + auto message = m_service->messageFactory()->buildMessage(); + message->setPacketType((uint16_t)GatewayPacketType::RouterTableSyncSeq); + message->setPayload(std::make_shared((byte*)&statusSeq, (byte*)&statusSeq + 4)); + // the router table should only exchange between neighbor + m_service->broadcastMessage(message); +} \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h new file mode 100644 index 00000000..f99c0182 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file RouterManager.h + * @author: yujiechen + * @date 2024-08-26 + */ + +#pragma once +#include "../Service.h" +#include "RouterTableInterface.h" +#include + +namespace ppc::gateway +{ +class RouterManager +{ +public: + RouterManager(Service::Ptr service); + virtual ~RouterManager() = default; + + // handlers called when the node is unreachable + void registerUnreachableHandler(std::function _handler) + { + bcos::WriteGuard l(x_unreachableHandlers); + m_unreachableHandlers.emplace_back(_handler); + } + + virtual void start(); + virtual void stop(); + +private: + void onReceiveRouterSeq( + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); + bool tryToUpdateSeq(std::string const& _p2pNodeID, uint32_t _seq); + void broadcastRouterSeq(); + + void onReceivePeersRouterTable( + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); + void onReceiveRouterTableRequest( + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); + + void joinRouterTable(std::string const& _generatedFrom, RouterTableInterface::Ptr _routerTable); + + void onP2PNodesUnreachable(std::set const& _p2pNodeIDs); + +private: + // for message forward + Service::Ptr m_service; + std::shared_ptr m_routerTimer; + std::atomic m_statusSeq{1}; + + // called when the given node unreachable + std::vector> m_unreachableHandlers; + mutable bcos::SharedMutex x_unreachableHandlers; + + std::map m_node2Seq; + mutable bcos::SharedMutex x_node2Seq; +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp new file mode 100644 index 00000000..f8a2a22a --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.cpp @@ -0,0 +1,273 @@ +/* + * Copyright (C) 2024 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file RouterTableImpl.cpp + * @author: yujiechen + * @date 2022-5-24 + */ +#include "RouterTableImpl.h" +#include "ppc-gateway/Common.h" + +using namespace bcos; +using namespace ppc::gateway; +using namespace ppc::protocol; + +void RouterTable::encode(bcos::bytes& _encodedData) +{ + WriteGuard writeGuard(x_routerEntries); + m_inner()->routerEntries.clear(); + // encode m_routerEntries + for (auto const& it : m_routerEntries) + { + auto entry = std::dynamic_pointer_cast(it.second); + m_inner()->routerEntries.emplace_back(entry->inner()); + } + tars::TarsOutputStream output; + m_inner()->writeTo(output); + output.getByteBuffer().swap(_encodedData); +} + +void RouterTable::decode(bcos::bytesConstRef _decodedData) +{ + tars::TarsInputStream input; + input.setBuffer((const char*)_decodedData.data(), _decodedData.size()); + WriteGuard writeGuard(x_routerEntries); + m_inner()->readFrom(input); + // decode into m_routerEntries + m_routerEntries.clear(); + for (auto& it : m_inner()->routerEntries) + { + auto entry = + std::make_shared([m_entry = it]() mutable { return &m_entry; }); + m_routerEntries.insert(std::make_pair(entry->dstNode(), entry)); + } +} + +bool RouterTable::erase(std::set& _unreachableNodes, std::string const& _p2pNodeID) +{ + bool updated = false; + WriteGuard writeGuard(x_routerEntries); + // erase router-entry of the _p2pNodeID + auto it = m_routerEntries.find(_p2pNodeID); + if (it != m_routerEntries.end()) + { + // Note: reset the distance to m_unreachableDistance, to notify that the _p2pNodeID is + // unreachable + it->second->setDistance(m_unreachableDistance); + it->second->clearNextHop(); + _unreachableNodes.insert(it->second->dstNode()); + + GATEWAY_LOG(INFO) << LOG_BADGE("erase") << LOG_DESC("make the router unreachable") + << LOG_KV("dst", _p2pNodeID) << LOG_KV("distance", it->second->distance()) + << LOG_KV("size", m_routerEntries.size()); + updated = true; + } + // update the router-entry with nextHop equal to _p2pNodeID to be unreachable + updateDistanceForAllRouterEntries(_unreachableNodes, _p2pNodeID, m_unreachableDistance); + return updated; +} + +void RouterTable::updateDistanceForAllRouterEntries( + std::set& _unreachableNodes, std::string const& _nextHop, int32_t _newDistance) +{ + for (auto& it : m_routerEntries) + { + auto entry = it.second; + if (entry->nextHop() == _nextHop) + { + auto oldDistance = entry->distance(); + entry->setDistance(_newDistance + (oldDistance - 1)); + if (entry->distance() >= m_unreachableDistance) + { + entry->clearNextHop(); + _unreachableNodes.insert(entry->dstNode()); + } + GATEWAY_LOG(INFO) << LOG_BADGE("updateDistanceForAllRouterEntries") + << LOG_DESC( + "update entry since the nextHop distance has been updated") + << LOG_KV("dst", entry->dstNode()) << LOG_KV("nextHop", _nextHop) + << LOG_KV("distance", entry->distance()) + << LOG_KV("oldDistance", oldDistance) + << LOG_KV("size", m_routerEntries.size()); + } + } +} + +bool RouterTable::update(std::set& _unreachableNodes, + std::string const& _generatedFrom, RouterTableEntryInterface::Ptr _entry) +{ + if (c_fileLogLevel <= TRACE) + [[unlikely]] + { + GATEWAY_LOG(TRACE) << LOG_BADGE("update") << LOG_DESC("receive entry") + << LOG_KV("dst", printP2PIDElegantly(_entry->dstNode())) + << LOG_KV("distance", _entry->distance()) + << LOG_KV("from", _generatedFrom); + } + auto ret = updateDstNodeEntry(_generatedFrom, _entry); + // the dst entry has not been updated + if (!ret) + { + return false; + } + + UpgradableGuard l(x_routerEntries); + auto it = m_routerEntries.find(_entry->dstNode()); + if (it == m_routerEntries.end()) + { + return false; + } + // get the latest distance + auto& currentEntry = it->second; + auto _newDistance = currentEntry->distance(); + if (_newDistance >= m_unreachableDistance) + { + currentEntry->clearNextHop(); + _unreachableNodes.insert(_entry->dstNode()); + } + // the dst entry has updated, update the distance of the router-entries with nextHop equal to + // dstNode + UpgradeGuard upgradeGuard(l); + if (_newDistance == 1) + { + currentEntry->clearNextHop(); + } + updateDistanceForAllRouterEntries(_unreachableNodes, _entry->dstNode(), _newDistance); + return true; +} + +bool RouterTable::updateDstNodeEntry( + std::string const& _generatedFrom, RouterTableEntryInterface::Ptr _entry) +{ + UpgradableGuard upgradableGuard(x_routerEntries); + // the node self + if (_entry->dstNode() == m_nodeID) + { + return false; + } + // insert new entry + auto it = m_routerEntries.find(_entry->dstNode()); + if (it == m_routerEntries.end()) + { + UpgradeGuard upgradeGuard(upgradableGuard); + _entry->incDistance(1); + if (_generatedFrom != m_nodeID) + { + _entry->setNextHop(_generatedFrom); + } + m_routerEntries.insert(std::make_pair(_entry->dstNode(), _entry)); + GATEWAY_LOG(INFO) << LOG_BADGE("updateDstNodeEntry") + << LOG_DESC("insert new entry into the routerTable") + << LOG_KV("distance", _entry->distance()) + << LOG_KV("dst", _entry->dstNode()) + << LOG_KV("nextHop", _entry->nextHop()) + << LOG_KV("size", m_routerEntries.size()); + return true; + } + + // discover smaller distance + auto currentEntry = it->second; + auto currentDistance = currentEntry->distance(); + auto distance = _entry->distance() + 1; + if (currentDistance > distance) + { + UpgradeGuard upgradeGuard(upgradableGuard); + if (_generatedFrom != m_nodeID) + { + currentEntry->setNextHop(_generatedFrom); + } + currentEntry->setDistance(distance); + GATEWAY_LOG(INFO) << LOG_BADGE("updateDstNodeEntry") + << LOG_DESC("discover smaller distance, update entry") + << LOG_KV("distance", currentEntry->distance()) + << LOG_KV("oldDistance", currentDistance) + << LOG_KV("dst", _entry->dstNode()) + << LOG_KV("nextHop", _entry->nextHop()) + << LOG_KV("size", m_routerEntries.size()); + return true; + } + // the distance information for the nextHop changed + if (currentEntry->nextHop() == _generatedFrom) + { + // distance not updated + if (currentEntry->distance() == distance) + { + return false; + } + // unreachable condition + if (currentEntry->distance() >= m_unreachableDistance && distance >= m_unreachableDistance) + { + return false; + } + currentEntry->setDistance(distance); + if (currentEntry->distance() >= m_unreachableDistance) + { + currentEntry->clearNextHop(); + } + GATEWAY_LOG(INFO) << LOG_BADGE("updateDstNodeEntry") + << LOG_DESC( + "distance of the nextHop entry " + "updated, update the current entry") + << LOG_KV("dst", currentEntry->dstNode()) + << LOG_KV("nextHop", currentEntry->nextHop()) + << LOG_KV("distance", currentEntry->distance()) + << LOG_KV("size", m_routerEntries.size()); + return true; + } + return false; +} + +std::string RouterTable::getNextHop(std::string const& _nodeID) +{ + std::string emptyNextHop; + ReadGuard readGuard(x_routerEntries); + auto it = m_routerEntries.find(_nodeID); + if (it == m_routerEntries.end()) + { + return emptyNextHop; + } + if (it->second->distance() >= m_unreachableDistance) + { + return emptyNextHop; + } + return it->second->nextHop(); +} + +std::set RouterTable::getAllReachableNode() +{ + std::set reachableNodes; + ReadGuard readGuard(x_routerEntries); + for (auto const& it : m_routerEntries) + { + auto entry = it.second; + if (entry->distance() < m_unreachableDistance) + { + reachableNodes.insert(entry->dstNode()); + } + } + + if (c_fileLogLevel <= LogLevel::TRACE) + [[unlikely]] + { + std::stringstream nodes; + std::for_each(reachableNodes.begin(), reachableNodes.end(), + [&](const auto& item) { nodes << printP2PIDElegantly(item) << ","; }); + GATEWAY_LOG(TRACE) << LOG_BADGE("getAllReachableNode") + << LOG_KV("nodes size", reachableNodes.size()) + << LOG_KV("nodes", nodes.str()); + } + + return reachableNodes; +} diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.h b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.h new file mode 100644 index 00000000..72eda232 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableImpl.h @@ -0,0 +1,132 @@ +/* + * Copyright (C) 2024 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file RouterTableImpl.h + * @author: yujiechen + * @date 2022-5-24 + */ +#pragma once +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wunused-parameter" + +#include "RouterTableInterface.h" +#include +#include +#include + +namespace ppc::gateway +{ +class RouterTableEntry : public RouterTableEntryInterface +{ +public: + using Ptr = std::shared_ptr; + RouterTableEntry() + : m_inner([m_entry = ppctars::RouterTableEntry()]() mutable { return &m_entry; }) + {} + RouterTableEntry(std::function _inner) + : m_inner(std::move(_inner)) + {} + RouterTableEntry(RouterTableEntry&&) = delete; + RouterTableEntry(const RouterTableEntry&) = delete; + RouterTableEntry& operator=(const RouterTableEntry&) = delete; + RouterTableEntry& operator=(RouterTableEntry&&) = delete; + ~RouterTableEntry() override = default; + + void setDstNode(std::string const& _dstNode) override { m_inner()->dstNode = _dstNode; } + void setNextHop(std::string const& _nextHop) override { m_inner()->nextHop = _nextHop; } + void clearNextHop() override { m_inner()->nextHop = std::string(); } + void setDistance(int32_t _distance) override { m_inner()->distance = _distance; } + void incDistance(int32_t _deltaDistance) override { m_inner()->distance += _deltaDistance; } + + std::string const& dstNode() const override { return m_inner()->dstNode; } + std::string const& nextHop() const override { return m_inner()->nextHop; } + int32_t distance() const override { return m_inner()->distance; } + + ppctars::RouterTableEntry const& inner() const { return *(m_inner()); } + +private: + std::function m_inner; +}; + +class RouterTable : public RouterTableInterface +{ +public: + using Ptr = std::shared_ptr; + RouterTable() : m_inner([m_table = ppctars::RouterTable()]() mutable { return &m_table; }) {} + RouterTable(bcos::bytesConstRef _decodedData) : RouterTable() { decode(_decodedData); } + RouterTable(RouterTable&&) = delete; + RouterTable(const RouterTable&) = delete; + RouterTable& operator=(const RouterTable&) = delete; + RouterTable& operator=(RouterTable&&) = delete; + ~RouterTable() override = default; + + void encode(bcos::bytes& _encodedData) override; + void decode(bcos::bytesConstRef _decodedData) override; + + std::map const& routerEntries() override + { + return m_routerEntries; + } + // append the unreachableNodes into param _unreachableNodes + bool update(std::set& _unreachableNodes, std::string const& _generatedFrom, + RouterTableEntryInterface::Ptr _entry) override; + // append the unreachableNodes into param _unreachableNodes + bool erase(std::set& _unreachableNodes, std::string const& _p2pNodeID) override; + + void setNodeID(std::string const& _nodeID) override { m_nodeID = _nodeID; } + std::string const& nodeID() const override { return m_nodeID; } + + void setUnreachableDistance(int _unreachableDistance) override + { + m_unreachableDistance = _unreachableDistance; + } + + int unreachableDistance() const override { return m_unreachableDistance; } + std::string getNextHop(std::string const& _nodeID) override; + std::set getAllReachableNode() override; + + bool updateDstNodeEntry( + std::string const& _generatedFrom, RouterTableEntryInterface::Ptr _entry); + void updateDistanceForAllRouterEntries(std::set& _unreachableNodes, + std::string const& _nextHop, int32_t _newDistance); + +private: + std::string m_nodeID; + std::function m_inner; + std::map m_routerEntries; + mutable bcos::SharedMutex x_routerEntries; + + int m_unreachableDistance = 10; +}; + +class RouterTableFactoryImpl : public RouterTableFactory +{ +public: + using Ptr = std::shared_ptr; + RouterTableInterface::Ptr createRouterTable() override + { + return std::make_shared(); + } + RouterTableInterface::Ptr createRouterTable(bcos::bytesConstRef _decodedData) override + { + return std::make_shared(_decodedData); + } + + RouterTableEntryInterface::Ptr createRouterEntry() override + { + return std::make_shared(); + } +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableInterface.h b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableInterface.h new file mode 100644 index 00000000..e60d3096 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterTableInterface.h @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2024 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file RouterTableInterface.h + * @author: yujiechen + * @date 2022-5-24 + */ +#pragma once +#include +#include +#include +namespace ppc::gateway +{ +class RouterTableEntryInterface +{ +public: + using Ptr = std::shared_ptr; + RouterTableEntryInterface() = default; + RouterTableEntryInterface(const RouterTableEntryInterface&) = delete; + RouterTableEntryInterface(RouterTableEntryInterface&&) = delete; + RouterTableEntryInterface& operator=(RouterTableEntryInterface&&) = delete; + RouterTableEntryInterface& operator=(const RouterTableEntryInterface&) = delete; + virtual ~RouterTableEntryInterface() = default; + + virtual void setDstNode(std::string const& _dstNode) = 0; + virtual void setNextHop(std::string const& _nextHop) = 0; + virtual void clearNextHop() = 0; + virtual void setDistance(int32_t _distance) = 0; + virtual void incDistance(int32_t _deltaDistance) = 0; + + virtual std::string const& dstNode() const = 0; + virtual std::string const& nextHop() const = 0; + virtual int32_t distance() const = 0; +}; + +class RouterTableInterface +{ +public: + using Ptr = std::shared_ptr; + RouterTableInterface() = default; + RouterTableInterface(const RouterTableInterface&) = delete; + RouterTableInterface(RouterTableInterface&&) = delete; + RouterTableInterface& operator=(RouterTableInterface&&) = delete; + RouterTableInterface& operator=(const RouterTableInterface&) = delete; + virtual ~RouterTableInterface() = default; + + virtual bool update(std::set& _unreachableNodes, std::string const& _generatedFrom, + RouterTableEntryInterface::Ptr _entry) = 0; + virtual bool erase(std::set& _unreachableNodes, std::string const& _p2pNodeID) = 0; + + virtual std::map const& routerEntries() = 0; + + virtual void setNodeID(std::string const& _nodeID) = 0; + virtual std::string const& nodeID() const = 0; + virtual void setUnreachableDistance(int _unreachableDistance) = 0; + virtual int unreachableDistance() const = 0; + virtual std::string getNextHop(std::string const& _nodeID) = 0; + virtual std::set getAllReachableNode() = 0; + + virtual void encode(bcos::bytes& _encodedData) = 0; + virtual void decode(bcos::bytesConstRef _decodedData) = 0; +}; + +class RouterTableFactory +{ +public: + using Ptr = std::shared_ptr; + RouterTableFactory() = default; + RouterTableFactory(RouterTableFactory&&) = delete; + RouterTableFactory(const RouterTableFactory&) = delete; + RouterTableFactory& operator=(const RouterTableFactory&) = delete; + RouterTableFactory& operator=(RouterTableFactory&&) = delete; + virtual ~RouterTableFactory() = default; + + virtual RouterTableInterface::Ptr createRouterTable() = 0; + virtual RouterTableInterface::Ptr createRouterTable(bcos::bytesConstRef _decodedData) = 0; + virtual RouterTableEntryInterface::Ptr createRouterEntry() = 0; +}; + +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-protocol/src/JsonTaskImpl.h b/cpp/ppc-protocol/src/JsonTaskImpl.h index dea185b8..b0883b31 100644 --- a/cpp/ppc-protocol/src/JsonTaskImpl.h +++ b/cpp/ppc-protocol/src/JsonTaskImpl.h @@ -30,14 +30,19 @@ class JsonTaskImpl : public Task { public: using Ptr = std::shared_ptr; - JsonTaskImpl(std::string const& _selfPartyID, std::string const& _prePath = "data") : m_selfPartyID(_selfPartyID), m_prePath(_prePath) + JsonTaskImpl(std::string const& _selfPartyID, std::string const& _prePath = "data") + : m_selfPartyID(_selfPartyID), m_prePath(_prePath) {} - JsonTaskImpl(std::string const& _selfPartyID, std::string_view _taskData, std::string const& _prePath = "data") : JsonTaskImpl(_selfPartyID, _prePath) + JsonTaskImpl(std::string const& _selfPartyID, std::string_view _taskData, + std::string const& _prePath = "data") + : JsonTaskImpl(_selfPartyID, _prePath) { decode(_taskData); } - JsonTaskImpl(std::string const& _selfPartyID, Json::Value const& _taskJson, std::string const& _prePath = "data") : JsonTaskImpl(_selfPartyID, _prePath) + JsonTaskImpl(std::string const& _selfPartyID, Json::Value const& _taskJson, + std::string const& _prePath = "data") + : JsonTaskImpl(_selfPartyID, _prePath) { decodeJsonValue(_taskJson); } @@ -64,10 +69,7 @@ class JsonTaskImpl : public Task return m_peerParties; } - std::vector const& getReceiverLists() const override - { - return m_receiverLists; - } + std::vector const& getReceiverLists() const override { return m_receiverLists; } // params of the task, can be deserialized using json std::string const& param() const override { return m_param; } @@ -89,11 +91,8 @@ class JsonTaskImpl : public Task { m_syncResultToPeer = _syncResultToPeer; } - void setLowBandwidth(bool _lowBandwidth) override - { - m_lowBandwidth = _lowBandwidth; - } - + void setLowBandwidth(bool _lowBandwidth) override { m_lowBandwidth = _lowBandwidth; } + // decode the task void decode(std::string_view _taskData) override; virtual void decodeJsonValue(Json::Value const& root); diff --git a/cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp b/cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp new file mode 100644 index 00000000..08ba9cdd --- /dev/null +++ b/cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp @@ -0,0 +1,146 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessageHeaderImpl.cpp + * @author: yujiechen + * @date 2024-08-23 + */ +#include "MessageHeaderImpl.h" +#include "ppc-framework/Common.h" +#include "ppc-utilities/Utilities.h" +#include + +using namespace ppc::protocol; +using namespace bcos; +using namespace ppc; + +void MessageOptionalHeaderImpl::encode(bcos::bytes& buffer) const +{ + // the componentType + uint16_t componentType = + boost::asio::detail::socket_ops::host_to_network_short(m_componentType); + buffer.insert(buffer.end(), (byte*)&componentType, (byte*)&componentType + 2); + // the source nodeID that send the message + uint16_t srcNodeLen = boost::asio::detail::socket_ops::host_to_network_short(m_srcNode.size()); + buffer.insert(buffer.end(), (byte*)&srcNodeLen, (byte*)&srcNodeLen + 2); + buffer.insert(buffer.end(), m_srcNode.begin(), m_srcNode.end()); + // the target nodeID that should receive the message + uint16_t dstNodeLen = boost::asio::detail::socket_ops::host_to_network_short(m_dstNode.size()); + buffer.insert(buffer.end(), (byte*)&dstNodeLen, (byte*)&dstNodeLen + 2); + buffer.insert(buffer.end(), m_dstNode.begin(), m_dstNode.end()); + bcos::bytes m_dstNode; + // the target agency that need receive the message + uint16_t dstInstLen = boost::asio::detail::socket_ops::host_to_network_short(m_dstInst.size()); + buffer.insert(buffer.end(), (byte*)&dstInstLen, (byte*)&dstInstLen + 2); + buffer.insert(buffer.end(), m_dstInst.begin(), m_dstInst.end()); +} + + +int64_t MessageOptionalHeaderImpl::decode(bcos::bytesConstRef data, uint64_t const _offset) +{ + auto offset = _offset; + CHECK_OFFSET_WITH_THROW_EXCEPTION(offset, data.size()); + // the componentType + auto pointer = data.data() + offset; + m_componentType = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); + pointer += 2; + // srcNode + offset = decodeNetworkBuffer(m_srcNode, data.data(), data.size(), (pointer - data.data())); + // dstNode + offset = decodeNetworkBuffer(m_dstNode, data.data(), data.size(), offset); + // dstInst + offset = decodeNetworkBuffer(m_dstInst, data.data(), data.size(), offset); + return offset; +} + +void MessageHeaderImpl::encode(bcos::bytes& buffer) const +{ + buffer.clear(); + // the version, 2Bytes + uint16_t version = boost::asio::detail::socket_ops::host_to_network_short(m_version); + buffer.insert(buffer.end(), (byte*)&version, (byte*)&version + 2); + // the packetType, 2Bytes + uint16_t packetType = boost::asio::detail::socket_ops::host_to_network_short(m_packetType); + buffer.insert(buffer.end(), (byte*)&packetType, (byte*)&packetType + 2); + // the ttl, 2Bytes + uint16_t ttl = boost::asio::detail::socket_ops::host_to_network_short(m_ttl); + buffer.insert(buffer.end(), (byte*)&ttl, (byte*)&ttl + 2); + // the ext, 2Bytes + uint16_t ext = boost::asio::detail::socket_ops::host_to_network_short(m_ext); + buffer.insert(buffer.end(), (byte*)&ext, (byte*)&ext + 2); + // the traceID, 2+Bytes + uint16_t traceIDLen = boost::asio::detail::socket_ops::host_to_network_short(m_traceID.size()); + buffer.insert(buffer.end(), (byte*)&traceIDLen, (byte*)&traceIDLen + 2); + buffer.insert(buffer.end(), m_traceID.begin(), m_traceID.end()); + // srcGwNode, 2+Bytes + uint16_t srcGwNodeLen = + boost::asio::detail::socket_ops::host_to_network_short(m_srcGwNode.size()); + buffer.insert(buffer.end(), (byte*)&srcGwNodeLen, (byte*)&srcGwNodeLen + 2); + buffer.insert(buffer.end(), m_srcGwNode.begin(), m_srcGwNode.end()); + // dstGwNode, 2+Bytes + uint16_t dstGwNodeLen = + boost::asio::detail::socket_ops::host_to_network_short(m_dstGwNode.size()); + buffer.insert(buffer.end(), (byte*)&dstGwNodeLen, (byte*)&dstGwNodeLen + 2); + buffer.insert(buffer.end(), m_dstGwNode.begin(), m_dstGwNode.end()); + if (!hasOptionalField()) + { + return; + } + // encode the optionalField + m_optionalField->encode(buffer); + m_length = buffer.size(); +} + +int64_t MessageHeaderImpl::decode(bcos::bytesConstRef data) +{ + if (data.size() < MESSAGE_MIN_LENGTH) + { + BOOST_THROW_EXCEPTION( + WeDPRException() << errinfo_comment("Malform message for too small!")); + } + auto pointer = data.data(); + // the version + m_version = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); + pointer += 2; + // the pacektType + m_packetType = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); + pointer += 2; + // the ttl + m_ttl = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); + pointer += 2; + // the ext + m_ext = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); + pointer += 2; + // the traceID + bcos::bytes traceIDData; + auto offset = + decodeNetworkBuffer(traceIDData, data.data(), data.size(), (pointer - data.data())); + m_traceID = std::string(traceIDData.begin(), traceIDData.end()); + // srcGwNode + bcos::bytes srcGWNodeData; + offset = decodeNetworkBuffer(srcGWNodeData, data.data(), data.size(), offset); + m_srcGwNode = std::string(srcGWNodeData.begin(), srcGWNodeData.end()); + // dstGwNode + bcos::bytes dstGWNodeData; + offset = decodeNetworkBuffer(dstGWNodeData, data.data(), data.size(), offset); + m_dstGwNode = std::string(dstGWNodeData.begin(), dstGWNodeData.end()); + // optionalField + if (hasOptionalField()) + { + offset = m_optionalField->decode(data, offset); + } + m_length = offset; + return offset; +} diff --git a/cpp/ppc-protocol/src/v1/MessageHeaderImpl.h b/cpp/ppc-protocol/src/v1/MessageHeaderImpl.h new file mode 100644 index 00000000..a8dda5a9 --- /dev/null +++ b/cpp/ppc-protocol/src/v1/MessageHeaderImpl.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessageHeaderImpl.h + * @author: yujiechen + * @date 2024-08-23 + */ +#pragma once +#include "ppc-framework/gateway/GatewayProtocol.h" +#include "ppc-framework/protocol/Message.h" + +namespace ppc::protocol +{ +class MessageOptionalHeaderImpl : public MessageOptionalHeader +{ +public: + using Ptr = std::shared_ptr; + MessageOptionalHeaderImpl() = default; + MessageOptionalHeaderImpl(bcos::bytesConstRef data, uint64_t const offset) + { + decode(data, offset); + } + + ~MessageOptionalHeaderImpl() override = default; + + void encode(bcos::bytes& buffer) const override; + int64_t decode(bcos::bytesConstRef data, uint64_t const offset) override; +}; + +class MessageHeaderImpl : public MessageHeader +{ +public: + using Ptr = std::shared_ptr; + MessageHeaderImpl() { m_optionalField = std::make_shared(); } + MessageHeaderImpl(bcos::bytesConstRef data) { decode(data); } + ~MessageHeaderImpl() override {} + + void encode(bcos::bytes& buffer) const override; + int64_t decode(bcos::bytesConstRef data) override; + + virtual bool hasOptionalField() const + { + return m_packetType == (uint16_t)ppc::gateway::GatewayPacketType::P2PMessage; + } + + bool isRespPacket() const override + { + return m_ext & (uint16_t)ppc::gateway::GatewayMsgExtFlag::Response; + } + void setRespPacket() override { m_ext |= (uint16_t)ppc::gateway::GatewayMsgExtFlag::Response; } + +private: + // version(2) + packetType(2) + ttl(2) + ext(2) + traceIDLen(2) + srcGwNodeLen(2) + dstGwNode(2) + const size_t MESSAGE_MIN_LENGTH = 14; +}; + +class MessageHeaderBuilderImpl : public MessageHeaderBuilder +{ +public: + using Ptr = std::shared_ptr; + MessageHeaderBuilderImpl() = default; + ~MessageHeaderBuilderImpl() {} + + MessageHeader::Ptr build(bcos::bytesConstRef data) override + { + return std::make_shared(data); + } + MessageHeader::Ptr build() override { return std::make_shared(); } +}; +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/ppc-protocol/src/v1/MessageImpl.cpp b/cpp/ppc-protocol/src/v1/MessageImpl.cpp new file mode 100644 index 00000000..09f4183a --- /dev/null +++ b/cpp/ppc-protocol/src/v1/MessageImpl.cpp @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessageImpl.cpp + * @author: yujiechen + * @date 2024-08-23 + */ + +#include "MessageImpl.h" + +using namespace bcos; +using namespace ppc::protocol; + +bool MessageImpl::encode(bcos::bytes& _buffer) +{ + // encode the header + bcos::bytes headerData; + m_header->encode(headerData); + // encode the payload + if (m_payload) + { + headerData.insert(headerData.end(), m_payload->begin(), m_payload->end()); + } +} + +bool MessageImpl::encode(bcos::boostssl::EncodedMsg& encodedMsg) +{ + // header + m_header->encode(encodedMsg.header); + // assign the payload back + encodedMsg.payload = m_payload; +} + +int64_t MessageImpl::decode(bytesConstRef buffer) +{ + if (buffer.size() > m_maxMessageLen) + { + BOOST_THROW_EXCEPTION(WeDPRException() << errinfo_comment( + "Malform message for over the size limit, max allowed size is: " + + std::to_string(m_maxMessageLen))); + } + // decode the header + m_header = m_headerBuilder->build(buffer); + // decode the payload + if (!m_payload) + { + m_payload = std::make_shared(); + } + m_payload->clear(); + m_payload->insert(m_payload->end(), buffer.data() + m_header->length(), buffer.end()); +} \ No newline at end of file diff --git a/cpp/ppc-protocol/src/v1/MessageImpl.h b/cpp/ppc-protocol/src/v1/MessageImpl.h new file mode 100644 index 00000000..208db1b3 --- /dev/null +++ b/cpp/ppc-protocol/src/v1/MessageImpl.h @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessageImpl.h + * @author: yujiechen + * @date 2024-08-23 + */ +#pragma once +#include "ppc-framework/Common.h" +#include "ppc-framework/protocol/Message.h" + +namespace ppc::protocol +{ +class MessageImpl : public Message +{ +public: + using Ptr = std::shared_ptr; + MessageImpl(MessageHeaderBuilder::Ptr headerBuilder, size_t maxMessageLen) + : m_headerBuilder(std::move(headerBuilder)), m_maxMessageLen(maxMessageLen) + {} + MessageImpl( + MessageHeaderBuilder::Ptr headerBuilder, size_t maxMessageLen, bcos::bytesConstRef buffer) + : MessageImpl(headerBuilder, maxMessageLen) + { + decode(buffer); + } + + ~MessageImpl() override = default; + + bool encode(bcos::bytes& _buffer) override; + // encode and return the {header, payload} + bool encode(bcos::boostssl::EncodedMsg& _encodedMsg) override; + int64_t decode(bcos::bytesConstRef _buffer) override; + +private: + MessageHeaderBuilder::Ptr m_headerBuilder; + + // default max message length is 100MB + size_t m_maxMessageLen = 100 * 1024 * 1024; +}; + +class MessageBuilderImpl : public MessageBuilder +{ +public: + using Ptr = std::shared_ptr; + MessageBuilderImpl(MessageHeaderBuilder::Ptr msgHeaderBuilder) + : m_msgHeaderBuilder(std::move(msgHeaderBuilder)) + {} + + MessageBuilderImpl(MessageHeaderBuilder::Ptr msgHeaderBuilder, size_t maxMessageLen) + : MessageBuilderImpl(std::move(msgHeaderBuilder)) + { + m_maxMessageLen = maxMessageLen; + } + + ~MessageBuilderImpl() override {} + + Message::Ptr build() override + { + return std::make_shared(m_msgHeaderBuilder, m_maxMessageLen); + } + Message::Ptr build(bcos::bytesConstRef buffer) override + { + return std::make_shared(m_msgHeaderBuilder, m_maxMessageLen, buffer); + } + +private: + MessageHeaderBuilder::Ptr m_msgHeaderBuilder; + // default max message length is 100MB + size_t m_maxMessageLen = 100 * 1024 * 1024; +}; +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/ppc-protocol/src/v1/MessagePayloadImpl.cpp b/cpp/ppc-protocol/src/v1/MessagePayloadImpl.cpp new file mode 100644 index 00000000..59397edf --- /dev/null +++ b/cpp/ppc-protocol/src/v1/MessagePayloadImpl.cpp @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessagePayloadImpl.h + * @author: yujiechen + * @date 2024-08-22 + */ + +#include "MessagePayloadImpl.h" + +#include "ppc-utilities/Utilities.h" +#include + +using namespace ppc::protocol; +using namespace bcos; + +int64_t MessagePayloadImpl::encode(bcos::bytes& buffer) const +{ + // version + uint16_t version = boost::asio::detail::socket_ops::host_to_network_short(m_version); + buffer.insert(buffer.end(), (byte*)&version, (byte*)&version + 2); + // topic + uint16_t topicLen = boost::asio::detail::socket_ops::host_to_network_short(m_topic.size()); + buffer.insert(buffer.end(), (byte*)&topicLen, (byte*)&topicLen + 2); + buffer.insert(buffer.end(), m_topic.begin(), m_topic.end()); + // data + uint16_t dataLen = boost::asio::detail::socket_ops::host_to_network_short(m_data.size()); + buffer.insert(buffer.end(), (byte*)&dataLen, (byte*)&dataLen + 2); + buffer.insert(buffer.end(), m_data.begin(), m_data.end()); + // update the length + m_length = buffer.size(); + return m_length; +} + +int64_t MessagePayloadImpl::decode(bcos::bytesConstRef buffer) +{ + // check the message + if (buffer.size() < MIN_PAYLOAD_LEN) + { + BOOST_THROW_EXCEPTION( + WeDPRException() << errinfo_comment("Malform payload for too small!")); + } + auto pointer = buffer.data(); + // the version + m_version = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); + pointer += 2; + // topic + bcos::bytes topicData; + auto offset = + decodeNetworkBuffer(topicData, buffer.data(), buffer.size(), (pointer - buffer.data())); + m_topic = std::string(topicData.begin(), topicData.end()); + // data + offset = decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), offset); + return offset; +} \ No newline at end of file diff --git a/cpp/ppc-protocol/src/v1/MessagePayloadImpl.h b/cpp/ppc-protocol/src/v1/MessagePayloadImpl.h new file mode 100644 index 00000000..f7f5a3e8 --- /dev/null +++ b/cpp/ppc-protocol/src/v1/MessagePayloadImpl.h @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessagePayloadImpl.h + * @author: yujiechen + * @date 2024-08-22 + */ +#pragma once +#include "ppc-framework/Common.h" +#include "ppc-framework/protocol/MessagePayload.h" + +namespace ppc::protocol +{ +class MessagePayloadImpl : public MessagePayload +{ +public: + using Ptr = std::shared_ptr; + MessagePayloadImpl() = default; + MessagePayloadImpl(bcos::bytesConstRef buffer) { decode(buffer); } + ~MessagePayloadImpl() override {} + + int64_t encode(bcos::bytes& buffer) const override; + int64_t decode(bcos::bytesConstRef data) override; + +private: + const unsigned int MIN_PAYLOAD_LEN = 6; +}; + + +class MessagePayloadBuilderImpl : public MessagePayloadBuilder +{ +public: + using Ptr = std::shared_ptr; + MessagePayloadBuilderImpl() = default; + ~MessagePayloadBuilderImpl() override {} + MessagePayload::Ptr build() override { return std::make_shared(); } + MessagePayload::Ptr build(bcos::bytesConstRef buffer) override + { + return std::make_shared(buffer); + } +}; +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/RouterTable.tars b/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/RouterTable.tars new file mode 100644 index 00000000..48096cdd --- /dev/null +++ b/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/RouterTable.tars @@ -0,0 +1,13 @@ +module ppctars +{ +struct RouterTableEntry +{ + 1 require string dstNode; + 2 optional string nextHop; + 3 require int distance; +}; +struct RouterTable +{ + 1 optional vector routerEntries; +}; +}; \ No newline at end of file diff --git a/cpp/ppc-tools/src/codec/CodecUtility.h b/cpp/ppc-tools/src/codec/CodecUtility.h index ce7a208a..d8f83201 100644 --- a/cpp/ppc-tools/src/codec/CodecUtility.h +++ b/cpp/ppc-tools/src/codec/CodecUtility.h @@ -20,7 +20,6 @@ #pragma once #include "openssl/bn.h" #include "ppc-framework/libwrapper/BigNum.h" -#include "ppc-utilities/Utilities.h" #include #include #include diff --git a/cpp/ppc-utilities/Utilities.h b/cpp/ppc-utilities/Utilities.h index b9b8e0ae..4b04ea21 100644 --- a/cpp/ppc-utilities/Utilities.h +++ b/cpp/ppc-utilities/Utilities.h @@ -20,20 +20,22 @@ #pragma once #include "ppc-framework/Common.h" +#include namespace ppc { inline uint64_t decodeNetworkBuffer( bcos::bytes& _result, bcos::byte const* buffer, unsigned int bufferLen, uint64_t const offset) { - CHECK_OFFSET_WITH_THROW_EXCEPTION(offset, bufferLen); + uint64_t curOffset = offset; + CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset, bufferLen); auto dataLen = - boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)buffer + offset)); - offset += 2; - CHECK_OFFSET_WITH_THROW_EXCEPTION(offset, bufferLen); - buffer.insert( - buffer.end(), (bcos::byte*)_buffer + offset, (bcos::byte*)_buffer + offset + dataLen); - offset += dataLen; - return offset; + boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)buffer + curOffset)); + curOffset += 2; + CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset, bufferLen); + _result.insert( + _result.end(), (bcos::byte*)buffer + curOffset, (bcos::byte*)buffer + curOffset + dataLen); + curOffset += dataLen; + return curOffset; } } // namespace ppc \ No newline at end of file diff --git a/cpp/vcpkg-configuration.json b/cpp/vcpkg-configuration.json index 5913a7c1..a6334850 100644 --- a/cpp/vcpkg-configuration.json +++ b/cpp/vcpkg-configuration.json @@ -2,8 +2,8 @@ "registries": [ { "kind": "git", - "repository": "https://github.com/FISCO-BCOS/registry", - "baseline": "070f336149afdac5cc9ace97df01de7ee31aab30", + "repository": "https://github.com/cyjseagull/registry", + "baseline": "6160f4167d4ab801f16a18bdd842a75edf7b069e", "packages": [ "openssl", "bcos-utilities",