Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add gateway-router-manager #12

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cpp/ppc-framework/gateway/GatewayProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ enum class GatewayPacketType : uint16_t
BroadcastMessage = 0x01,
RouterTableSyncSeq = 0x10,
RouterTableResponse = 0x11,
RouterTableRequest = 0x12
RouterTableRequest = 0x12,
SyncNodeSeq = 0x20,
RequestNodeStatus = 0x21,
ResponseNodeStatus = 0x22,
};

enum class GatewayMsgExtFlag : uint16_t
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ GatewayImpl::GatewayImpl(Service::Ptr const& service,
m_service->registerMsgHandler((uint16_t)GatewayPacketType::BroadcastMessage,
boost::bind(&GatewayImpl::onReceiveBroadcastMessage, this, boost::placeholders::_1,
boost::placeholders::_2));
m_gatewayRouterManager = std::make_shared<GatewayRouterManager>(
m_service, m_gatewayInfoFactory, m_localRouter, m_peerRouter);
}

void GatewayImpl::start()
Expand All @@ -63,6 +65,7 @@ void GatewayImpl::start()
m_running = true;
m_service->start();
m_p2pRouterManager->start();
m_gatewayRouterManager->start();
GATEWAY_LOG(INFO) << LOG_DESC("Start gateway success");
}

Expand All @@ -76,6 +79,7 @@ void GatewayImpl::stop()
m_running = false;
m_service->stop();
m_p2pRouterManager->stop();
m_gatewayRouterManager->stop();
GATEWAY_LOG(INFO) << LOG_DESC("Stop gateway success");
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ppc-gateway/gateway/router/GatewayNodeInfo.h"
#include "ppc-gateway/p2p/Service.h"
#include "ppc-gateway/p2p/router/RouterManager.h"
#include "router/GatewayRouterManager.h"
#include "router/LocalRouter.h"
#include "router/PeerRouterTable.h"

Expand Down Expand Up @@ -80,6 +81,7 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_this<Gateway
std::string m_agency;

RouterManager::Ptr m_p2pRouterManager;
GatewayRouterManager::Ptr m_gatewayRouterManager;

GatewayNodeInfoFactory::Ptr m_gatewayInfoFactory;
LocalRouter::Ptr m_localRouter;
Expand Down
15 changes: 15 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
*/
#pragma once
#include "ppc-framework/protocol/INodeInfo.h"
#include <bcos-utilities/Log.h>
#include <memory>
#include <sstream>
namespace ppc::gateway
{
class GatewayNodeInfo
Expand All @@ -33,6 +35,9 @@ class GatewayNodeInfo
virtual std::string const& p2pNodeID() const = 0;
// the agency
virtual std::string const& agency() const = 0;
virtual uint32_t statusSeq() const = 0;
virtual void setStatusSeq(uint32_t statusSeq) = 0;

// get the node information by nodeID
virtual ppc::protocol::INodeInfo::Ptr nodeInfo(bcos::bytes const& nodeID) const = 0;
virtual bool tryAddNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
Expand All @@ -51,6 +56,7 @@ class GatewayNodeInfo
virtual void unRegisterTopic(bcos::bytes const& nodeID, std::string const& topic) = 0;

virtual std::map<bcos::bytes, ppc::protocol::INodeInfo::Ptr> nodeList() const = 0;
virtual uint16_t nodeSize() const = 0;
};

class GatewayNodeInfoFactory
Expand All @@ -71,4 +77,13 @@ struct GatewayNodeInfoCmp
}
};
using GatewayNodeInfos = std::set<GatewayNodeInfo::Ptr, GatewayNodeInfoCmp>;

inline std::string printNodeStatus(GatewayNodeInfo::Ptr const& status)
{
std::ostringstream stringstream;
stringstream << LOG_KV("p2pNodeID", status->p2pNodeID()) << LOG_KV("agency", status->agency())
<< LOG_KV("statusSeq", status->statusSeq())
<< LOG_KV("nodeSize", status->nodeSize());
return stringstream.str();
}
} // namespace ppc::gateway
10 changes: 10 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ std::string const& GatewayNodeInfoImpl::agency() const
{
return m_inner()->agency;
}

uint32_t GatewayNodeInfoImpl::statusSeq() const
{
return m_inner()->statusSeq;
}
void GatewayNodeInfoImpl::setStatusSeq(uint32_t statusSeq)
{
m_inner()->statusSeq = statusSeq;
}

// get the node information by nodeID
INodeInfo::Ptr GatewayNodeInfoImpl::nodeInfo(bcos::bytes const& nodeID) const
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo
bcos::WriteGuard l(x_nodeList);
return m_nodeList;
}
uint32_t statusSeq() const override;
void setStatusSeq(uint32_t statusSeq) override;

virtual uint16_t nodeSize() const override { return m_nodeList.size(); }

private:
std::function<ppctars::GatewayNodeInfo*()> m_inner;
Expand Down
183 changes: 183 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file GatewayRouterManager.h
* @author: yujiechen
* @date 2024-08-26
*/
#include "GatewayRouterManager.h"
#include "ppc-framework/gateway/GatewayProtocol.h"
#include <ppc-gateway/Common.h>

using namespace ppc::protocol;
using namespace ppc;
using namespace bcos;
using namespace ppc::gateway;
using namespace bcos::boostssl;
using namespace bcos::boostssl::ws;

GatewayRouterManager::GatewayRouterManager(Service::Ptr service,
GatewayNodeInfoFactory::Ptr nodeStatusFactory, LocalRouter::Ptr localRouter,
PeerRouterTable::Ptr peerRouter)
: m_service(std::move(service)),
m_nodeStatusFactory(std::move(nodeStatusFactory)),
m_localRouter(std::move(localRouter)),
m_peerRouter(std::move(peerRouter))
{
m_service->registerMsgHandler((uint16_t)GatewayPacketType::SyncNodeSeq,
boost::bind(&GatewayRouterManager::onReceiveNodeSeqMessage, this, boost::placeholders::_1,
boost::placeholders::_2));

m_service->registerMsgHandler((uint16_t)GatewayPacketType::RequestNodeStatus,
boost::bind(&GatewayRouterManager::onReceiveRequestNodeStatusMsg, this,
boost::placeholders::_1, boost::placeholders::_2));

m_service->registerMsgHandler((uint16_t)GatewayPacketType::ResponseNodeStatus,
boost::bind(&GatewayRouterManager::onRecvResponseNodeStatusMsg, this,
boost::placeholders::_1, boost::placeholders::_2));

m_timer = std::make_shared<Timer>(SEQ_SYNC_PERIOD, "seqSync");
// broadcast seq periodically
m_timer->registerTimeoutHandler([this]() { broadcastStatusSeq(); });
}


void GatewayRouterManager::start()
{
if (m_running)
{
GATEWAY_LOG(INFO) << LOG_DESC("GatewayRouterManager has already been started");
return;
}
m_running = true;
m_timer->start();
GATEWAY_LOG(INFO) << LOG_DESC("start GatewayRouterManager success");
}

void GatewayRouterManager::stop()
{
if (!m_running)
{
GATEWAY_LOG(INFO) << LOG_DESC("GatewayRouterManager has already been stopped");
return;
}
m_running = false;
m_timer->stop();
GATEWAY_LOG(INFO) << LOG_DESC("stop GatewayRouterManager success");
}

void GatewayRouterManager::onReceiveNodeSeqMessage(MessageFace::Ptr msg, WsSession::Ptr session)
{
auto statusSeq =
boost::asio::detail::socket_ops::network_to_host_long(*((uint32_t*)msg->payload()->data()));

auto p2pMessage = std::dynamic_pointer_cast<Message>(msg);
auto const& from = (p2pMessage->header()->srcGwNode().size() > 0) ?
p2pMessage->header()->srcGwNode() :
session->nodeId();
auto statusSeqChanged = statusChanged(from, statusSeq);
if (!statusSeqChanged)
{
return;
}
// status changed, request for the nodeStatus
GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveNodeSeqMessage") << LOG_KV("from", from)
<< LOG_KV("statusSeq", statusSeq);
m_service->asyncSendMessageByP2PNodeID(
(uint16_t)GatewayPacketType::RequestNodeStatus, from, std::make_shared<bcos::bytes>());
}


bool GatewayRouterManager::statusChanged(std::string const& p2pNodeID, uint32_t seq)
{
bool ret = true;
ReadGuard l(x_p2pID2Seq);
auto it = m_p2pID2Seq.find(p2pNodeID);
if (it != m_p2pID2Seq.end())
{
ret = (seq > it->second);
}
return ret;
}

void GatewayRouterManager::broadcastStatusSeq()
{
m_timer->restart();
auto message = std::dynamic_pointer_cast<Message>(m_service->messageFactory()->buildMessage());
message->setPacketType((uint16_t)GatewayPacketType::SyncNodeSeq);
auto seq = m_localRouter->statusSeq();
auto statusSeq = boost::asio::detail::socket_ops::host_to_network_long(seq);
auto payload = std::make_shared<bytes>((byte*)&statusSeq, (byte*)&statusSeq + 4);
message->setPayload(payload);
GATEWAY_LOG(TRACE) << LOG_DESC("broadcastStatusSeq") << LOG_KV("seq", seq);
m_service->asyncBroadcastMessage(message);
}


void GatewayRouterManager::onReceiveRequestNodeStatusMsg(
MessageFace::Ptr msg, WsSession::Ptr session)
{
auto p2pMessage = std::dynamic_pointer_cast<Message>(msg);
auto const& from = (!p2pMessage->header()->srcGwNode().empty()) ?
p2pMessage->header()->srcGwNode() :
session->nodeId();

auto nodeStatusData = m_localRouter->generateNodeStatus();
if (!nodeStatusData)
{
GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveRequestNodeStatusMsg: generate nodeInfo error")
<< LOG_KV("from", from);
return;
}
GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveRequestNodeStatusMsg: response the latest nodeStatus")
<< LOG_KV("from", from);
m_service->asyncSendMessageByP2PNodeID(
(uint16_t)GatewayPacketType::ResponseNodeStatus, from, nodeStatusData);
}

void GatewayRouterManager::onRecvResponseNodeStatusMsg(MessageFace::Ptr msg, WsSession::Ptr session)
{
auto nodeStatus = m_nodeStatusFactory->build();
nodeStatus->decode(bytesConstRef(msg->payload()->data(), msg->payload()->size()));

auto p2pMessage = std::dynamic_pointer_cast<Message>(msg);
auto const& from = (!p2pMessage->header()->srcGwNode().empty()) ?
p2pMessage->header()->srcGwNode() :
session->nodeId();

GATEWAY_LOG(INFO) << LOG_DESC("onRecvResponseNodeStatusMsg") << LOG_KV("from", from)
<< LOG_KV("statusSeq", nodeStatus->statusSeq())
<< LOG_KV("agency", nodeStatus->agency());
updatePeerNodeStatus(from, nodeStatus);
}

void GatewayRouterManager::updatePeerNodeStatus(
std::string const& p2pID, GatewayNodeInfo::Ptr status)
{
auto statusSeq = status->statusSeq();
{
UpgradableGuard l(x_p2pID2Seq);
if (m_p2pID2Seq.contains(p2pID) && (m_p2pID2Seq.at(p2pID) >= statusSeq))
{
return;
}
UpgradeGuard ul(l);
m_p2pID2Seq[p2pID] = statusSeq;
}
GATEWAY_LOG(INFO) << LOG_DESC("updatePeerNodeStatus") << LOG_KV("from", p2pID)
<< LOG_KV("statusSeq", status->statusSeq())
<< LOG_KV("agency", status->agency());
m_peerRouter->updateGatewayInfo(status);
}
70 changes: 70 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayRouterManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file GatewayRouterManager.h
* @author: yujiechen
* @date 2024-08-26
*/
#pragma once
#include "ppc-gateway/gateway/router/GatewayNodeInfo.h"
#include "ppc-gateway/gateway/router/LocalRouter.h"
#include "ppc-gateway/gateway/router/PeerRouterTable.h"
#include "ppc-gateway/p2p/Service.h"
#include <bcos-utilities/Timer.h>
#include <memory>

namespace ppc::gateway
{
class GatewayRouterManager
{
public:
using Ptr = std::shared_ptr<GatewayRouterManager>;
GatewayRouterManager(Service::Ptr service, GatewayNodeInfoFactory::Ptr nodeStatusFactory,
LocalRouter::Ptr localRouter, PeerRouterTable::Ptr peerRouter);
virtual void start();
virtual void stop();

protected:
virtual void onReceiveNodeSeqMessage(
bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session);

virtual void onReceiveRequestNodeStatusMsg(
bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session);

virtual void onRecvResponseNodeStatusMsg(
bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session);
bool statusChanged(std::string const& p2pNodeID, uint32_t seq);
void broadcastStatusSeq();

void updatePeerNodeStatus(std::string const& p2pID, GatewayNodeInfo::Ptr status);

private:
Service::Ptr m_service;
GatewayNodeInfoFactory::Ptr m_nodeStatusFactory;
std::shared_ptr<bcos::Timer> m_timer;

LocalRouter::Ptr m_localRouter;
PeerRouterTable::Ptr m_peerRouter;

bool m_running = false;

// P2pID => statusSeq
std::map<std::string, uint32_t> m_p2pID2Seq;
mutable bcos::SharedMutex x_p2pID2Seq;

// TODO: make this configurable
unsigned const SEQ_SYNC_PERIOD = 3000;
};
} // namespace ppc::gateway
2 changes: 2 additions & 0 deletions cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using namespace bcos;
using namespace ppc::protocol;
using namespace ppc::gateway;

// Note: the change of the topic will not trigger router-update
void LocalRouter::registerTopic(bcos::bytesConstRef _nodeID, std::string const& topic)
{
m_routerInfo->registerTopic(_nodeID.toBytes(), topic);
Expand All @@ -48,6 +49,7 @@ void LocalRouter::registerTopic(bcos::bytesConstRef _nodeID, std::string const&
}
}

// Note: the change of the topic will not trigger router-update
void LocalRouter::unRegisterTopic(bcos::bytesConstRef _nodeID, std::string const& topic)
{
m_routerInfo->unRegisterTopic(_nodeID.toBytes(), topic);
Expand Down
Loading
Loading