Skip to content

Commit

Permalink
add selectRouter to gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Oct 21, 2024
1 parent 5211add commit da1632d
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 113 deletions.
4 changes: 4 additions & 0 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ class IFront : virtual public IFrontClient
virtual void registerComponent(std::string const& component) = 0;
virtual void unRegisterComponent(std::string const& component) = 0;

// get the target nodeList according to the routeInfo
virtual std::vector<std::string> selectNodesByRoutePolicy(
int16_t routeType, ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) = 0;

private:
ppc::protocol::ReceiveMsgFunc populateErrorCallback(ErrorCallback::Ptr errorCallback)
{
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-framework/gateway/IGateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class IGateway
bcos::bytesConstRef nodeID, std::string const& topic) = 0;
virtual bcos::Error::Ptr unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic) = 0;

// get the target nodeList according to the routeInfo
virtual std::vector<std::string> selectNodesByRoutePolicy(ppc::protocol::RouteType routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) = 0;
};

} // namespace ppc::gateway
24 changes: 24 additions & 0 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ void GatewayClient::asyncSendMessage(RouteType routeType,
Status status) { callback(toError(status, *response)); });
}

std::vector<std::string> GatewayClient::selectNodesByRoutePolicy(
RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo)
{
std::unique_ptr<ppc::proto::SelectRouteRequest> request(
generateSelectRouteRequest(routeType, routeInfo));
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<NodeList>();
// lambda keeps the lifecycle for clientContext
auto status = m_stub->selectNodesByRoutePolicy(context.get(), *request, response.get());
if (!status.ok())
{
throw std::runtime_error(
"selectNodesByRoutePolicy failed, code: " + std::to_string(status.error_code()) +
", msg: " + status.error_message());
}
if (response->error().errorcode() != 0)
{
throw std::runtime_error("selectNodesByRoutePolicy failed, code: " +
std::to_string(response->error().errorcode()) +
", msg: " + response->error().errormessage());
}
return std::vector<std::string>(response->nodelist().begin(), response->nodelist().end());
}

void GatewayClient::asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback)
{
auto response = std::make_shared<PeersInfo>();
Expand Down
3 changes: 3 additions & 0 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient
bcos::Error::Ptr registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
bcos::Error::Ptr unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;

std::vector<std::string> selectNodesByRoutePolicy(ppc::protocol::RouteType routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) override;

private:
std::unique_ptr<ppc::proto::Gateway::Stub> m_stub;
std::map<std::string, std::unique_ptr<ppc::proto::Gateway::Stub>> m_broadcastStubs;
Expand Down
30 changes: 29 additions & 1 deletion cpp/wedpr-protocol/grpc/server/GatewayServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,35 @@ ServerUnaryReactor* GatewayServer::asyncSendMessage(CallbackServerContext* conte
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply,
std::make_shared<bcos::Error>(-1,
"handle message failed for : " + std::string(boost::diagnostic_information(e))));
"asyncSendMessage failed for : " + std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor;
}

grpc::ServerUnaryReactor* GatewayServer::selectNodesByRoutePolicy(
grpc::CallbackServerContext* context, const ppc::proto::SelectRouteRequest* selectRouteRequest,
ppc::proto::NodeList* reply)
{
ServerUnaryReactor* reactor(context->DefaultReactor());
try
{
auto routeInfo = generateRouteInfo(m_routeInfoBuilder, selectRouteRequest->routeinfo());
auto selectedNodes = m_gateway->selectNodesByRoutePolicy(
(ppc::protocol::RouteType)selectRouteRequest->routetype(), routeInfo);
for (auto const& it : selectedNodes)
{
reply->add_nodelist(it);
}
reactor->Finish(Status::OK);
}
catch (std::exception const& e)
{
GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("selectNodesByRoutePolicy exception")
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply->mutable_error(),
std::make_shared<bcos::Error>(-1, "selectNodesByRoutePolicy failed for : " +
std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor;
Expand Down
5 changes: 5 additions & 0 deletions cpp/wedpr-protocol/grpc/server/GatewayServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class GatewayServer : public ppc::proto::Gateway::CallbackService
grpc::ServerUnaryReactor* asyncSendMessage(grpc::CallbackServerContext* context,
const ppc::proto::SendedMessageRequest* sendedMsg, ppc::proto::Error* reply) override;

grpc::ServerUnaryReactor* selectNodesByRoutePolicy(grpc::CallbackServerContext* context,
const ppc::proto::SelectRouteRequest* selectRouteRequest,
ppc::proto::NodeList* reply) override;

grpc::ServerUnaryReactor* asyncGetPeers(grpc::CallbackServerContext* context,
const ppc::proto::Empty* request, ppc::proto::PeersInfo* reply) override;
grpc::ServerUnaryReactor* asyncGetAgencies(grpc::CallbackServerContext* context,
Expand All @@ -57,6 +61,7 @@ class GatewayServer : public ppc::proto::Gateway::CallbackService
grpc::ServerUnaryReactor* unRegisterTopic(grpc::CallbackServerContext* context,
const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override;


private:
ppc::gateway::IGateway::Ptr m_gateway;
MessageOptionalHeaderBuilder::Ptr m_routeInfoBuilder;
Expand Down
10 changes: 10 additions & 0 deletions cpp/wedpr-protocol/proto/pb/Service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ message SendedMessageRequest{
string traceID = 5;
};

message SelectRouteRequest{
int32 routeType = 1;
RouteInfo routeInfo = 2;
}

message AgenciesInfo{
Error error = 1;
repeated string agencies = 2;
Expand All @@ -47,12 +52,17 @@ message Condition{
repeated string components = 1;
};

message NodeList{
repeated string nodeList = 1;
Error error = 2;
};
service Front {
rpc onReceiveMessage (ReceivedMessage) returns (Error) {}
}
service Gateway{
rpc asyncSendMessage(SendedMessageRequest) returns(Error){}
rpc asyncGetPeers(Empty)returns(PeersInfo){}
rpc selectNodesByRoutePolicy(SelectRouteRequest)returns(NodeList){}
rpc asyncGetAgencies(Condition)returns(AgenciesInfo){}
rpc registerNodeInfo(NodeInfo) returns(Error){}
rpc unRegisterNodeInfo(NodeInfo)returns(Error){}
Expand Down
30 changes: 22 additions & 8 deletions cpp/wedpr-protocol/protobuf/src/RequestConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ inline MessageOptionalHeader::Ptr generateRouteInfo(
return routeInfo;
}

inline void setRouteInfo(
ppc::proto::RouteInfo* route_info, MessageOptionalHeader::Ptr const& routeInfo)
{
// set the route information
route_info->set_topic(routeInfo->topic());
route_info->set_componenttype(routeInfo->componentType());
route_info->set_srcnode(routeInfo->srcNode().data(), routeInfo->srcNode().size());
route_info->set_dstnode(routeInfo->dstNode().data(), routeInfo->dstNode().size());
route_info->set_dstinst(routeInfo->dstInst().data(), routeInfo->dstInst().size());
}

inline ppc::proto::SendedMessageRequest* generateRequest(std::string const& traceID,
RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload,
long timeout)
Expand All @@ -51,20 +62,23 @@ inline ppc::proto::SendedMessageRequest* generateRequest(std::string const& trac
request->set_traceid(traceID);
request->set_routetype(uint16_t(routeType));
// set the route information
request->mutable_routeinfo()->set_topic(routeInfo->topic());
request->mutable_routeinfo()->set_componenttype(routeInfo->componentType());
request->mutable_routeinfo()->set_srcnode(
routeInfo->srcNode().data(), routeInfo->srcNode().size());
request->mutable_routeinfo()->set_dstnode(
routeInfo->dstNode().data(), routeInfo->dstNode().size());
request->mutable_routeinfo()->set_dstinst(
routeInfo->dstInst().data(), routeInfo->dstInst().size());
setRouteInfo(request->mutable_routeinfo(), routeInfo);
// set the payload(TODO: optimize here)
request->set_payload(payload.data(), payload.size());
request->set_timeout(timeout);
return request;
}

inline ppc::proto::SelectRouteRequest* generateSelectRouteRequest(
RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo)
{
auto request = new ppc::proto::SelectRouteRequest();
request->set_routetype(uint16_t(routeType));
// set the route information
setRouteInfo(request->mutable_routeinfo(), routeInfo);
return request;
}

inline ppc::proto::NodeInfo* toNodeInfoRequest(
bcos::bytesConstRef const& nodeID, std::string const& topic)
{
Expand Down
7 changes: 7 additions & 0 deletions cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ class FrontImpl : public IFront, public IFrontClient, public std::enable_shared_
void registerComponent(std::string const& component) override;
void unRegisterComponent(std::string const& component) override;

std::vector<std::string> selectNodesByRoutePolicy(
int16_t routeType, ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) override
{
return m_gatewayClient->selectNodesByRoutePolicy(
(ppc::protocol::RouteType)routeType, routeInfo);
}

private:
void asyncSendMessageToGateway(bool responsePacket,
ppc::protocol::MessagePayload::Ptr&& frontMessage, ppc::protocol::RouteType routeType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ void GatewayImpl::asyncSendMessage(ppc::protocol::RouteType routeType,
return;
}
// try to find the dstP2PNode
auto selectedP2PNodes = m_peerRouter->selectRouter(routeType, p2pMessage);
auto selectedP2PNodes =
m_peerRouter->selectRouter(routeType, p2pMessage->header()->optionalField());
if (selectedP2PNodes.empty())
{
GATEWAY_LOG(INFO) << LOG_DESC("can't find the gateway to send the message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_this<Gateway
void asyncGetAgencies(std::vector<std::string> const& components,
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback) override;

std::vector<std::string> selectNodesByRoutePolicy(ppc::protocol::RouteType routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) override
{
return m_peerRouter->selectTargetNodes(routeType, routeInfo);
}

protected:
virtual void onReceiveP2PMessage(
bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,29 +145,50 @@ std::set<std::string> PeerRouterTable::agencies(std::vector<std::string> const&
}

GatewayNodeInfos PeerRouterTable::selectRouter(
RouteType const& routeType, Message::Ptr const& msg) const
RouteType const& routeType, MessageOptionalHeader::Ptr const& routeInfo) const
{
switch (routeType)
{
case RouteType::ROUTE_THROUGH_NODEID:
return selectRouterByNodeID(msg);
return selectRouterByNodeID(routeInfo);
case RouteType::ROUTE_THROUGH_COMPONENT:
return selectRouterByComponent(msg);
return selectRouterByComponent(routeInfo);
case RouteType::ROUTE_THROUGH_AGENCY:
case RouteType::ROUTE_THROUGH_TOPIC:
return selectRouterByAgency(msg);
return selectRouterByAgency(routeInfo);
default:
BOOST_THROW_EXCEPTION(WeDPRException() << errinfo_comment(
"selectRouter failed for encounter unsupported routeType: " +
std::to_string((uint16_t)routeType)));
}
}

GatewayNodeInfos PeerRouterTable::selectRouterByNodeID(Message::Ptr const& msg) const
std::vector<std::string> PeerRouterTable::selectTargetNodes(
RouteType const& routeType, MessageOptionalHeader::Ptr const& routeInfo) const
{
std::set<std::string> targetNodeList;
auto selectedP2PNodes = selectRouter(routeType, routeInfo);
if (selectedP2PNodes.empty())
{
return std::vector<std::string>();
}
for (auto const& it : selectedP2PNodes)
{
auto nodeList = it->nodeList();
for (auto const& it : nodeList)
{
targetNodeList.insert(std::string(it.first.begin(), it.first.end()));
}
}
return std::vector<std::string>(targetNodeList.begin(), targetNodeList.end());
}

GatewayNodeInfos PeerRouterTable::selectRouterByNodeID(
MessageOptionalHeader::Ptr const& routeInfo) const
{
GatewayNodeInfos result;
bcos::ReadGuard l(x_mutex);
auto it = m_nodeID2GatewayInfos.find(msg->header()->optionalField()->dstNode());
auto it = m_nodeID2GatewayInfos.find(routeInfo->dstNode());
// no router found
if (it == m_nodeID2GatewayInfos.end())
{
Expand All @@ -177,11 +198,12 @@ GatewayNodeInfos PeerRouterTable::selectRouterByNodeID(Message::Ptr const& msg)
}


GatewayNodeInfos PeerRouterTable::selectRouterByAgency(Message::Ptr const& msg) const
GatewayNodeInfos PeerRouterTable::selectRouterByAgency(
MessageOptionalHeader::Ptr const& routeInfo) const
{
GatewayNodeInfos result;
bcos::ReadGuard l(x_mutex);
auto it = m_agency2GatewayInfos.find(msg->header()->optionalField()->dstInst());
auto it = m_agency2GatewayInfos.find(routeInfo->dstInst());
// no router found
if (it == m_agency2GatewayInfos.end())
{
Expand All @@ -191,10 +213,11 @@ GatewayNodeInfos PeerRouterTable::selectRouterByAgency(Message::Ptr const& msg)
}

// Note: selectRouterByComponent support not specified the dstInst
GatewayNodeInfos PeerRouterTable::selectRouterByComponent(Message::Ptr const& msg) const
GatewayNodeInfos PeerRouterTable::selectRouterByComponent(
MessageOptionalHeader::Ptr const& routeInfo) const
{
GatewayNodeInfos result;
auto dstInst = msg->header()->optionalField()->dstInst();
auto dstInst = routeInfo->dstInst();
std::vector<GatewayNodeInfos> selectedRouterInfos;
{
bcos::ReadGuard l(x_mutex);
Expand All @@ -220,23 +243,23 @@ GatewayNodeInfos PeerRouterTable::selectRouterByComponent(Message::Ptr const& ms
}
for (auto const& it : selectedRouterInfos)
{
selectRouterByComponent(result, msg, it);
selectRouterByComponent(result, routeInfo, it);
}
return result;
}


void PeerRouterTable::selectRouterByComponent(GatewayNodeInfos& choosedGateway,
Message::Ptr const& msg, GatewayNodeInfos const& singleAgencyGatewayInfos) const
MessageOptionalHeader::Ptr const& routeInfo,
GatewayNodeInfos const& singleAgencyGatewayInfos) const
{
// foreach all gateways to find the component
for (auto const& it : singleAgencyGatewayInfos)
{
auto const& nodeListInfo = it->nodeList();
for (auto const& nodeInfo : nodeListInfo)
{
if (nodeInfo.second->components().count(
msg->header()->optionalField()->componentType()))
if (nodeInfo.second->components().count(routeInfo->componentType()))
{
choosedGateway.insert(it);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ class PeerRouterTable

virtual void updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo);
virtual void removeP2PID(std::string const& p2pNode);
virtual GatewayNodeInfos selectRouter(
ppc::protocol::RouteType const& routeType, ppc::protocol::Message::Ptr const& msg) const;
virtual GatewayNodeInfos selectRouter(ppc::protocol::RouteType const& routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) const;

virtual std::vector<std::string> selectTargetNodes(ppc::protocol::RouteType const& routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) const;

virtual void asyncBroadcastMessage(ppc::protocol::Message::Ptr const& msg) const;

Expand All @@ -51,13 +54,17 @@ class PeerRouterTable
return m_agency2GatewayInfos;
}


private:
virtual GatewayNodeInfos selectRouterByNodeID(ppc::protocol::Message::Ptr const& msg) const;
virtual GatewayNodeInfos selectRouterByComponent(ppc::protocol::Message::Ptr const& msg) const;
virtual GatewayNodeInfos selectRouterByNodeID(
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) const;
virtual GatewayNodeInfos selectRouterByComponent(
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) const;
void selectRouterByComponent(GatewayNodeInfos& choosedGateway,
ppc::protocol::Message::Ptr const& msg,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo,
GatewayNodeInfos const& singleAgencyGatewayInfos) const;
virtual GatewayNodeInfos selectRouterByAgency(ppc::protocol::Message::Ptr const& msg) const;
virtual GatewayNodeInfos selectRouterByAgency(
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo) const;
void removeP2PNodeIDFromNodeIDInfos(GatewayNodeInfo::Ptr const& gatewayInfo);
void insertGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo);
void removeP2PNodeIDFromAgencyInfos(std::string const& p2pNode);
Expand Down
Loading

0 comments on commit da1632d

Please sign in to comment.