Skip to content

Commit

Permalink
update gateway-router-table when the node become unreachable
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 10, 2024
1 parent 98c949d commit 31403c0
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 27 deletions.
1 change: 1 addition & 0 deletions cpp/wedpr-protocol/grpc/client/GrpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ bool GrpcClient::checkHealth()
{
try
{
HEALTH_LOG(TRACE) << LOG_DESC("checkHealth");
ClientContext context;
HealthCheckResponse response;
auto status =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ GatewayImpl::GatewayImpl(Service::Ptr const& service,
m_gatewayInfoFactory(std::make_shared<GatewayNodeInfoFactoryImpl>(service->nodeID(), agency)),
m_localRouter(std::make_shared<LocalRouter>(
m_gatewayInfoFactory, m_frontBuilder, std::make_shared<MessageCache>(ioService))),
m_peerRouter(std::make_shared<PeerRouterTable>(m_service))
m_peerRouter(std::make_shared<PeerRouterTable>(m_service, m_gatewayInfoFactory))
{
m_service->registerMsgHandler((uint16_t)GatewayPacketType::P2PMessage,
boost::bind(&GatewayImpl::onReceiveP2PMessage, this, boost::placeholders::_1,
Expand All @@ -53,6 +53,34 @@ GatewayImpl::GatewayImpl(Service::Ptr const& service,
boost::placeholders::_2));
m_gatewayRouterManager = std::make_shared<GatewayRouterManager>(
m_service, m_gatewayInfoFactory, m_localRouter, m_peerRouter);

m_service->registerOnNewSession([this](WsSession::Ptr _session) {
if (!_session)
{
return;
}
m_p2pRouterManager->onNewSession(_session->nodeId());
});

m_service->registerOnDeleteSession([this](WsSession::Ptr _session) {
if (!_session)
{
return;
}
m_p2pRouterManager->onEraseSession(_session->nodeId());
});

m_p2pRouterManager->registerUnreachableHandler([this](std::string const& unreachableNode) {
m_gatewayRouterManager->removeUnreachableP2pNode(unreachableNode);
});

m_service->registerDisconnectHandler([this](WsSession::Ptr _session) {
if (!_session)
{
return;
}
m_gatewayRouterManager->removeUnreachableP2pNode(_session->nodeId());
});
}

void GatewayImpl::start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class GatewayNodeInfoFactory
virtual ~GatewayNodeInfoFactory() = default;

virtual GatewayNodeInfo::Ptr build() const = 0;
virtual GatewayNodeInfo::Ptr build(std::string const& p2pNode) const = 0;
};
struct GatewayNodeInfoCmp
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo
public:
using Ptr = std::shared_ptr<GatewayNodeInfoImpl>;
GatewayNodeInfoImpl() : m_rawGatewayInfo(std::make_shared<ppc::proto::GatewayNodeInfo>()) {}
GatewayNodeInfoImpl(std::string const& p2pNodeID, std::string const& agency)
: GatewayNodeInfoImpl()
GatewayNodeInfoImpl(std::string const& p2pNodeID) : GatewayNodeInfoImpl()
{
m_rawGatewayInfo->set_p2pnodeid(p2pNodeID);
}
GatewayNodeInfoImpl(std::string const& p2pNodeID, std::string const& agency)
: GatewayNodeInfoImpl(p2pNodeID)
{
m_rawGatewayInfo->set_agency(agency);
}

Expand Down Expand Up @@ -121,6 +124,11 @@ class GatewayNodeInfoFactoryImpl : public GatewayNodeInfoFactory
return std::make_shared<GatewayNodeInfoImpl>(m_p2pNodeID, m_agency);
}

GatewayNodeInfo::Ptr build(std::string const& p2pNode) const override
{
return std::make_shared<GatewayNodeInfoImpl>(p2pNode);
}

private:
std::string m_p2pNodeID;
std::string m_agency;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ void GatewayRouterManager::stop()
ROUTER_MGR_LOG(INFO) << LOG_DESC("stop GatewayRouterManager success");
}

void GatewayRouterManager::removeUnreachableP2pNode(std::string const& p2pNode)
{
ROUTER_MGR_LOG(INFO) << LOG_DESC("removeUnreachableP2pNode")
<< LOG_KV("p2pid", printP2PIDElegantly(p2pNode));
{
// remove statusSeq info
WriteGuard l(x_p2pID2Seq);
m_p2pID2Seq.erase(p2pNode);
}
m_peerRouter->removeP2PID(p2pNode);
}

void GatewayRouterManager::onReceiveNodeSeqMessage(MessageFace::Ptr msg, WsSession::Ptr session)
{
auto statusSeq =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class GatewayRouterManager
virtual void start();
virtual void stop();

void removeUnreachableP2pNode(std::string const& p2pNode);

protected:
virtual void onReceiveNodeSeqMessage(
bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@ void PeerRouterTable::updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo)
PEER_ROUTER_LOG(INFO) << LOG_DESC("updateGatewayInfo")
<< LOG_KV("detail", printNodeStatus(gatewayInfo));
auto nodeList = gatewayInfo->nodeList();

removeP2PNodeIDFromNodeIDInfos(gatewayInfo);
insertGatewayInfo(gatewayInfo);
}

void PeerRouterTable::insertGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo)
{
bcos::WriteGuard l(x_mutex);
// insert new information for the gateway
for (auto const& it : nodeList)
{
// update nodeID => gatewayInfos
if (!m_nodeID2GatewayInfos.count(it.first))
{
m_nodeID2GatewayInfos.insert(std::make_pair(it.first, GatewayNodeInfos()));
}
m_nodeID2GatewayInfos[it.first].insert(gatewayInfo);
}
if (!m_agency2GatewayInfos.count(gatewayInfo->agency()))
{
m_agency2GatewayInfos.insert(std::make_pair(gatewayInfo->agency(), GatewayNodeInfos()));
}
// update agency => gatewayInfos
m_agency2GatewayInfos[gatewayInfo->agency()].insert(gatewayInfo);
}

void PeerRouterTable::removeP2PNodeIDFromNodeIDInfos(GatewayNodeInfo::Ptr const& gatewayInfo)
{
bcos::WriteGuard l(x_mutex);
// remove the origin information of the gateway
auto it = m_nodeID2GatewayInfos.begin();
Expand All @@ -50,22 +78,44 @@ void PeerRouterTable::updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo)
}
it++;
}
// insert new information for the gateway
for (auto const& it : nodeList)
}

void PeerRouterTable::removeP2PNodeIDFromAgencyInfos(std::string const& p2pNode)
{
bcos::WriteGuard l(x_mutex);
for (auto it = m_agency2GatewayInfos.begin(); it != m_agency2GatewayInfos.end();)
{
// update nodeID => gatewayInfos
if (!m_nodeID2GatewayInfos.count(it.first))
auto& gatewayInfos = it->second;
for (auto pGateway = gatewayInfos.begin(); pGateway != gatewayInfos.end();)
{
m_nodeID2GatewayInfos.insert(std::make_pair(it.first, GatewayNodeInfos()));
if (pGateway->p2pNodeID() == p2pNode)
{
pGateway = gatewayInfos.erase(pGateway);
continue;
}
pGateway++;
}
m_nodeID2GatewayInfos[it.first].insert(gatewayInfo);
}
if (!m_agency2GatewayInfos.count(gatewayInfo->agency()))
{
m_agency2GatewayInfos.insert(std::make_pair(gatewayInfo->agency(), GatewayNodeInfos()));
if (gatewayInfos.empty())
{
it = m_agency2GatewayInfos.erase(it);
continue;
}
it++;
}
// update agency => gatewayInfos
m_agency2GatewayInfos[gatewayInfo->agency()].insert(gatewayInfo);
}

// m_agency2GatewayInfos
// std::map<std::string, GatewayNodeInfos> m_agency2GatewayInfos;

void PeerRouterTable::removeP2PID(std::string const& p2pNode)
{
PEER_ROUTER_LOG(INFO) << LOG_DESC("PeerRouterTable: removeP2PID")
<< LOG_KV("p2pID", printP2PIDElegantly(p2pNode));
// remove P2PNode from m_nodeID2GatewayInfos
auto gatewayInfo = m_gatewayInfoFactory->build(p2pNode);
removeP2PNodeIDFromNodeIDInfos(gatewayInfo);
// remove P2PNode from m_agency2GatewayInfos
removeP2PNodeIDFromAgencyInfos(p2pNode);
}

std::set<std::string> PeerRouterTable::agencies() const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ class PeerRouterTable
{
public:
using Ptr = std::shared_ptr<PeerRouterTable>;
PeerRouterTable(Service::Ptr service) : m_service(std::move(service)) {}
PeerRouterTable(Service::Ptr service, GatewayNodeInfoFactory::Ptr gatewayInfoFactory)
: m_service(std::move(service)), m_gatewayInfoFactory(std::move(gatewayInfoFactory))
{}
virtual ~PeerRouterTable() = default;

virtual void updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo);
virtual void removeP2PID(std::string const& p2pNode);
virtual GatewayNodeInfos selectRouter(
ppc::protocol::RouteType const& routeType, ppc::protocol::Message::Ptr const& msg) const;

Expand All @@ -52,10 +55,13 @@ class PeerRouterTable
virtual GatewayNodeInfos selectRouterByNodeID(ppc::protocol::Message::Ptr const& msg) const;
virtual GatewayNodeInfos selectRouterByComponent(ppc::protocol::Message::Ptr const& msg) const;
virtual GatewayNodeInfos selectRouterByAgency(ppc::protocol::Message::Ptr const& msg) const;

void removeP2PNodeIDFromNodeIDInfos(GatewayNodeInfo::Ptr const& gatewayInfo);
void insertGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo);
void removeP2PNodeIDFromAgencyInfos(std::string const& p2pNode);

private:
Service::Ptr m_service;
GatewayNodeInfoFactory::Ptr m_gatewayInfoFactory;
// nodeID => p2pNodes
std::map<bcos::bytes, GatewayNodeInfos> m_nodeID2GatewayInfos;
// agency => p2pNodes
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ Service::Service(std::string const& _nodeID, RouterTableFactory::Ptr const& _rou
boost::bind(&Service::onP2PDisconnect, this, boost::placeholders::_1));
}


void Service::onP2PConnect(WsSession::Ptr _session)
{
SERVICE_LOG(INFO) << LOG_DESC("Receive new p2p connection")
Expand Down Expand Up @@ -88,6 +87,7 @@ void Service::onP2PConnect(WsSession::Ptr _session)
{
// the new session
m_nodeID2Session.insert(std::make_pair(_session->nodeId(), _session));
callNewSessionHandlers(_session);
}
SERVICE_LOG(INFO) << LOG_DESC("onP2PConnect established new session")
<< LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId()))
Expand Down Expand Up @@ -126,6 +126,7 @@ bool Service::removeSessionInfo(WsSession::Ptr const& _session)
<< LOG_KV("endpoint", _session->endPoint());

m_nodeID2Session.erase(it);
callDeleteSessionHandlers();
return true;
}
return false;
Expand Down
47 changes: 47 additions & 0 deletions cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ class Service : public bcos::boostssl::ws::WsService
}
}

// handlers called when new-session
void registerOnNewSession(std::function<void(bcos::boostssl::ws::WsSession::Ptr)> _handler)
{
m_newSessionHandlers.emplace_back(_handler);
}
// handlers called when delete-session
void registerOnDeleteSession(std::function<void(bcos::boostssl::ws::WsSession::Ptr)> _handler)
{
m_deleteSessionHandlers.emplace_back(_handler);
}

protected:
void onRecvMessage(bcos::boostssl::MessageFace::Ptr _msg,
bcos::boostssl::ws::WsSession::Ptr _session) override;
Expand All @@ -93,6 +104,37 @@ class Service : public bcos::boostssl::ws::WsService
bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::Options options,
bcos::boostssl::ws::RespCallBack respFunc);

virtual void callNewSessionHandlers(bcos::boostssl::ws::WsSession::Ptr _session)
{
try
{
for (auto const& handler : m_newSessionHandlers)
{
handler(_session);
}
}
catch (std::exception const& e)
{
SERVICE_LOG(WARNING) << LOG_DESC("callNewSessionHandlers exception")
<< LOG_KV("error", boost::diagnostic_information(e));
}
}
virtual void callDeleteSessionHandlers(bcos::boostssl::ws::WsSession::Ptr _session)
{
try
{
for (auto const& handler : m_deleteSessionHandlers)
{
handler(_session);
}
}
catch (std::exception const& e)
{
SERVICE_LOG(WARNING) << LOG_DESC("callDeleteSessionHandlers exception")
<< LOG_KV("error", boost::diagnostic_information(e));
}
}

protected:
std::string m_nodeID;
// nodeID=>session
Expand All @@ -105,5 +147,10 @@ class Service : public bcos::boostssl::ws::WsService
// configuredNode=>nodeID
std::map<bcos::boostssl::NodeIPEndpoint, std::string> m_configuredNode2ID;
mutable bcos::SharedMutex x_configuredNode2ID;

// handlers called when new-session
std::vector<std::function<void(bcos::boostssl::ws::WsSession::Ptr)>> m_newSessionHandlers;
// handlers called when delete-session
std::vector<std::function<void(bcos::boostssl::ws::WsSession::Ptr)>> m_deleteSessionHandlers;
};
} // namespace ppc::gateway
Loading

0 comments on commit 31403c0

Please sign in to comment.