Skip to content

Commit

Permalink
add gateway-router-manager
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Aug 27, 2024
1 parent 6dcefa0 commit 0c03b81
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 4 deletions.
5 changes: 4 additions & 1 deletion cpp/ppc-framework/gateway/GatewayProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ enum class GatewayPacketType : uint16_t
BroadcastMessage = 0x01,
RouterTableSyncSeq = 0x10,
RouterTableResponse = 0x11,
RouterTableRequest = 0x12
RouterTableRequest = 0x12,
SyncNodeSeq = 0x20,
RequestNodeStatus = 0x21,
ResponseNodeStatus = 0x22,
};

enum class GatewayMsgExtFlag : uint16_t
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ GatewayImpl::GatewayImpl(Service::Ptr const& service,
m_service->registerMsgHandler((uint16_t)GatewayPacketType::BroadcastMessage,
boost::bind(&GatewayImpl::onReceiveBroadcastMessage, this, boost::placeholders::_1,
boost::placeholders::_2));
m_gatewayRouterManager = std::make_shared<GatewayRouterManager>(
m_service, m_gatewayInfoFactory, m_localRouter, m_peerRouter);
}

void GatewayImpl::start()
Expand All @@ -63,6 +65,7 @@ void GatewayImpl::start()
m_running = true;
m_service->start();
m_p2pRouterManager->start();
m_gatewayRouterManager->start();
GATEWAY_LOG(INFO) << LOG_DESC("Start gateway success");
}

Expand All @@ -76,6 +79,7 @@ void GatewayImpl::stop()
m_running = false;
m_service->stop();
m_p2pRouterManager->stop();
m_gatewayRouterManager->stop();
GATEWAY_LOG(INFO) << LOG_DESC("Stop gateway success");
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ppc-gateway/gateway/router/GatewayNodeInfo.h"
#include "ppc-gateway/p2p/Service.h"
#include "ppc-gateway/p2p/router/RouterManager.h"
#include "router/GatewayRouterManager.h"
#include "router/LocalRouter.h"
#include "router/PeerRouterTable.h"

Expand Down Expand Up @@ -80,6 +81,7 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_this<Gateway
std::string m_agency;

RouterManager::Ptr m_p2pRouterManager;
GatewayRouterManager::Ptr m_gatewayRouterManager;

GatewayNodeInfoFactory::Ptr m_gatewayInfoFactory;
LocalRouter::Ptr m_localRouter;
Expand Down
15 changes: 15 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
*/
#pragma once
#include "ppc-framework/protocol/INodeInfo.h"
#include <bcos-utilities/Log.h>
#include <memory>
#include <sstream>
namespace ppc::gateway
{
class GatewayNodeInfo
Expand All @@ -33,6 +35,9 @@ class GatewayNodeInfo
virtual std::string const& p2pNodeID() const = 0;
// the agency
virtual std::string const& agency() const = 0;
virtual uint32_t statusSeq() const = 0;
virtual void setStatusSeq(uint32_t statusSeq) = 0;

// get the node information by nodeID
virtual ppc::protocol::INodeInfo::Ptr nodeInfo(bcos::bytes const& nodeID) const = 0;
virtual bool tryAddNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
Expand All @@ -51,6 +56,7 @@ class GatewayNodeInfo
virtual void unRegisterTopic(bcos::bytes const& nodeID, std::string const& topic) = 0;

virtual std::map<bcos::bytes, ppc::protocol::INodeInfo::Ptr> nodeList() const = 0;
virtual uint16_t nodeSize() const = 0;
};

class GatewayNodeInfoFactory
Expand All @@ -71,4 +77,13 @@ struct GatewayNodeInfoCmp
}
};
using GatewayNodeInfos = std::set<GatewayNodeInfo::Ptr, GatewayNodeInfoCmp>;

inline std::string printNodeStatus(GatewayNodeInfo::Ptr const& status)
{
std::ostringstream stringstream;
stringstream << LOG_KV("p2pNodeID", status->p2pNodeID()) << LOG_KV("agency", status->agency())
<< LOG_KV("statusSeq", status->statusSeq())
<< LOG_KV("nodeSize", status->nodeSize());
return stringstream.str();
}
} // namespace ppc::gateway
10 changes: 10 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ std::string const& GatewayNodeInfoImpl::agency() const
{
return m_inner()->agency;
}

uint32_t GatewayNodeInfoImpl::statusSeq() const
{
return m_inner()->statusSeq;
}
void GatewayNodeInfoImpl::setStatusSeq(uint32_t statusSeq)
{
m_inner()->statusSeq = statusSeq;
}

// get the node information by nodeID
INodeInfo::Ptr GatewayNodeInfoImpl::nodeInfo(bcos::bytes const& nodeID) const
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo
bcos::WriteGuard l(x_nodeList);
return m_nodeList;
}
uint32_t statusSeq() const override;
void setStatusSeq(uint32_t statusSeq) override;

virtual uint16_t nodeSize() const override { return m_nodeList.size(); }

private:
std::function<ppctars::GatewayNodeInfo*()> m_inner;
Expand Down
183 changes: 183 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file GatewayRouterManager.h
* @author: yujiechen
* @date 2024-08-26
*/
#include "GatewayRouterManager.h"
#include "ppc-framework/gateway/GatewayProtocol.h"
#include <ppc-gateway/Common.h>

using namespace ppc::protocol;
using namespace ppc;
using namespace bcos;
using namespace ppc::gateway;
using namespace bcos::boostssl;
using namespace bcos::boostssl::ws;

GatewayRouterManager::GatewayRouterManager(Service::Ptr service,
GatewayNodeInfoFactory::Ptr nodeStatusFactory, LocalRouter::Ptr localRouter,
PeerRouterTable::Ptr peerRouter)
: m_service(std::move(service)),
m_nodeStatusFactory(std::move(nodeStatusFactory)),
m_localRouter(std::move(localRouter)),
m_peerRouter(std::move(peerRouter))
{
m_service->registerMsgHandler((uint16_t)GatewayPacketType::SyncNodeSeq,
boost::bind(&GatewayRouterManager::onReceiveNodeSeqMessage, this, boost::placeholders::_1,
boost::placeholders::_2));

m_service->registerMsgHandler((uint16_t)GatewayPacketType::RequestNodeStatus,
boost::bind(&GatewayRouterManager::onReceiveRequestNodeStatusMsg, this,
boost::placeholders::_1, boost::placeholders::_2));

m_service->registerMsgHandler((uint16_t)GatewayPacketType::ResponseNodeStatus,
boost::bind(&GatewayRouterManager::onRecvResponseNodeStatusMsg, this,
boost::placeholders::_1, boost::placeholders::_2));

m_timer = std::make_shared<Timer>(SEQ_SYNC_PERIOD, "seqSync");
// broadcast seq periodically
m_timer->registerTimeoutHandler([this]() { broadcastStatusSeq(); });
}


void GatewayRouterManager::start()
{
if (m_running)
{
GATEWAY_LOG(INFO) << LOG_DESC("GatewayRouterManager has already been started");
return;
}
m_running = true;
m_timer->start();
GATEWAY_LOG(INFO) << LOG_DESC("start GatewayRouterManager success");
}

void GatewayRouterManager::stop()
{
if (!m_running)
{
GATEWAY_LOG(INFO) << LOG_DESC("GatewayRouterManager has already been stopped");
return;
}
m_running = false;
m_timer->stop();
GATEWAY_LOG(INFO) << LOG_DESC("stop GatewayRouterManager success");
}

void GatewayRouterManager::onReceiveNodeSeqMessage(MessageFace::Ptr msg, WsSession::Ptr session)
{
auto statusSeq =
boost::asio::detail::socket_ops::network_to_host_long(*((uint32_t*)msg->payload()->data()));

auto p2pMessage = std::dynamic_pointer_cast<Message>(msg);
auto const& from = (p2pMessage->header()->srcGwNode().size() > 0) ?
p2pMessage->header()->srcGwNode() :
session->nodeId();
auto statusSeqChanged = statusChanged(from, statusSeq);
if (!statusSeqChanged)
{
return;
}
// status changed, request for the nodeStatus
GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveNodeSeqMessage") << LOG_KV("from", from)
<< LOG_KV("statusSeq", statusSeq);
m_service->asyncSendMessageByP2PNodeID(
(uint16_t)GatewayPacketType::RequestNodeStatus, from, std::make_shared<bcos::bytes>());
}


bool GatewayRouterManager::statusChanged(std::string const& p2pNodeID, uint32_t seq)
{
bool ret = true;
ReadGuard l(x_p2pID2Seq);
auto it = m_p2pID2Seq.find(p2pNodeID);
if (it != m_p2pID2Seq.end())
{
ret = (seq > it->second);
}
return ret;
}

void GatewayRouterManager::broadcastStatusSeq()
{
m_timer->restart();
auto message = std::dynamic_pointer_cast<Message>(m_service->messageFactory()->buildMessage());
message->setPacketType((uint16_t)GatewayPacketType::SyncNodeSeq);
auto seq = m_localRouter->statusSeq();
auto statusSeq = boost::asio::detail::socket_ops::host_to_network_long(seq);
auto payload = std::make_shared<bytes>((byte*)&statusSeq, (byte*)&statusSeq + 4);
message->setPayload(payload);
GATEWAY_LOG(TRACE) << LOG_DESC("broadcastStatusSeq") << LOG_KV("seq", seq);
m_service->asyncBroadcastMessage(message);
}


void GatewayRouterManager::onReceiveRequestNodeStatusMsg(
MessageFace::Ptr msg, WsSession::Ptr session)
{
auto p2pMessage = std::dynamic_pointer_cast<Message>(msg);
auto const& from = (!p2pMessage->header()->srcGwNode().empty()) ?
p2pMessage->header()->srcGwNode() :
session->nodeId();

auto nodeStatusData = m_localRouter->generateNodeStatus();
if (!nodeStatusData)
{
GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveRequestNodeStatusMsg: generate nodeInfo error")
<< LOG_KV("from", from);
return;
}
GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveRequestNodeStatusMsg: response the latest nodeStatus")
<< LOG_KV("from", from);
m_service->asyncSendMessageByP2PNodeID(
(uint16_t)GatewayPacketType::ResponseNodeStatus, from, nodeStatusData);
}

void GatewayRouterManager::onRecvResponseNodeStatusMsg(MessageFace::Ptr msg, WsSession::Ptr session)
{
auto nodeStatus = m_nodeStatusFactory->build();
nodeStatus->decode(bytesConstRef(msg->payload()->data(), msg->payload()->size()));

auto p2pMessage = std::dynamic_pointer_cast<Message>(msg);
auto const& from = (!p2pMessage->header()->srcGwNode().empty()) ?
p2pMessage->header()->srcGwNode() :
session->nodeId();

GATEWAY_LOG(INFO) << LOG_DESC("onRecvResponseNodeStatusMsg") << LOG_KV("from", from)
<< LOG_KV("statusSeq", nodeStatus->statusSeq())
<< LOG_KV("agency", nodeStatus->agency());
updatePeerNodeStatus(from, nodeStatus);
}

void GatewayRouterManager::updatePeerNodeStatus(
std::string const& p2pID, GatewayNodeInfo::Ptr status)
{
auto statusSeq = status->statusSeq();
{
UpgradableGuard l(x_p2pID2Seq);
if (m_p2pID2Seq.contains(p2pID) && (m_p2pID2Seq.at(p2pID) >= statusSeq))
{
return;
}
UpgradeGuard ul(l);
m_p2pID2Seq[p2pID] = statusSeq;
}
GATEWAY_LOG(INFO) << LOG_DESC("updatePeerNodeStatus") << LOG_KV("from", p2pID)
<< LOG_KV("statusSeq", status->statusSeq())
<< LOG_KV("agency", status->agency());
m_peerRouter->updateGatewayInfo(status);
}
70 changes: 70 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file GatewayRouterManager.h
* @author: yujiechen
* @date 2024-08-26
*/
#pragma once
#include "ppc-gateway/gateway/router/GatewayNodeInfo.h"
#include "ppc-gateway/gateway/router/LocalRouter.h"
#include "ppc-gateway/gateway/router/PeerRouterTable.h"
#include "ppc-gateway/p2p/Service.h"
#include <bcos-utilities/Timer.h>
#include <memory>

namespace ppc::gateway
{
class GatewayRouterManager
{
public:
using Ptr = std::shared_ptr<GatewayRouterManager>;
GatewayRouterManager(Service::Ptr service, GatewayNodeInfoFactory::Ptr nodeStatusFactory,
LocalRouter::Ptr localRouter, PeerRouterTable::Ptr peerRouter);
virtual void start();
virtual void stop();

protected:
virtual void onReceiveNodeSeqMessage(
bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session);

virtual void onReceiveRequestNodeStatusMsg(
bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session);

virtual void onRecvResponseNodeStatusMsg(
bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session);
bool statusChanged(std::string const& p2pNodeID, uint32_t seq);
void broadcastStatusSeq();

void updatePeerNodeStatus(std::string const& p2pID, GatewayNodeInfo::Ptr status);

private:
Service::Ptr m_service;
GatewayNodeInfoFactory::Ptr m_nodeStatusFactory;
std::shared_ptr<bcos::Timer> m_timer;

LocalRouter::Ptr m_localRouter;
PeerRouterTable::Ptr m_peerRouter;

bool m_running = false;

// P2pID => statusSeq
std::map<std::string, uint32_t> m_p2pID2Seq;
mutable bcos::SharedMutex x_p2pID2Seq;

// TODO: make this configurable
unsigned const SEQ_SYNC_PERIOD = 3000;
};
} // namespace ppc::gateway
2 changes: 2 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using namespace bcos;
using namespace ppc::protocol;
using namespace ppc::gateway;

// Note: the change of the topic will not trigger router-update
void LocalRouter::registerTopic(bcos::bytesConstRef _nodeID, std::string const& topic)
{
m_routerInfo->registerTopic(_nodeID.toBytes(), topic);
Expand All @@ -48,6 +49,7 @@ void LocalRouter::registerTopic(bcos::bytesConstRef _nodeID, std::string const&
}
}

// Note: the change of the topic will not trigger router-update
void LocalRouter::unRegisterTopic(bcos::bytesConstRef _nodeID, std::string const& topic)
{
m_routerInfo->unRegisterTopic(_nodeID.toBytes(), topic);
Expand Down
Loading

0 comments on commit 0c03b81

Please sign in to comment.