diff --git a/cpp/ppc-framework/gateway/GatewayProtocol.h b/cpp/ppc-framework/gateway/GatewayProtocol.h index b91137d6..ebd9b7a3 100644 --- a/cpp/ppc-framework/gateway/GatewayProtocol.h +++ b/cpp/ppc-framework/gateway/GatewayProtocol.h @@ -28,7 +28,10 @@ enum class GatewayPacketType : uint16_t BroadcastMessage = 0x01, RouterTableSyncSeq = 0x10, RouterTableResponse = 0x11, - RouterTableRequest = 0x12 + RouterTableRequest = 0x12, + SyncNodeSeq = 0x20, + RequestNodeStatus = 0x21, + ResponseNodeStatus = 0x22, }; enum class GatewayMsgExtFlag : uint16_t diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index 7b4240c9..6707fae9 100644 --- a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -51,6 +51,8 @@ GatewayImpl::GatewayImpl(Service::Ptr const& service, m_service->registerMsgHandler((uint16_t)GatewayPacketType::BroadcastMessage, boost::bind(&GatewayImpl::onReceiveBroadcastMessage, this, boost::placeholders::_1, boost::placeholders::_2)); + m_gatewayRouterManager = std::make_shared( + m_service, m_gatewayInfoFactory, m_localRouter, m_peerRouter); } void GatewayImpl::start() @@ -63,6 +65,7 @@ void GatewayImpl::start() m_running = true; m_service->start(); m_p2pRouterManager->start(); + m_gatewayRouterManager->start(); GATEWAY_LOG(INFO) << LOG_DESC("Start gateway success"); } @@ -76,6 +79,7 @@ void GatewayImpl::stop() m_running = false; m_service->stop(); m_p2pRouterManager->stop(); + m_gatewayRouterManager->stop(); GATEWAY_LOG(INFO) << LOG_DESC("Stop gateway success"); } diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h index b71bd5b1..9d2ced23 100644 --- a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h +++ b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h @@ -22,6 +22,7 @@ #include "ppc-gateway/gateway/router/GatewayNodeInfo.h" #include "ppc-gateway/p2p/Service.h" #include "ppc-gateway/p2p/router/RouterManager.h" +#include "router/GatewayRouterManager.h" #include "router/LocalRouter.h" #include "router/PeerRouterTable.h" @@ -80,6 +81,7 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_this #include +#include namespace ppc::gateway { class GatewayNodeInfo @@ -33,6 +35,9 @@ class GatewayNodeInfo virtual std::string const& p2pNodeID() const = 0; // the agency virtual std::string const& agency() const = 0; + virtual uint32_t statusSeq() const = 0; + virtual void setStatusSeq(uint32_t statusSeq) = 0; + // get the node information by nodeID virtual ppc::protocol::INodeInfo::Ptr nodeInfo(bcos::bytes const& nodeID) const = 0; virtual bool tryAddNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; @@ -51,6 +56,7 @@ class GatewayNodeInfo virtual void unRegisterTopic(bcos::bytes const& nodeID, std::string const& topic) = 0; virtual std::map nodeList() const = 0; + virtual uint16_t nodeSize() const = 0; }; class GatewayNodeInfoFactory @@ -71,4 +77,13 @@ struct GatewayNodeInfoCmp } }; using GatewayNodeInfos = std::set; + +inline std::string printNodeStatus(GatewayNodeInfo::Ptr const& status) +{ + std::ostringstream stringstream; + stringstream << LOG_KV("p2pNodeID", status->p2pNodeID()) << LOG_KV("agency", status->agency()) + << LOG_KV("statusSeq", status->statusSeq()) + << LOG_KV("nodeSize", status->nodeSize()); + return stringstream.str(); +} } // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp index f7c9cefe..3c718fe9 100644 --- a/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp @@ -36,6 +36,16 @@ std::string const& GatewayNodeInfoImpl::agency() const { return m_inner()->agency; } + +uint32_t GatewayNodeInfoImpl::statusSeq() const +{ + return m_inner()->statusSeq; +} +void GatewayNodeInfoImpl::setStatusSeq(uint32_t statusSeq) +{ + m_inner()->statusSeq = statusSeq; +} + // get the node information by nodeID INodeInfo::Ptr GatewayNodeInfoImpl::nodeInfo(bcos::bytes const& nodeID) const { diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h index cb0dfc10..72c6d440 100644 --- a/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h @@ -66,6 +66,10 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo bcos::WriteGuard l(x_nodeList); return m_nodeList; } + uint32_t statusSeq() const override; + void setStatusSeq(uint32_t statusSeq) override; + + virtual uint16_t nodeSize() const override { return m_nodeList.size(); } private: std::function m_inner; diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp new file mode 100644 index 00000000..a26844e4 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp @@ -0,0 +1,183 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayRouterManager.h + * @author: yujiechen + * @date 2024-08-26 + */ +#include "GatewayRouterManager.h" +#include "ppc-framework/gateway/GatewayProtocol.h" +#include + +using namespace ppc::protocol; +using namespace ppc; +using namespace bcos; +using namespace ppc::gateway; +using namespace bcos::boostssl; +using namespace bcos::boostssl::ws; + +GatewayRouterManager::GatewayRouterManager(Service::Ptr service, + GatewayNodeInfoFactory::Ptr nodeStatusFactory, LocalRouter::Ptr localRouter, + PeerRouterTable::Ptr peerRouter) + : m_service(std::move(service)), + m_nodeStatusFactory(std::move(nodeStatusFactory)), + m_localRouter(std::move(localRouter)), + m_peerRouter(std::move(peerRouter)) +{ + m_service->registerMsgHandler((uint16_t)GatewayPacketType::SyncNodeSeq, + boost::bind(&GatewayRouterManager::onReceiveNodeSeqMessage, this, boost::placeholders::_1, + boost::placeholders::_2)); + + m_service->registerMsgHandler((uint16_t)GatewayPacketType::RequestNodeStatus, + boost::bind(&GatewayRouterManager::onReceiveRequestNodeStatusMsg, this, + boost::placeholders::_1, boost::placeholders::_2)); + + m_service->registerMsgHandler((uint16_t)GatewayPacketType::ResponseNodeStatus, + boost::bind(&GatewayRouterManager::onRecvResponseNodeStatusMsg, this, + boost::placeholders::_1, boost::placeholders::_2)); + + m_timer = std::make_shared(SEQ_SYNC_PERIOD, "seqSync"); + // broadcast seq periodically + m_timer->registerTimeoutHandler([this]() { broadcastStatusSeq(); }); +} + + +void GatewayRouterManager::start() +{ + if (m_running) + { + GATEWAY_LOG(INFO) << LOG_DESC("GatewayRouterManager has already been started"); + return; + } + m_running = true; + m_timer->start(); + GATEWAY_LOG(INFO) << LOG_DESC("start GatewayRouterManager success"); +} + +void GatewayRouterManager::stop() +{ + if (!m_running) + { + GATEWAY_LOG(INFO) << LOG_DESC("GatewayRouterManager has already been stopped"); + return; + } + m_running = false; + m_timer->stop(); + GATEWAY_LOG(INFO) << LOG_DESC("stop GatewayRouterManager success"); +} + +void GatewayRouterManager::onReceiveNodeSeqMessage(MessageFace::Ptr msg, WsSession::Ptr session) +{ + auto statusSeq = + boost::asio::detail::socket_ops::network_to_host_long(*((uint32_t*)msg->payload()->data())); + + auto p2pMessage = std::dynamic_pointer_cast(msg); + auto const& from = (p2pMessage->header()->srcGwNode().size() > 0) ? + p2pMessage->header()->srcGwNode() : + session->nodeId(); + auto statusSeqChanged = statusChanged(from, statusSeq); + if (!statusSeqChanged) + { + return; + } + // status changed, request for the nodeStatus + GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveNodeSeqMessage") << LOG_KV("from", from) + << LOG_KV("statusSeq", statusSeq); + m_service->asyncSendMessageByP2PNodeID( + (uint16_t)GatewayPacketType::RequestNodeStatus, from, std::make_shared()); +} + + +bool GatewayRouterManager::statusChanged(std::string const& p2pNodeID, uint32_t seq) +{ + bool ret = true; + ReadGuard l(x_p2pID2Seq); + auto it = m_p2pID2Seq.find(p2pNodeID); + if (it != m_p2pID2Seq.end()) + { + ret = (seq > it->second); + } + return ret; +} + +void GatewayRouterManager::broadcastStatusSeq() +{ + m_timer->restart(); + auto message = std::dynamic_pointer_cast(m_service->messageFactory()->buildMessage()); + message->setPacketType((uint16_t)GatewayPacketType::SyncNodeSeq); + auto seq = m_localRouter->statusSeq(); + auto statusSeq = boost::asio::detail::socket_ops::host_to_network_long(seq); + auto payload = std::make_shared((byte*)&statusSeq, (byte*)&statusSeq + 4); + message->setPayload(payload); + GATEWAY_LOG(TRACE) << LOG_DESC("broadcastStatusSeq") << LOG_KV("seq", seq); + m_service->asyncBroadcastMessage(message); +} + + +void GatewayRouterManager::onReceiveRequestNodeStatusMsg( + MessageFace::Ptr msg, WsSession::Ptr session) +{ + auto p2pMessage = std::dynamic_pointer_cast(msg); + auto const& from = (!p2pMessage->header()->srcGwNode().empty()) ? + p2pMessage->header()->srcGwNode() : + session->nodeId(); + + auto nodeStatusData = m_localRouter->generateNodeStatus(); + if (!nodeStatusData) + { + GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveRequestNodeStatusMsg: generate nodeInfo error") + << LOG_KV("from", from); + return; + } + GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveRequestNodeStatusMsg: response the latest nodeStatus") + << LOG_KV("from", from); + m_service->asyncSendMessageByP2PNodeID( + (uint16_t)GatewayPacketType::ResponseNodeStatus, from, nodeStatusData); +} + +void GatewayRouterManager::onRecvResponseNodeStatusMsg(MessageFace::Ptr msg, WsSession::Ptr session) +{ + auto nodeStatus = m_nodeStatusFactory->build(); + nodeStatus->decode(bytesConstRef(msg->payload()->data(), msg->payload()->size())); + + auto p2pMessage = std::dynamic_pointer_cast(msg); + auto const& from = (!p2pMessage->header()->srcGwNode().empty()) ? + p2pMessage->header()->srcGwNode() : + session->nodeId(); + + GATEWAY_LOG(INFO) << LOG_DESC("onRecvResponseNodeStatusMsg") << LOG_KV("from", from) + << LOG_KV("statusSeq", nodeStatus->statusSeq()) + << LOG_KV("agency", nodeStatus->agency()); + updatePeerNodeStatus(from, nodeStatus); +} + +void GatewayRouterManager::updatePeerNodeStatus( + std::string const& p2pID, GatewayNodeInfo::Ptr status) +{ + auto statusSeq = status->statusSeq(); + { + UpgradableGuard l(x_p2pID2Seq); + if (m_p2pID2Seq.contains(p2pID) && (m_p2pID2Seq.at(p2pID) >= statusSeq)) + { + return; + } + UpgradeGuard ul(l); + m_p2pID2Seq[p2pID] = statusSeq; + } + GATEWAY_LOG(INFO) << LOG_DESC("updatePeerNodeStatus") << LOG_KV("from", p2pID) + << LOG_KV("statusSeq", status->statusSeq()) + << LOG_KV("agency", status->agency()); + m_peerRouter->updateGatewayInfo(status); +} \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.h b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.h new file mode 100644 index 00000000..4cf9b0ba --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.h @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayRouterManager.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "ppc-gateway/gateway/router/GatewayNodeInfo.h" +#include "ppc-gateway/gateway/router/LocalRouter.h" +#include "ppc-gateway/gateway/router/PeerRouterTable.h" +#include "ppc-gateway/p2p/Service.h" +#include +#include + +namespace ppc::gateway +{ +class GatewayRouterManager +{ +public: + using Ptr = std::shared_ptr; + GatewayRouterManager(Service::Ptr service, GatewayNodeInfoFactory::Ptr nodeStatusFactory, + LocalRouter::Ptr localRouter, PeerRouterTable::Ptr peerRouter); + virtual void start(); + virtual void stop(); + +protected: + virtual void onReceiveNodeSeqMessage( + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); + + virtual void onReceiveRequestNodeStatusMsg( + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); + + virtual void onRecvResponseNodeStatusMsg( + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); + bool statusChanged(std::string const& p2pNodeID, uint32_t seq); + void broadcastStatusSeq(); + + void updatePeerNodeStatus(std::string const& p2pID, GatewayNodeInfo::Ptr status); + +private: + Service::Ptr m_service; + GatewayNodeInfoFactory::Ptr m_nodeStatusFactory; + std::shared_ptr m_timer; + + LocalRouter::Ptr m_localRouter; + PeerRouterTable::Ptr m_peerRouter; + + bool m_running = false; + + // P2pID => statusSeq + std::map m_p2pID2Seq; + mutable bcos::SharedMutex x_p2pID2Seq; + + // TODO: make this configurable + unsigned const SEQ_SYNC_PERIOD = 3000; +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp index 9df4e45f..7d934daa 100644 --- a/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp @@ -25,6 +25,7 @@ using namespace bcos; using namespace ppc::protocol; using namespace ppc::gateway; +// Note: the change of the topic will not trigger router-update void LocalRouter::registerTopic(bcos::bytesConstRef _nodeID, std::string const& topic) { m_routerInfo->registerTopic(_nodeID.toBytes(), topic); @@ -48,6 +49,7 @@ void LocalRouter::registerTopic(bcos::bytesConstRef _nodeID, std::string const& } } +// Note: the change of the topic will not trigger router-update void LocalRouter::unRegisterTopic(bcos::bytesConstRef _nodeID, std::string const& topic) { m_routerInfo->unRegisterTopic(_nodeID.toBytes(), topic); diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h b/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h index 194ef6ab..8b931c9c 100644 --- a/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h @@ -42,10 +42,19 @@ class LocalRouter virtual bool registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) { nodeInfo->setFront(m_frontBuilder->buildClient(nodeInfo->endPoint())); - return m_routerInfo->tryAddNodeInfo(nodeInfo); + auto ret = m_routerInfo->tryAddNodeInfo(nodeInfo); + if (ret) + { + increaseSeq(); + } + return ret; } - virtual void unRegisterNode(bcos::bytes const& nodeID) { m_routerInfo->removeNodeInfo(nodeID); } + virtual void unRegisterNode(bcos::bytes const& nodeID) + { + m_routerInfo->removeNodeInfo(nodeID); + increaseSeq(); + } virtual void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic); virtual void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic); @@ -57,10 +66,28 @@ class LocalRouter virtual bool dispatcherMessage(ppc::protocol::Message::Ptr const& msg, ppc::protocol::ReceiveMsgFunc callback, bool holding = true); + std::shared_ptr generateNodeStatus() + { + auto data = std::make_shared(); + m_routerInfo->encode(*data); + return data; + } + uint32_t statusSeq() { return m_statusSeq; } + +private: + uint32_t increaseSeq() + { + uint32_t statusSeq = ++m_statusSeq; + return statusSeq; + } + + private: ppc::front::IFrontBuilder::Ptr m_frontBuilder; GatewayNodeInfo::Ptr m_routerInfo; + std::atomic m_statusSeq{1}; + // NodeID=>topics using Topics = std::set; std::map m_topicInfo; diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp index a2bccd0a..e5367eda 100644 --- a/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp @@ -33,8 +33,16 @@ void PeerRouterTable::updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo) for (auto const& it : nodeList) { // update nodeID => gatewayInfos + if (!m_nodeID2GatewayInfos.count(it.first)) + { + m_nodeID2GatewayInfos.insert(std::make_pair(it.first, GatewayNodeInfos())); + } m_nodeID2GatewayInfos[it.first].insert(gatewayInfo); } + if (!m_agency2GatewayInfos.count(gatewayInfo->agency())) + { + m_agency2GatewayInfos.insert(std::make_pair(gatewayInfo->agency(), GatewayNodeInfos())); + } // update agency => gatewayInfos m_agency2GatewayInfos[gatewayInfo->agency()].insert(gatewayInfo); } diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h index a1701c9e..64ebf4c8 100644 --- a/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h +++ b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h @@ -61,7 +61,6 @@ class RouterManager // for message forward Service::Ptr m_service; std::shared_ptr m_routerTimer; - std::atomic m_statusSeq{1}; // called when the given node unreachable std::vector> m_unreachableHandlers; @@ -69,5 +68,7 @@ class RouterManager std::map m_node2Seq; mutable bcos::SharedMutex x_node2Seq; + + std::atomic m_statusSeq{1}; }; } // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/NodeInfo.tars b/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/NodeInfo.tars index 9a0c67ec..06055197 100644 --- a/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/NodeInfo.tars +++ b/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/NodeInfo.tars @@ -11,5 +11,6 @@ module ppctars 1 require string p2pNodeID; 2 require string agency; 3 optional vector nodeList; + 4 optional int statusSeq; }; };