Skip to content

Commit

Permalink
add getPeers rpc implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 9, 2024
1 parent 3c0b9fb commit b1660b7
Show file tree
Hide file tree
Showing 19 changed files with 78 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cpp/cmake/BuildInfo.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function(create_build_info)
# Generate header file containing useful build information
add_custom_target(BuildInfo.h ALL
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}"
COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}/.."
-DPPC_BUILDINFO_IN="${CMAKE_CURRENT_SOURCE_DIR}/cmake/templates/BuildInfo.h.in"
-DPPC_DST_DIR="${PROJECT_BINARY_DIR}/include"
-DPPC_CMAKE_DIR="${CMAKE_CURRENT_SOURCE_DIR}/cmake"
Expand Down
1 change: 1 addition & 0 deletions cpp/ppc-framework/rpc/RpcTypeDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum class RpcError : int32_t
std::string const RUN_TASK_METHOD = "runTask";
std::string const ASYNC_RUN_TASK_METHOD = "asyncRunTask";
std::string const GET_TASK_STATUS = "getTaskStatus";
std::string const GET_PEERS = "getPeers";

std::string const ASYNC_RUN_BS_MODE_TASK = "asyncRunBsModeTask";
std::string const FETCH_CIPHER = "fetchCipher";
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/air-node/AirNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void AirNodeInitializer::init(std::string const& _configPath)


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config(), m_gateway);
m_rpc->setRpcStorage(rpcStatusInterface);
m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi());
m_nodeInitializer->registerRpcHandler(m_rpc);
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/cem-node/CEMInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void CEMInitializer::init(std::string const& _configPath)
auto storageConfig = ppcConfig->storageConfig();
auto cemConfig = ppcConfig->cemConfig();
auto rpcFactory = std::make_shared<RpcFactory>(ppcConfig->agencyID());
m_rpc = rpcFactory->buildRpc(ppcConfig);
m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
auto cemService = std::make_shared<CEMService>();
cemService->setCEMConfig(cemConfig);
cemService->setStorageConfig(storageConfig);
Expand Down
4 changes: 2 additions & 2 deletions cpp/wedpr-main/common/NodeStarter.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ int startProgram(
}
printVersion();
std::cout << "[" << bcos::getCurrentDateTime() << "] ";
std::cout << "The " + binaryName + "is running..." << std::endl;
std::cout << "The " + binaryName + " is running..." << std::endl;
while (!exitHandler.shouldExit())
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
starter.reset();
std::cout << "[" << bcos::getCurrentDateTime() << "] ";
std::cout << "The" + binaryName + " program exit normally." << std::endl;
std::cout << "The " + binaryName + " program exit normally." << std::endl;
}
} // namespace ppc::node
2 changes: 1 addition & 1 deletion cpp/wedpr-main/mpc-node/MPCInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void MPCInitializer::init(std::string const& _configPath)
auto storageConfig = ppcConfig->storageConfig();
auto mpcConfig = ppcConfig->mpcConfig();
auto rpcFactory = std::make_shared<RpcFactory>(ppcConfig->agencyID());
m_rpc = rpcFactory->buildRpc(ppcConfig);
m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
auto mpcService = std::make_shared<MPCService>();
mpcService->setMPCConfig(mpcConfig);
mpcService->setStorageConfig(storageConfig);
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-main/pro-node/ProNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ void ProNodeInitializer::init(std::string const& _configPath)


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config());
m_rpc = rpcFactory->buildRpc(
m_nodeInitializer->config(), m_nodeInitializer->transport()->gateway());
m_rpc->setRpcStorage(rpcStatusInterface);
m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi());
m_nodeInitializer->registerRpcHandler(m_rpc);
Expand Down
17 changes: 11 additions & 6 deletions cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ void Front::fetchGatewayMetaInfo()
{
return;
}
bcos::UpgradeGuard ul(l);
front->m_agencyList = agencies;
FRONT_LOG(INFO) << LOG_DESC("Update agencies information")
<< LOG_KV("agencies", printVector(agencies));
Expand All @@ -100,11 +101,11 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace
bcos::bytes data;
_message->encode(data);
auto self = weak_from_this();
// ROUTE_THROUGH_TOPIC will hold the topic
m_front->asyncSendMessage(RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data),
_message->seq(), _timeout, _callback,
[self, _agencyID, _respCallback](
Error::Ptr error, Message::Ptr msg, SendResponseFunction resFunc) {
ppc::protocol::MessageCallback msgCallback = nullptr;
if (_respCallback)
{
msgCallback = [self, _agencyID, _respCallback](
Error::Ptr error, Message::Ptr msg, SendResponseFunction resFunc) {
auto front = self.lock();
if (!front)
{
Expand All @@ -126,7 +127,11 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace
// get the agencyID
_respCallback(error, msg->header()->optionalField()->srcInst(),
front->m_messageFactory->decodePPCMessage(msg), responseCallback);
});
};
}
// ROUTE_THROUGH_TOPIC will hold the topic
m_front->asyncSendMessage(RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data),
_message->seq(), _timeout, _callback, msgCallback);
}

// send response when receiving message from given agencyID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ void GatewayImpl::asyncGetPeers(std::function<void(Error::Ptr, std::string)> cal
Json::Value peers;
peers["agency"] = m_agency;
peers["nodeID"] = m_service->nodeID();
// add the local gatewayInfo
Json::Value localGatewayInfo;
m_localRouter->routerInfo()->toJson(localGatewayInfo);
peers["gateway"] = localGatewayInfo;
peers["peers"] = Json::Value(Json::arrayValue);
for (auto const& it : infos)
{
Expand Down Expand Up @@ -292,5 +296,6 @@ void GatewayImpl::asyncGetAgencies(
return;
}
auto agencies = m_peerRouter->agencies();
agencies.emplace_back(m_agency);
callback(nullptr, agencies);
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ void GatewayNodeInfoImpl::decode(bcos::bytesConstRef data)

void GatewayNodeInfoImpl::toJson(Json::Value& jsonObject) const
{
bcos::ReadGuard l(x_nodeList);
jsonObject["gatewayNodeID"] = p2pNodeID();
jsonObject["agency"] = agency();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class LocalRouter
}
uint32_t statusSeq() { return m_statusSeq; }

GatewayNodeInfo::Ptr const& routerInfo() const { return m_routerInfo; }

private:
uint32_t increaseSeq()
{
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ int main(int argc, const char* argv[])
// not specify the certPath in air-mode
ppcConfig->loadRpcConfig(param.configFilePath);
auto rpcFactory = std::make_shared<RpcFactory>("selfParty");
auto rpc = rpcFactory->buildRpc(ppcConfig);
auto rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
registerEchoHandler(rpc);
// start the rpc
rpc->start();
Expand Down
34 changes: 31 additions & 3 deletions cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ using namespace ppc::rpc;
using namespace ppc::tools;
using namespace ppc::protocol;

Rpc::Rpc(std::shared_ptr<boostssl::ws::WsService> _wsService, std::string const& _selfPartyID,
std::string const& _token, std::string const& _prePath)
Rpc::Rpc(std::shared_ptr<boostssl::ws::WsService> _wsService, ppc::gateway::IGateway::Ptr gateway,
std::string const& _selfPartyID, std::string const& _token, std::string const& _prePath)
: m_prePath(_prePath),
m_wsService(std::move(_wsService)),
m_gateway(std::move(gateway)),
m_taskFactory(std::make_shared<JsonTaskFactory>(_selfPartyID, _prePath)),
m_token(_token)
{
Expand Down Expand Up @@ -74,7 +75,8 @@ Rpc::Rpc(std::shared_ptr<boostssl::ws::WsService> _wsService, std::string const&
boost::bind(&Rpc::killBsModeTask, this, boost::placeholders::_1, boost::placeholders::_2);
m_methodToHandler[UPDATE_BS_MODE_TASK_STATUS] = boost::bind(
&Rpc::updateBsModeTaskStatus, this, boost::placeholders::_1, boost::placeholders::_2);

m_methodToHandler[GET_PEERS] =
boost::bind(&Rpc::getPeers, this, boost::placeholders::_1, boost::placeholders::_2);
RPC_LOG(INFO) << LOG_DESC("init rpc success") << LOG_KV("selfParty", _selfPartyID);
}

Expand Down Expand Up @@ -345,6 +347,32 @@ void Rpc::sendEcdhCipher(Json::Value const& _req, RespFunc _respFunc)
_respFunc(result->error(), result->serializeToJson());
}

void Rpc::getPeers(Json::Value const& _req, RespFunc _respFunc)
{
if (m_gateway == nullptr)
{
BOOST_THROW_EXCEPTION(BCOS_ERROR(-1, "the gateway not initialized!"));
}
m_gateway->asyncGetPeers([_respFunc](bcos::Error::Ptr error, std::string peersInfo) {
try
{
Json::Value root;
Json::Reader jsonReader;

if (!jsonReader.parse(peersInfo, root))
{
BOOST_THROW_EXCEPTION(BCOS_ERROR(-1, "Invalid json string: " + peersInfo));
}
_respFunc(error, std::move(root));
}
catch (std::exception const& e)
{
RPC_LOG(WARNING) << LOG_DESC("getPeers exception")
<< LOG_KV("error", boost::diagnostic_information(e));
}
});
}

void Rpc::sendPartnerCipher(Json::Value const& _req, RespFunc _respFunc)
{
if (!m_bsEcdhPSI)
Expand Down
7 changes: 6 additions & 1 deletion cpp/wedpr-transport/ppc-rpc/src/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#pragma once
#include "ppc-framework/front/FrontInterface.h"
#include "ppc-framework/gateway/IGateway.h"
#include "ppc-framework/rpc/RpcInterface.h"
#include "ppc-framework/rpc/RpcStatusInterface.h"
#include "protocol/src/JsonTaskImpl.h"
Expand All @@ -35,7 +36,8 @@ class Rpc : public RpcInterface
{
public:
using Ptr = std::shared_ptr<Rpc>;
Rpc(std::shared_ptr<bcos::boostssl::ws::WsService> _wsService, std::string const& _selfPartyID,
Rpc(std::shared_ptr<bcos::boostssl::ws::WsService> _wsService,
ppc::gateway::IGateway::Ptr gateway, std::string const& _selfPartyID,
std::string const& _token, std::string const& _prePath = "data");
~Rpc() override { stop(); }
void start() override
Expand Down Expand Up @@ -131,11 +133,14 @@ class Rpc : public RpcInterface
virtual void killBsModeTask(Json::Value const& _req, RespFunc _respFunc);
virtual void updateBsModeTaskStatus(Json::Value const& _req, RespFunc _respFunc);

virtual void getPeers(Json::Value const& _req, RespFunc _respFunc);

void checkHostResource();

private:
std::string m_prePath;
std::shared_ptr<bcos::boostssl::ws::WsService> m_wsService;
ppc::gateway::IGateway::Ptr m_gateway;
RpcStatusInterface::Ptr m_rpcStorage;

// Note: here use jsonTaskFactory to decrease the overhead to convert json::value to string when
Expand Down
5 changes: 3 additions & 2 deletions cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ using namespace bcos;
using namespace ppc::rpc;
using namespace ppc::tools;

Rpc::Ptr RpcFactory::buildRpc(ppc::tools::PPCConfig::ConstPtr _config)
Rpc::Ptr RpcFactory::buildRpc(
ppc::tools::PPCConfig::ConstPtr _config, ppc::gateway::IGateway::Ptr gateway)
{
auto wsConfig = initConfig(_config);
// create the wsConfig
Expand All @@ -39,7 +40,7 @@ Rpc::Ptr RpcFactory::buildRpc(ppc::tools::PPCConfig::ConstPtr _config)
initializer->initWsService(wsService);

auto rpc = std::make_shared<Rpc>(
wsService, m_selfPartyID, _config->rpcConfig().token, _config->dataLocation());
wsService, gateway, m_selfPartyID, _config->rpcConfig().token, _config->dataLocation());
rpc->setMinNeededMemory(_config->rpcConfig().minNeededMemoryGB);
return rpc;
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#pragma once
#include "Rpc.h"
#include "ppc-framework/gateway/IGateway.h"
#include <memory>
namespace bcos::boostssl::ws
{
Expand All @@ -37,8 +38,8 @@ class RpcFactory
RpcFactory(std::string const& _selfPartyID) : m_selfPartyID(_selfPartyID) {}
virtual ~RpcFactory() = default;

Rpc::Ptr buildRpc(std::shared_ptr<ppc::tools::PPCConfig const> _config);

Rpc::Ptr buildRpc(
std::shared_ptr<ppc::tools::PPCConfig const> _config, ppc::gateway::IGateway::Ptr gateway);

private:
std::shared_ptr<bcos::boostssl::ws::WsConfig> initConfig(
Expand Down
1 change: 0 additions & 1 deletion cpp/wedpr-transport/sdk/ProTransportImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class ProTransportImpl : public Transport, public std::enable_shared_from_this<P

protected:
ppc::front::FrontConfig::Ptr m_config;
ppc::gateway::IGateway::Ptr m_gateway;
std::shared_ptr<ppc::protocol::GrpcServer> m_server;
int m_keepAlivePeriodMs;
std::shared_ptr<bcos::Timer> m_timer;
Expand Down
6 changes: 5 additions & 1 deletion cpp/wedpr-transport/sdk/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#pragma once
#include "ppc-framework/front/IFront.h"
#include "ppc-framework/gateway/IGateway.h"
namespace ppc::sdk
{
class Transport
Expand All @@ -31,9 +32,12 @@ class Transport
virtual void start() { m_front->start(); }
virtual void stop() { m_front->stop(); }

virtual ppc::front::IFront::Ptr const& getFront() { return m_front; }
virtual ppc::front::IFront::Ptr const& getFront() const { return m_front; }

virtual ppc::gateway::IGateway::Ptr const& gateway() const { return m_gateway; }

protected:
ppc::front::IFront::Ptr m_front;
ppc::gateway::IGateway::Ptr m_gateway;
};
} // namespace ppc::sdk
1 change: 1 addition & 0 deletions cpp/wedpr-transport/sdk/TransportImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TransportImpl : public Transport
TransportImpl(ppc::front::FrontConfig::Ptr config, ppc::gateway::IGateway::Ptr const& gateway)
: m_config(std::move(config))
{
m_gateway = gateway;
ppc::front::FrontFactory frontFactory;
m_front = frontFactory.build(std::make_shared<ppc::protocol::NodeInfoFactory>(),
std::make_shared<ppc::protocol::MessagePayloadBuilderImpl>(),
Expand Down

0 comments on commit b1660b7

Please sign in to comment.