Skip to content

Commit

Permalink
add asyncGetPeers and asyncGetAgencies
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 9, 2024
1 parent 6d72991 commit 3c0b9fb
Show file tree
Hide file tree
Showing 29 changed files with 359 additions and 27 deletions.
1 change: 1 addition & 0 deletions cpp/ppc-framework/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept
return p2pId.substr(RSA_PUBLIC_KEY_PREFIX, RSA_PUBLIC_KEY_TRUNC);
}


template <typename T>
inline std::string printNodeID(T const& nodeID)
{
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-framework/front/FrontInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ class FrontInterface

virtual std::string const& selfEndPoint() const { return m_selfEndPoint; }

virtual std::vector<std::string> agencies() const = 0;
virtual void start() = 0;
virtual void stop() = 0;

protected:
// the selfEndPoint for the air-mode-node can be localhost
std::string m_selfEndPoint = "localhost";
Expand Down
2 changes: 2 additions & 0 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class IFront : virtual public IFrontClient
virtual ppc::protocol::Message::Ptr pop(std::string const& topic, long timeoutMs) = 0;
virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0;

virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;

/**
* @brief register the nodeInfo to the gateway
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-framework/gateway/IGateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class IGateway
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID,
bcos::bytes&& payload) = 0;

virtual void asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback) = 0;
virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;

virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0;
virtual bcos::Error::Ptr registerTopic(
Expand Down
3 changes: 3 additions & 0 deletions cpp/ppc-framework/protocol/INodeInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#pragma once
#include "ppc-framework/Common.h"
#include <bcos-utilities/Common.h>
#include <json/json.h>
#include <memory>
#include <set>
#include <sstream>
Expand Down Expand Up @@ -58,6 +59,8 @@ class INodeInfo
{
return (nodeID() == info->nodeID()) && (components() == info->components());
}

virtual void toJson(Json::Value& jsonObject) const = 0;
};
class INodeInfoFactory
{
Expand Down
19 changes: 19 additions & 0 deletions cpp/test-utils/FakeFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ class FakeFront : public FrontInterface
}
}

// for ut
void setAgencyList(std::vector<std::string> const& agencyList)
{
bcos::WriteGuard l(x_agencyList);
m_agencyList = agencyList;
}

std::vector<std::string> agencies() const override
{
bcos::ReadGuard l(x_agencyList);
return m_agencyList;
}

void start() override {}
void stop() override {}

private:
// the uuid to _callback
Expand Down Expand Up @@ -231,5 +246,9 @@ class FakeFront : public FrontInterface
std::map<std::string, CallbackFunc> m_uuidToCallback;
bcos::Mutex m_mutex;
std::atomic<int64_t> m_uuid = 0;

// the agency list, for task-sync
std::vector<std::string> m_agencyList;
mutable bcos::SharedMutex x_agencyList;
};
} // namespace ppc::test
18 changes: 1 addition & 17 deletions cpp/wedpr-computing/ppc-psi/src/PSIConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,7 @@ class PSIConfig
int taskExpireTime() const { return m_taskExpireTime; }
void setTaskExpireTime(int _taskExpireTime) { m_taskExpireTime = _taskExpireTime; }

std::vector<std::string> agencyList() const
{
bcos::ReadGuard l(x_agencyList);
return m_agencyList;
}

// for ut
void setAgencyList(std::vector<std::string> const& agencyList)
{
bcos::WriteGuard l(x_agencyList);
m_agencyList = agencyList;
}
std::vector<std::string> agencyList() const { return m_front->agencies(); }

protected:
ppc::front::PPCMessageFace::Ptr generatePPCMsg(
Expand Down Expand Up @@ -160,10 +149,5 @@ class PSIConfig

// the task-expire time
int m_taskExpireTime = 10000;

// the agency list, for task-sync
// TODO: fetch from the gateway
std::vector<std::string> m_agencyList;
mutable bcos::SharedMutex x_agencyList;
};
} // namespace ppc::psi
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ void testEcdhImplFunc(int64_t _dataBatchSize, std::string const& _serverPSIDataS
auto clientPSI = factory->createEcdhPSI(clientAgencyName, clientConfig);

std::vector<std::string> agencyList = {serverAgencyName, clientAgencyName};
serverPSI->psiConfig()->setAgencyList(agencyList);
clientPSI->psiConfig()->setAgencyList(agencyList);
auto serverFront = std::dynamic_pointer_cast<FakeFront>(serverPSI->psiConfig()->front());
serverFront->setAgencyList(agencyList);

auto clientFront = std::dynamic_pointer_cast<FakeFront>(clientPSI->psiConfig()->front());
clientFront->setAgencyList(agencyList);

// register the server-psi into the front
factory->front()->registerEcdhPSI(serverAgencyName, serverPSI);
Expand Down
13 changes: 12 additions & 1 deletion cpp/wedpr-helper/ppc-utilities/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
* @date 2024-08-23
*/
#pragma once

#include "ppc-framework/Common.h"
#include <boost/asio/detail/socket_ops.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <random>
#include <sstream>
#include <string>

namespace ppc
{
Expand Down Expand Up @@ -52,4 +53,14 @@ inline std::string generateUUID()
static thread_local auto uuid_gen = boost::uuids::basic_random_generator<std::random_device>();
return boost::uuids::to_string(uuid_gen());
}
template <typename T>
inline std::string printVector(std::vector<T> const& list)
{
std::stringstream oss;
for (auto const& it : list)
{
oss << it << ",";
}
return oss.str();
}
} // namespace ppc
8 changes: 4 additions & 4 deletions cpp/wedpr-initializer/Initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,9 @@ void Initializer::registerRpcHandler(ppc::rpc::RpcInterface::Ptr const& _rpc)

void Initializer::start()
{
if (m_transport)
if (m_ppcFront)
{
m_transport->start();
m_ppcFront->start();
}
/*if (m_ecdhConnPSI)
{
Expand Down Expand Up @@ -456,9 +456,9 @@ void Initializer::start()
void Initializer::stop()
{
// stop the network firstly
if (m_transport)
if (m_ppcFront)
{
m_transport->stop();
m_ppcFront->stop();
}
/*if (m_ecdhConnPSI)
{
Expand Down
27 changes: 27 additions & 0 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,33 @@ void GatewayClient::asyncSendMessage(RouteType routeType,
[callback, response](Status status) { callback(toError(status, std::move(*response))); });
}

void GatewayClient::asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback)
{
auto response = std::make_shared<PeersInfo>();
ClientContext context;
auto request = std::make_shared<Empty>();
m_stub->async()->asyncGetPeers(
&context, request.get(), response.get(), [callback, response](Status status) {
callback(toError(status, std::move(*response->mutable_error())), response->peersinfo());
});
}

void GatewayClient::asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback)
{
auto response = std::make_shared<AgenciesInfo>();
ClientContext context;
auto request = std::make_shared<Empty>();
m_stub->async()->asyncGetAgencies(
&context, request.get(), response.get(), [callback, response](Status status) {
std::vector<std::string> agencies;
for (int i = 0; i < response->agencies_size(); i++)
{
agencies.emplace_back(response->agencies(i));
}
callback(toError(status, std::move(*response->mutable_error())), agencies);
});
}

bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo)
{
Expand Down
4 changes: 4 additions & 0 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class GatewayClient : public ppc::gateway::IGateway, public GrpcClient
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID,
bcos::bytes&& payload, long timeout, ppc::protocol::ReceiveMsgFunc callback) override;

void asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback) override;
void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) override;

void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID,
bcos::bytes&& payload) override
Expand Down
53 changes: 53 additions & 0 deletions cpp/wedpr-protocol/grpc/server/GatewayServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,59 @@ ServerUnaryReactor* GatewayServer::asyncSendMessage(CallbackServerContext* conte
return reactor.get();
}

grpc::ServerUnaryReactor* GatewayServer::asyncGetPeers(
grpc::CallbackServerContext* context, const ppc::proto::Empty*, ppc::proto::PeersInfo* reply)
{
std::shared_ptr<ServerUnaryReactor> reactor(context->DefaultReactor());
try
{
m_gateway->asyncGetPeers([reactor, reply](bcos::Error::Ptr error, std::string peersInfo) {
toSerializedError(reply->mutable_error(), error);
reply->set_peersinfo(std::move(peersInfo));
reactor->Finish(Status::OK);
});
}
catch (std::exception const& e)
{
GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("asyncGetPeers exception")
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply->mutable_error(),
std::make_shared<bcos::Error>(
-1, "asyncGetPeers failed for : " + std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor.get();
}

grpc::ServerUnaryReactor* GatewayServer::asyncGetAgencies(
grpc::CallbackServerContext* context, const ppc::proto::Empty*, ppc::proto::AgenciesInfo* reply)
{
std::shared_ptr<ServerUnaryReactor> reactor(context->DefaultReactor());
try
{
m_gateway->asyncGetAgencies(
[reactor, reply](bcos::Error::Ptr error, std::vector<std::string> agencies) {
toSerializedError(reply->mutable_error(), error);
for (auto const& it : agencies)
{
reply->add_agencies(it);
}
reactor->Finish(Status::OK);
});
}
catch (std::exception const& e)
{
GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("asyncGetAgencies exception")
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply->mutable_error(),
std::make_shared<bcos::Error>(-1,
"asyncGetAgencies failed for : " + std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor.get();
}


ServerUnaryReactor* GatewayServer::registerNodeInfo(CallbackServerContext* context,
const ppc::proto::NodeInfo* serializedNodeInfo, ppc::proto::Error* reply)
{
Expand Down
6 changes: 6 additions & 0 deletions cpp/wedpr-protocol/grpc/server/GatewayServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class GatewayServer : public ppc::proto::Gateway::CallbackService
grpc::ServerUnaryReactor* asyncSendMessage(grpc::CallbackServerContext* context,
const ppc::proto::SendedMessageRequest* sendedMsg, ppc::proto::Error* reply) override;

grpc::ServerUnaryReactor* asyncGetPeers(grpc::CallbackServerContext* context,
const ppc::proto::Empty* request, ppc::proto::PeersInfo* reply) override;
grpc::ServerUnaryReactor* asyncGetAgencies(grpc::CallbackServerContext* context,
const ppc::proto::Empty* request, ppc::proto::AgenciesInfo* reply) override;


grpc::ServerUnaryReactor* registerNodeInfo(grpc::CallbackServerContext* context,
const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override;

Expand Down
16 changes: 16 additions & 0 deletions cpp/wedpr-protocol/proto/pb/Service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,27 @@ message SendedMessageRequest{
int64 timeout = 4;
string traceID = 5;
};

message AgenciesInfo{
Error error = 1;
repeated string agencies = 2;
};

message PeersInfo{
Error error = 1;
string peersInfo = 2;
};
message Empty{

};

service Front {
rpc onReceiveMessage (ReceivedMessage) returns (Error) {}
}
service Gateway{
rpc asyncSendMessage(SendedMessageRequest) returns(Error){}
rpc asyncGetPeers(Empty)returns(PeersInfo){}
rpc asyncGetAgencies(Empty)returns(AgenciesInfo){}
rpc registerNodeInfo(NodeInfo) returns(Error){}
rpc unRegisterNodeInfo(NodeInfo)returns(Error){}
rpc registerTopic(NodeInfo) returns(Error){}
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-protocol/protobuf/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ endforeach()

file(GLOB_RECURSE SRCS *.cpp)
add_library(${PB_PROTOCOL_TARGET} ${SRCS} ${MESSAGES_SRCS})
target_link_libraries(${PB_PROTOCOL_TARGET} PUBLIC ${BCOS_UTILITIES_TARGET} protobuf::libprotobuf ${CPU_FEATURES_LIB})
target_link_libraries(${PB_PROTOCOL_TARGET} PUBLIC ${BCOS_UTILITIES_TARGET} jsoncpp_static protobuf::libprotobuf ${CPU_FEATURES_LIB})
13 changes: 13 additions & 0 deletions cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,17 @@ void NodeInfoImpl::decode(bcos::bytesConstRef data)
decodePBObject(m_rawNodeInfo, data);
m_components = std::set<std::string>(
m_rawNodeInfo->components().begin(), m_rawNodeInfo->components().end());
}

void NodeInfoImpl::toJson(Json::Value& jsonObject) const
{
jsonObject["nodeID"] = std::string(nodeID().begin(), nodeID().end());
jsonObject["endPoint"] = endPoint();
Json::Value componentsInfo(Json::arrayValue);
auto componentsList = components();
for (auto const& it : componentsList)
{
componentsInfo.append(it);
}
jsonObject["components"] = componentsInfo;
}
2 changes: 2 additions & 0 deletions cpp/wedpr-protocol/protobuf/src/NodeInfoImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class NodeInfoImpl : public INodeInfo
}
std::shared_ptr<ppc::front::IFrontClient> const& getFront() const override { return m_front; }

void toJson(Json::Value& jsonObject) const override;

private:
std::shared_ptr<ppc::front::IFrontClient> m_front;
std::set<std::string> m_components;
Expand Down
Loading

0 comments on commit 3c0b9fb

Please sign in to comment.