Skip to content

Commit

Permalink
add registerMessageHandler to support dispatcher message by component
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 9, 2024
1 parent 5e67cff commit f470482
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 64 deletions.
2 changes: 2 additions & 0 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class IFront : virtual public IFrontClient
virtual void registerTopicHandler(
std::string const& topic, ppc::protocol::MessageDispatcherCallback callback) = 0;

virtual void registerMessageHandler(
std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback) = 0;
/**
* @brief async send message
*
Expand Down
4 changes: 2 additions & 2 deletions cpp/wedpr-initializer/Initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway)
m_protocolInitializer = std::make_shared<ProtocolInitializer>();
m_protocolInitializer->init(m_config);

auto ppcMessageFactory = std::make_shared<PPCMessageFactory>();
// init the frontService
INIT_LOG(INFO) << LOG_DESC("init the frontService") << LOG_KV("agency", m_config->agencyID());
auto frontThreadPool = std::make_shared<bcos::ThreadPool>("front", m_config->threadPoolSize());
Expand All @@ -89,7 +88,8 @@ void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway)
{
m_transport = transportBuilder.build(SDKMode::PRO, m_config->frontConfig(), nullptr);
}
m_ppcFront = std::make_shared<Front>(m_transport->getFront());
m_ppcFront =
std::make_shared<Front>(std::make_shared<PPCMessageFactory>(), m_transport->getFront());

INIT_LOG(INFO) << LOG_DESC("init the frontService success")
<< LOG_KV("frontDetail", printFrontDesc(m_config->frontConfig()))
Expand Down
52 changes: 40 additions & 12 deletions cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,49 @@ void CallbackManager::registerTopicHandler(
m_topicHandlers.insert(std::make_pair(topic, callback));
}

void CallbackManager::registerMessageHandler(
std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback)
{
bcos::WriteGuard l(x_msgHandlers);
m_msgHandlers.insert(std::make_pair(componentType, callback));
}

MessageDispatcherCallback CallbackManager::getHandlerByTopic(std::string const& topic)
{
bcos::ReadGuard l(x_topicHandlers);
auto it = m_topicHandlers.find(topic);
if (it != m_topicHandlers.end())
{
return it->second;
}
return nullptr;
}

MessageDispatcherCallback CallbackManager::getHandlerByComponentType(
std::string const& componentType)
{
bcos::ReadGuard l(x_msgHandlers);
auto it = m_msgHandlers.find(componentType);
if (it != m_msgHandlers.end())
{
return it->second;
}
return nullptr;
}

void CallbackManager::onReceiveMessage(std::string const& topic, Message::Ptr msg)
{
MessageDispatcherCallback callback = nullptr;
auto callback = getHandlerByTopic(topic);
if (!callback)
{
bcos::ReadGuard l(x_topicHandlers);
auto it = m_topicHandlers.find(topic);
if (it == m_topicHandlers.end())
{
FRONT_LOG(DEBUG) << LOG_DESC(
"onReceiveMessage: not find the handler, put into the buffer")
<< LOG_KV("topic", topic);
addMsgCache(topic, msg);
return;
}
callback = it->second;
callback = getHandlerByComponentType(msg->header()->optionalField()->componentType());
}
if (!callback)
{
FRONT_LOG(DEBUG) << LOG_DESC("onReceiveMessage: not find the handler, put into the buffer")
<< LOG_KV("topic", topic);
addMsgCache(topic, msg);
return;
}
m_threadPool->enqueue([callback, msg]() {
try
Expand Down
11 changes: 11 additions & 0 deletions cpp/wedpr-transport/ppc-front/ppc-front/CallbackManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class CallbackManager : public std::enable_shared_from_this<CallbackManager>
virtual void registerTopicHandler(
std::string const& topic, ppc::protocol::MessageDispatcherCallback callback);

virtual void registerMessageHandler(
std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback);

virtual ppc::protocol::Message::Ptr pop(std::string const& topic, int timeoutMs)
{
auto it = m_msgCache.find(topic);
Expand Down Expand Up @@ -94,6 +97,10 @@ class CallbackManager : public std::enable_shared_from_this<CallbackManager>
msgQueue->push(std::move(msg));
}

ppc::protocol::MessageDispatcherCallback getHandlerByTopic(std::string const& topic);
ppc::protocol::MessageDispatcherCallback getHandlerByComponentType(
std::string const& componentType);

private:
bcos::ThreadPool::Ptr m_threadPool;
std::shared_ptr<boost::asio::io_service> m_ioService;
Expand All @@ -105,6 +112,10 @@ class CallbackManager : public std::enable_shared_from_this<CallbackManager>
std::map<std::string, ppc::protocol::MessageDispatcherCallback> m_topicHandlers;
mutable bcos::SharedMutex x_topicHandlers;

// componentType => messageDispatcherCallback
std::map<std::string, ppc::protocol::MessageDispatcherCallback> m_msgHandlers;
mutable bcos::SharedMutex x_msgHandlers;

// the messageCache for the message with no topic handler defined
uint64_t m_maxMsgCacheSize = 10000;
// TODO: check the queueSize
Expand Down
5 changes: 4 additions & 1 deletion cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ using namespace bcos;
using namespace ppc::protocol;
using namespace ppc::front;

Front::Front(IFront::Ptr front) : m_front(std::move(front))
Front::Front(ppc::front::PPCMessageFaceFactory::Ptr ppcMsgFactory, IFront::Ptr front)
: m_messageFactory(std::move(ppcMsgFactory)), m_front(std::move(front))
{
m_fetcher = std::make_shared<bcos::Timer>(60 * 1000, "metaFetcher");
m_fetcher->registerTimeoutHandler([this]() {
Expand Down Expand Up @@ -98,6 +99,8 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace
auto routeInfo = front->routerInfoBuilder()->build();
routeInfo->setDstInst(_agencyID);
routeInfo->setTopic(_message->taskID());
auto type = ((uint16_t)_message->taskType() << 8) | _message->algorithmType();
routeInfo->setComponentType(std::to_string(type));
bcos::bytes data;
_message->encode(data);
auto self = weak_from_this();
Expand Down
4 changes: 2 additions & 2 deletions cpp/wedpr-transport/ppc-front/ppc-front/Front.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Front : public FrontInterface, public std::enable_shared_from_this<Front>
{
public:
using Ptr = std::shared_ptr<Front>;
Front(IFront::Ptr front);
Front(ppc::front::PPCMessageFaceFactory::Ptr ppcMsgFactory, IFront::Ptr front);
~Front() override {}

void start() override;
Expand Down Expand Up @@ -66,7 +66,7 @@ class Front : public FrontInterface, public std::enable_shared_from_this<Front>
{
uint16_t type = ((uint16_t)_taskType << 8) | _algorithmType;
auto self = weak_from_this();
m_front->registerTopicHandler(
m_front->registerMessageHandler(
std::to_string(type), [self, _handler](ppc::protocol::Message::Ptr msg) {
auto front = self.lock();
if (!front)
Expand Down
6 changes: 6 additions & 0 deletions cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_
m_callbackManager->registerTopicHandler(topic, callback);
}

void registerMessageHandler(std::string const& componentType,
ppc::protocol::MessageDispatcherCallback callback) override
{
m_callbackManager->registerMessageHandler(componentType, callback);
}

/**
* @brief register the nodeInfo to the gateway
* @param nodeInfo the nodeInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ std::vector<std::shared_ptr<ppc::front::IFrontClient>> GatewayNodeInfoImpl::choo
bool selectAll, std::string const& topic) const
{
std::vector<std::shared_ptr<ppc::front::IFrontClient>> result;
// empty topic means broadcast message to all front
if (topic.empty())
{
bcos::ReadGuard l(x_nodeList);
for (auto const& it : m_nodeList)
{
result.emplace_back(it.second->getFront());
}
return result;
}
// the topic specified
bcos::ReadGuard l(x_topicInfo);
for (auto const& it : m_topicInfo)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void LocalRouter::unRegisterTopic(bcos::bytesConstRef _nodeID, std::string const
bool LocalRouter::dispatcherMessage(Message::Ptr const& msg, ReceiveMsgFunc callback, bool holding)
{
auto frontList = chooseReceiver(msg);
// send success
// find the front
if (!frontList.empty())
{
for (auto const& front : frontList)
Expand Down
85 changes: 40 additions & 45 deletions cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,13 @@ void Service::onP2PConnect(WsSession::Ptr _session)

RecursiveGuard l(x_nodeID2Session);
auto it = m_nodeID2Session.find(_session->nodeId());
// the session already connected
if (it != m_nodeID2Session.end() && it->second->isConnected())
{
SERVICE_LOG(INFO) << LOG_DESC("onP2PConnect, drop the duplicated connection")
<< LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId()))
<< LOG_KV("endpoint", _session->endPoint());
_session->drop(WsError::UserDisconnect);
updateNodeIDInfo(_session);
_session->drop(WsError::UserDisconnect);
return;
}
// the node-self
Expand All @@ -77,41 +76,18 @@ void Service::onP2PConnect(WsSession::Ptr _session)
_session->drop(WsError::UserDisconnect);
return;
}


///// Note: here allow all new session, even the ip not configured(support dynamic access)
bool updated = updateNodeIDInfo(_session);
// hit the m_nodeID2Session
if (it != m_nodeID2Session.end())
{
// the old session has already been connected, and the new session endPoint is not
// configured
if (it->second->isConnected() && !updated)
{
SERVICE_LOG(INFO) << LOG_DESC(
"onP2PConnect, drop the new not-configurated session, remain "
"the old session")
<< LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId()))
<< LOG_KV("endPoint", _session->endPoint())
<< LOG_KV("oldEndPoint", it->second->endPoint());
_session->drop(WsError::UserDisconnect);
return;
}
SERVICE_LOG(INFO) << LOG_DESC(
"onP2PConnect, drop the old not-configurated session, replace "
"with the new session")
<< LOG_KV("nodeID", printP2PIDElegantly(_session->nodeId()))
<< LOG_KV("endPoint", _session->endPoint())
<< LOG_KV("oldEndPoint", it->second->endPoint());
if (it->second->isConnected())
{
it->second->drop(WsError::UserDisconnect);
}
it->second = _session;
return;
}
// the new session
m_nodeID2Session.insert(std::make_pair(_session->nodeId(), _session));
else
{
// the new session
m_nodeID2Session.insert(std::make_pair(_session->nodeId(), _session));
}
SERVICE_LOG(INFO) << LOG_DESC("onP2PConnect established new session")
<< LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId()))
<< LOG_KV("endpoint", _session->endPoint());
Expand All @@ -127,7 +103,7 @@ bool Service::updateNodeIDInfo(WsSession::Ptr const& _session)
{
it->second = p2pNodeID;
SERVICE_LOG(INFO) << LOG_DESC("updateNodeIDInfo: update the nodeID")
<< LOG_KV("nodeid", p2pNodeID)
<< LOG_KV("nodeid", printP2PIDElegantly(p2pNodeID))
<< LOG_KV("endpoint", _session->endPoint());
return true;
}
Expand All @@ -138,38 +114,58 @@ bool Service::updateNodeIDInfo(WsSession::Ptr const& _session)
return false;
}

void Service::removeSessionInfo(WsSession::Ptr const& _session)
bool Service::removeSessionInfo(WsSession::Ptr const& _session)
{
RecursiveGuard l(x_nodeID2Session);
auto it = m_nodeID2Session.find(_session->nodeId());
if (it != m_nodeID2Session.end() && it->second->endPoint() == _session->endPoint())
if (it != m_nodeID2Session.end() && it->second->endPointInfo() == _session->endPointInfo())
{
SERVICE_LOG(INFO) << "onP2PDisconnectand remove from m_nodeID2Session"
SERVICE_LOG(INFO) << "onP2PDisconnect: remove from m_nodeID2Session"
<< LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId()))
<< LOG_KV("endpoint", _session->endPoint());

m_nodeID2Session.erase(it);
return true;
}
return false;
}

void Service::onP2PDisconnect(WsSession::Ptr _session)
{
// remove the session information
removeSessionInfo(_session);
if (!removeSessionInfo(_session))
{
return;
}
// update the session nodeID to empty
UpgradableGuard l(x_configuredNode2ID);
for (auto& it : m_configuredNode2ID)
{
// reset the nodeID of the dropped session(except the node-self) to empty
if (m_nodeID != _session->nodeId() && it.second == _session->nodeId())
// the node-self, no need to reset the nodeID
if (m_nodeID == _session->nodeId())
{
continue;
}
// not with the same nodeID, can't reset the nodeID
if (it.second != _session->nodeId())
{
UpgradeGuard ul(l);
it.second.clear();
break;
continue;
}
UpgradeGuard ul(l);
it.second.clear();
SERVICE_LOG(INFO) << "onP2PDisconnect: clear the nodeID information"
<< LOG_KV("p2pid", printP2PIDElegantly(_session->nodeId()))
<< LOG_KV("endpoint", _session->endPoint());
break;
}
}

bool Service::nodeConnected(std::string const& nodeID)
{
bcos::RecursiveGuard l(x_nodeID2Session);
return m_nodeID2Session.count(nodeID);
}

void Service::reconnect()
{
// obtain the un-connected peers information
Expand All @@ -182,14 +178,12 @@ void Service::reconnect()
{
continue;
}
if (!it.second.empty() && isConnected(it.first))
if (!it.second.empty() && nodeConnected(it.second))
{
continue;
}
unconnectedPeers->insert(it.first);
SERVICE_LOG(DEBUG) << LOG_DESC("ready to reconnect")
<< LOG_KV("endpoint",
it.first.address() + ":" + std::to_string(it.first.port()));
SERVICE_LOG(DEBUG) << LOG_DESC("ready to reconnect") << LOG_KV("endpoint", it.first);
}
}
setReconnectedPeers(unconnectedPeers);
Expand Down Expand Up @@ -356,5 +350,6 @@ void Service::sendRespMessageBySession(bcos::boostssl::ws::WsSession::Ptr const&
sessions.emplace_back(session);
WsService::asyncSendMessage(sessions, respMessage);
SERVICE_LOG(TRACE) << "sendRespMessageBySession" << LOG_KV("resp", printMessage(respMessage))
<< LOG_KV("payload size", payload->size());
<< LOG_KV("payloadSize",
respMessage->payload() ? respMessage->payload()->size() : 0);
}
4 changes: 3 additions & 1 deletion cpp/wedpr-transport/ppc-gateway/ppc-gateway/p2p/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,12 @@ class Service : public bcos::boostssl::ws::WsService
virtual void onP2PConnect(bcos::boostssl::ws::WsSession::Ptr _session);
virtual void onP2PDisconnect(bcos::boostssl::ws::WsSession::Ptr _session);

virtual bool nodeConnected(std::string const& nodeID);

void reconnect() override;

bool updateNodeIDInfo(bcos::boostssl::ws::WsSession::Ptr const& _session);
void removeSessionInfo(bcos::boostssl::ws::WsSession::Ptr const& _session);
bool removeSessionInfo(bcos::boostssl::ws::WsSession::Ptr const& _session);
bcos::boostssl::ws::WsSession::Ptr getSessionByNodeID(std::string const& _nodeID);

virtual void asyncSendMessageWithForward(std::string const& dstNodeID,
Expand Down

0 comments on commit f470482

Please sign in to comment.