Skip to content

Commit

Permalink
add asyncGetPeers && registerComponent && unRegisterComponent
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 18, 2024
1 parent 299f946 commit 4b0d644
Show file tree
Hide file tree
Showing 34 changed files with 734 additions and 61 deletions.
1 change: 1 addition & 0 deletions cpp/cmake/Options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ macro(print_config NAME)
message("-- BUILD_SDK BUILD SDK ${BUILD_SDK}")
message("-- BUILD_UDF BUILD UDF ${BUILD_UDF}")
message("-- BUILD_WEDPR_TOOLKIT BUILD_WEDPR_TOOLKIT ${BUILD_WEDPR_TOOLKIT}")
message("-- AUTO_GENERATE AUTO_GENERATE ${AUTO_GENERATE}")
message("-- DEBUG ${DEBUG}")
message("------------------------------------------------------------------------")
message("")
Expand Down
28 changes: 23 additions & 5 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class MessageDispatcherHandler
virtual void onMessage(ppc::protocol::Message::Ptr msg) = 0;
};


class SendResponseHandler
{
public:
Expand Down Expand Up @@ -92,6 +93,16 @@ class IMessageHandler
SendResponseHandler sendResponseHandler) = 0;
};

class GetPeersInfoHandler
{
public:
using Ptr = std::shared_ptr<GetPeersInfoHandler>;
GetPeersInfoHandler() = default;
virtual ~GetPeersInfoHandler() {}

virtual void onPeersInfo(bcos::Error::Ptr e, std::string const& peersInfo) = 0;
};

///////// the callback definition for sdk wrapper /////////

class IFront : virtual public IFrontClient
Expand Down Expand Up @@ -183,14 +194,15 @@ class IFront : virtual public IFrontClient

// !!! Note: the 'payload ' type(char*) should not been changed, since it used to pass-in java
// byte[] data
virtual void async_send_response(char* dstNode, uint64_t dstNodeSize, std::string const& traceID,
char* payload, uint64_t payloadSize, int seq, ErrorCallback::Ptr errorCallback)
virtual void async_send_response(char* dstNode, uint64_t dstNodeSize,
std::string const& traceID, char* payload, uint64_t payloadSize, int seq,
ErrorCallback::Ptr errorCallback)
{
// TODO: optimize here
bcos::bytes copiedDstNode(dstNode, dstNode + dstNodeSize);
bcos::bytes copyedPayload(payload, payload + payloadSize);
asyncSendResponse(
copiedDstNode, traceID, std::move(copyedPayload), seq, populateErrorCallback(errorCallback));
asyncSendResponse(copiedDstNode, traceID, std::move(copyedPayload), seq,
populateErrorCallback(errorCallback));
}

// the sync interface for async_send_message
Expand All @@ -209,9 +221,11 @@ class IFront : virtual public IFrontClient
virtual ppc::protocol::Message::Ptr pop(std::string const& topic, long timeoutMs) = 0;
virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0;

virtual void asyncGetAgencies(
virtual void asyncGetAgencies(std::vector<std::string> const& components,
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback) = 0;

virtual void asyncGetPeers(GetPeersInfoHandler::Ptr getPeersCallback) = 0;

/**
* @brief register the nodeInfo to the gateway
* @param nodeInfo the nodeInfo
Expand All @@ -223,6 +237,8 @@ class IFront : virtual public IFrontClient
*/
virtual bcos::Error::Ptr unRegisterNodeInfo() = 0;

virtual ppc::protocol::INodeInfo::Ptr const& nodeInfo() = 0;

/**
* @brief register the topic
*
Expand All @@ -237,6 +253,8 @@ class IFront : virtual public IFrontClient
*/
virtual bcos::Error::Ptr unRegisterTopic(std::string const& topic) = 0;

virtual void registerComponent(std::string const& component) = 0;
virtual void unRegisterComponent(std::string const& component) = 0;

private:
ppc::protocol::ReceiveMsgFunc populateErrorCallback(ErrorCallback::Ptr errorCallback)
Expand Down
2 changes: 1 addition & 1 deletion cpp/ppc-framework/gateway/IGateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class IGateway
bcos::bytes&& payload) = 0;

virtual void asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback) = 0;
virtual void asyncGetAgencies(
virtual void asyncGetAgencies(std::vector<std::string> const& components,
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback) = 0;

virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
Expand Down
5 changes: 4 additions & 1 deletion cpp/ppc-framework/protocol/INodeInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ class INodeInfo

// components
virtual void setComponents(std::set<std::string> const& components) = 0;
virtual bool addComponent(std::string const& component) = 0;
virtual bool eraseComponent(std::string const& component) = 0;
virtual std::set<std::string> const& components() const = 0;
virtual std::vector<std::string> copiedComponents() const = 0;

virtual void encode(bcos::bytes& data) const = 0;
virtual void decode(bcos::bytesConstRef data) = 0;
Expand All @@ -58,7 +61,7 @@ class INodeInfo
virtual bool equal(INodeInfo::Ptr const& info)
{
return (nodeID().toBytes() == info->nodeID().toBytes()) &&
(components() == info->components());
(copiedComponents() == info->copiedComponents());
}

virtual void toJson(Json::Value& jsonObject) const = 0;
Expand Down
8 changes: 6 additions & 2 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ void GatewayClient::asyncGetPeers(std::function<void(bcos::Error::Ptr, std::stri
});
}

void GatewayClient::asyncGetAgencies(
void GatewayClient::asyncGetAgencies(std::vector<std::string> const& components,
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback)
{
auto response = std::make_shared<AgenciesInfo>();
auto context = std::make_shared<ClientContext>();
auto request = std::make_shared<Empty>();
auto request = std::make_shared<Condition>();
for (auto const& it : components)
{
request->add_components(it);
}
// lambda keeps the lifecycle for clientContext
m_stub->async()->asyncGetAgencies(
context.get(), request.get(), response.get(), [context, callback, response](Status status) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-protocol/grpc/client/GatewayClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient
bcos::bytes&& payload, long timeout, ppc::protocol::ReceiveMsgFunc callback) override;

void asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback) override;
void asyncGetAgencies(
void asyncGetAgencies(std::vector<std::string> const& components,
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback) override;

void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType,
Expand Down
11 changes: 8 additions & 3 deletions cpp/wedpr-protocol/grpc/server/GatewayServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,19 @@ grpc::ServerUnaryReactor* GatewayServer::asyncGetPeers(
return reactor;
}

grpc::ServerUnaryReactor* GatewayServer::asyncGetAgencies(
grpc::CallbackServerContext* context, const ppc::proto::Empty*, ppc::proto::AgenciesInfo* reply)
grpc::ServerUnaryReactor* GatewayServer::asyncGetAgencies(grpc::CallbackServerContext* context,
const ppc::proto::Condition* condition, ppc::proto::AgenciesInfo* reply)
{
ServerUnaryReactor* reactor(context->DefaultReactor());
try
{
std::vector<std::string> components;
for (int i = 0; i < condition->components_size(); i++)
{
components.emplace_back(condition->components(i));
}
m_gateway->asyncGetAgencies(
[reactor, reply](bcos::Error::Ptr error, std::set<std::string> agencies) {
components, [reactor, reply](bcos::Error::Ptr error, std::set<std::string> agencies) {
toSerializedError(reply->mutable_error(), error);
for (auto const& it : agencies)
{
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-protocol/grpc/server/GatewayServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GatewayServer : public ppc::proto::Gateway::CallbackService
grpc::ServerUnaryReactor* asyncGetPeers(grpc::CallbackServerContext* context,
const ppc::proto::Empty* request, ppc::proto::PeersInfo* reply) override;
grpc::ServerUnaryReactor* asyncGetAgencies(grpc::CallbackServerContext* context,
const ppc::proto::Empty* request, ppc::proto::AgenciesInfo* reply) override;
const ppc::proto::Condition* request, ppc::proto::AgenciesInfo* reply) override;


grpc::ServerUnaryReactor* registerNodeInfo(grpc::CallbackServerContext* context,
Expand Down
5 changes: 4 additions & 1 deletion cpp/wedpr-protocol/proto/pb/Service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ message PeersInfo{
string peersInfo = 2;
};
message Empty{
};

message Condition{
repeated string components = 1;
};

service Front {
Expand All @@ -50,7 +53,7 @@ service Front {
service Gateway{
rpc asyncSendMessage(SendedMessageRequest) returns(Error){}
rpc asyncGetPeers(Empty)returns(PeersInfo){}
rpc asyncGetAgencies(Empty)returns(AgenciesInfo){}
rpc asyncGetAgencies(Condition)returns(AgenciesInfo){}
rpc registerNodeInfo(NodeInfo) returns(Error){}
rpc unRegisterNodeInfo(NodeInfo)returns(Error){}
rpc registerTopic(NodeInfo) returns(Error){}
Expand Down
14 changes: 13 additions & 1 deletion cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,28 @@ using namespace ppc::protocol;

void NodeInfoImpl::encode(bcos::bytes& data) const
{
encodeFields();
encodePBObject(data, m_rawNodeInfo);
}

void NodeInfoImpl::encodeFields()
{
bcos::ReadGuard l(x_components);
// set the components
for (auto const& component : m_components)
{
m_rawNodeInfo->add_components(component);
}
encodePBObject(data, m_rawNodeInfo);
}
void NodeInfoImpl::decode(bcos::bytesConstRef data)
{
decodePBObject(m_rawNodeInfo, data);
decodeFields();
}

void NodeInfoImpl::decodeFields()
{
bcos::WriteGuard l(x_components);
m_components = std::set<std::string>(
m_rawNodeInfo->components().begin(), m_rawNodeInfo->components().end());
}
Expand Down
48 changes: 46 additions & 2 deletions cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class NodeInfoImpl : public INodeInfo
NodeInfoImpl() { m_rawNodeInfo = std::make_shared<ppc::proto::NodeInfo>(); }
explicit NodeInfoImpl(std::shared_ptr<ppc::proto::NodeInfo> rawNodeInfo)
: m_rawNodeInfo(rawNodeInfo)
{}
{
decodeFields();
}
NodeInfoImpl(bcos::bytesConstRef const& data) : NodeInfoImpl() { decode(data); }

NodeInfoImpl(bcos::bytesConstRef const& nodeID, std::string const& endPoint) : NodeInfoImpl()
Expand All @@ -54,9 +56,45 @@ class NodeInfoImpl : public INodeInfo

void setComponents(std::set<std::string> const& components) override
{
bcos::WriteGuard l(x_components);
m_components = components;
}
std::set<std::string> const& components() const override { return m_components; }

std::set<std::string> const& components() const override
{
bcos::ReadGuard l(x_components);
return m_components;
}

std::vector<std::string> copiedComponents() const override
{
bcos::ReadGuard l(x_components);
return std::vector<std::string>(m_components.begin(), m_components.end());
}

bool addComponent(std::string const& component) override
{
bcos::UpgradableGuard l(x_components);
if (m_components.count(component))
{
return false;
}
bcos::UpgradeGuard ul(l);
m_components.insert(component);
return true;
}

bool eraseComponent(std::string const& component) override
{
bcos::UpgradableGuard l(x_components);
if (!m_components.count(component))
{
return false;
}
bcos::UpgradeGuard ul(l);
m_components.erase(component);
return true;
}

std::string const& endPoint() const override { return m_rawNodeInfo->endpoint(); }

Expand All @@ -78,9 +116,15 @@ class NodeInfoImpl : public INodeInfo

void toJson(Json::Value& jsonObject) const override;

void encodeFields();

protected:
virtual void decodeFields();

private:
std::shared_ptr<ppc::front::IFrontClient> m_front;
std::set<std::string> m_components;
mutable bcos::SharedMutex x_components;
std::shared_ptr<ppc::proto::NodeInfo> m_rawNodeInfo;
};

Expand Down
52 changes: 27 additions & 25 deletions cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ using namespace ppc::front;
Front::Front(ppc::front::PPCMessageFaceFactory::Ptr ppcMsgFactory, IFront::Ptr front)
: m_messageFactory(std::move(ppcMsgFactory)), m_front(std::move(front))
{
m_fetcher = std::make_shared<bcos::Timer>(60 * 1000, "metaFetcher");
m_fetcher = std::make_shared<bcos::Timer>(10 * 1000, "metaFetcher");
m_fetcher->registerTimeoutHandler([this]() {
try
{
Expand Down Expand Up @@ -58,30 +58,31 @@ void Front::stop()
void Front::fetchGatewayMetaInfo()
{
auto self = weak_from_this();
m_front->asyncGetAgencies([self](bcos::Error::Ptr error, std::set<std::string> agencies) {
auto front = self.lock();
if (!front)
{
return;
}
if (error && error->errorCode() != 0)
{
FRONT_LOG(WARNING) << LOG_DESC("asyncGetAgencies failed")
<< LOG_KV("code", error->errorCode())
<< LOG_KV("msg", error->errorMessage());
return;
}
std::vector agencyList(agencies.begin(), agencies.end());
bcos::UpgradableGuard l(front->x_agencyList);
if (front->m_agencyList == agencyList)
{
return;
}
bcos::UpgradeGuard ul(l);
front->m_agencyList = agencyList;
FRONT_LOG(INFO) << LOG_DESC("Update agencies information")
<< LOG_KV("agencies", printVector(agencyList));
});
m_front->asyncGetAgencies(m_front->nodeInfo()->copiedComponents(),
[self](bcos::Error::Ptr error, std::set<std::string> agencies) {
auto front = self.lock();
if (!front)
{
return;
}
if (error && error->errorCode() != 0)
{
FRONT_LOG(WARNING)
<< LOG_DESC("asyncGetAgencies failed") << LOG_KV("code", error->errorCode())
<< LOG_KV("msg", error->errorMessage());
return;
}
std::vector agencyList(agencies.begin(), agencies.end());
bcos::UpgradableGuard l(front->x_agencyList);
if (front->m_agencyList == agencyList)
{
return;
}
bcos::UpgradeGuard ul(l);
front->m_agencyList = agencyList;
FRONT_LOG(INFO) << LOG_DESC("Update agencies information")
<< LOG_KV("agencies", printVector(agencyList));
});
m_fetcher->restart();
}

Expand Down Expand Up @@ -192,4 +193,5 @@ void Front::registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType,
<< LOG_KV("error", boost::diagnostic_information(e));
}
});
m_front->registerComponent(std::to_string(type));
}
24 changes: 24 additions & 0 deletions cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,28 @@ bcos::Error::Ptr FrontImpl::push(uint16_t routeType, MessageOptionalHeader::Ptr
routeType, routeInfo, std::move(payload), seq, timeout,
[promise](bcos::Error::Ptr error) { promise->set_value(error); }, nullptr);
return promise->get_future().get();
}

void FrontImpl::asyncGetPeers(GetPeersInfoHandler::Ptr getPeersCallback)
{
m_gatewayClient->asyncGetPeers(
[getPeersCallback](bcos::Error::Ptr error, std::string peersInfo) {
getPeersCallback->onPeersInfo(error, peersInfo);
});
}

void FrontImpl::registerComponent(std::string const& component)
{
// Note: the node will report the latest components
auto ret = m_nodeInfo->addComponent(component);
FRONT_LOG(INFO) << LOG_DESC("registerComponent") << LOG_KV("component", component)
<< LOG_KV("insert", ret);
}

void FrontImpl::unRegisterComponent(std::string const& component)
{
// Note: the node will report the latest components
auto ret = m_nodeInfo->eraseComponent(component);
FRONT_LOG(INFO) << LOG_DESC("unRegisterComponent") << LOG_KV("component", component)
<< LOG_KV("erase", ret);
}
Loading

0 comments on commit 4b0d644

Please sign in to comment.