Skip to content

Commit

Permalink
add getPeers rpc implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 9, 2024
1 parent 3c0b9fb commit cb0f6e7
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 15 deletions.
1 change: 1 addition & 0 deletions cpp/ppc-framework/rpc/RpcTypeDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum class RpcError : int32_t
std::string const RUN_TASK_METHOD = "runTask";
std::string const ASYNC_RUN_TASK_METHOD = "asyncRunTask";
std::string const GET_TASK_STATUS = "getTaskStatus";
std::string const GET_PEERS = "getPeers";

std::string const ASYNC_RUN_BS_MODE_TASK = "asyncRunBsModeTask";
std::string const FETCH_CIPHER = "fetchCipher";
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/air-node/AirNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void AirNodeInitializer::init(std::string const& _configPath)


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config(), m_gateway);
m_rpc->setRpcStorage(rpcStatusInterface);
m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi());
m_nodeInitializer->registerRpcHandler(m_rpc);
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/cem-node/CEMInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void CEMInitializer::init(std::string const& _configPath)
auto storageConfig = ppcConfig->storageConfig();
auto cemConfig = ppcConfig->cemConfig();
auto rpcFactory = std::make_shared<RpcFactory>(ppcConfig->agencyID());
m_rpc = rpcFactory->buildRpc(ppcConfig);
m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
auto cemService = std::make_shared<CEMService>();
cemService->setCEMConfig(cemConfig);
cemService->setStorageConfig(storageConfig);
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/mpc-node/MPCInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void MPCInitializer::init(std::string const& _configPath)
auto storageConfig = ppcConfig->storageConfig();
auto mpcConfig = ppcConfig->mpcConfig();
auto rpcFactory = std::make_shared<RpcFactory>(ppcConfig->agencyID());
m_rpc = rpcFactory->buildRpc(ppcConfig);
m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
auto mpcService = std::make_shared<MPCService>();
mpcService->setMPCConfig(mpcConfig);
mpcService->setStorageConfig(storageConfig);
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-main/pro-node/ProNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ void ProNodeInitializer::init(std::string const& _configPath)


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config());
m_rpc = rpcFactory->buildRpc(
m_nodeInitializer->config(), m_nodeInitializer->transport()->gateway());
m_rpc->setRpcStorage(rpcStatusInterface);
m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi());
m_nodeInitializer->registerRpcHandler(m_rpc);
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-transport/ppc-rpc/demo/rpc_demo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ int main(int argc, const char* argv[])
// not specify the certPath in air-mode
ppcConfig->loadRpcConfig(param.configFilePath);
auto rpcFactory = std::make_shared<RpcFactory>("selfParty");
auto rpc = rpcFactory->buildRpc(ppcConfig);
auto rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
registerEchoHandler(rpc);
// start the rpc
rpc->start();
Expand Down
34 changes: 31 additions & 3 deletions cpp/wedpr-transport/ppc-rpc/src/Rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ using namespace ppc::rpc;
using namespace ppc::tools;
using namespace ppc::protocol;

Rpc::Rpc(std::shared_ptr<boostssl::ws::WsService> _wsService, std::string const& _selfPartyID,
std::string const& _token, std::string const& _prePath)
Rpc::Rpc(std::shared_ptr<boostssl::ws::WsService> _wsService, ppc::gateway::IGateway::Ptr gateway,
std::string const& _selfPartyID, std::string const& _token, std::string const& _prePath)
: m_prePath(_prePath),
m_wsService(std::move(_wsService)),
m_gateway(std::move(gateway)),
m_taskFactory(std::make_shared<JsonTaskFactory>(_selfPartyID, _prePath)),
m_token(_token)
{
Expand Down Expand Up @@ -74,7 +75,8 @@ Rpc::Rpc(std::shared_ptr<boostssl::ws::WsService> _wsService, std::string const&
boost::bind(&Rpc::killBsModeTask, this, boost::placeholders::_1, boost::placeholders::_2);
m_methodToHandler[UPDATE_BS_MODE_TASK_STATUS] = boost::bind(
&Rpc::updateBsModeTaskStatus, this, boost::placeholders::_1, boost::placeholders::_2);

m_methodToHandler[GET_PEERS] =
boost::bind(&Rpc::getPeers, this, boost::placeholders::_1, boost::placeholders::_2);
RPC_LOG(INFO) << LOG_DESC("init rpc success") << LOG_KV("selfParty", _selfPartyID);
}

Expand Down Expand Up @@ -345,6 +347,32 @@ void Rpc::sendEcdhCipher(Json::Value const& _req, RespFunc _respFunc)
_respFunc(result->error(), result->serializeToJson());
}

void Rpc::getPeers(Json::Value const& _req, RespFunc _respFunc)
{
if (m_gateway == nullptr)
{
BOOST_THROW_EXCEPTION(BCOS_ERROR(-1, "the gateway not initialized!"));
}
m_gateway->asyncGetPeers([_respFunc](bcos::Error::Ptr error, std::string peersInfo) {
try
{
Json::Value root;
Json::Reader jsonReader;

if (!jsonReader.parse(peersInfo, root))
{
BOOST_THROW_EXCEPTION(BCOS_ERROR(-1, "Invalid json string: " + peersInfo));
}
_respFunc(error, std::move(root));
}
catch (std::exception const& e)
{
RPC_LOG(WARNING) << LOG_DESC("getPeers exception")
<< LOG_KV("error", boost::diagnostic_information(e));
}
});
}

void Rpc::sendPartnerCipher(Json::Value const& _req, RespFunc _respFunc)
{
if (!m_bsEcdhPSI)
Expand Down
7 changes: 6 additions & 1 deletion cpp/wedpr-transport/ppc-rpc/src/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#pragma once
#include "ppc-framework/front/FrontInterface.h"
#include "ppc-framework/gateway/IGateway.h"
#include "ppc-framework/rpc/RpcInterface.h"
#include "ppc-framework/rpc/RpcStatusInterface.h"
#include "protocol/src/JsonTaskImpl.h"
Expand All @@ -35,7 +36,8 @@ class Rpc : public RpcInterface
{
public:
using Ptr = std::shared_ptr<Rpc>;
Rpc(std::shared_ptr<bcos::boostssl::ws::WsService> _wsService, std::string const& _selfPartyID,
Rpc(std::shared_ptr<bcos::boostssl::ws::WsService> _wsService,
ppc::gateway::IGateway::Ptr gateway, std::string const& _selfPartyID,
std::string const& _token, std::string const& _prePath = "data");
~Rpc() override { stop(); }
void start() override
Expand Down Expand Up @@ -131,11 +133,14 @@ class Rpc : public RpcInterface
virtual void killBsModeTask(Json::Value const& _req, RespFunc _respFunc);
virtual void updateBsModeTaskStatus(Json::Value const& _req, RespFunc _respFunc);

virtual void getPeers(Json::Value const& _req, RespFunc _respFunc);

void checkHostResource();

private:
std::string m_prePath;
std::shared_ptr<bcos::boostssl::ws::WsService> m_wsService;
ppc::gateway::IGateway::Ptr m_gateway;
RpcStatusInterface::Ptr m_rpcStorage;

// Note: here use jsonTaskFactory to decrease the overhead to convert json::value to string when
Expand Down
5 changes: 3 additions & 2 deletions cpp/wedpr-transport/ppc-rpc/src/RpcFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ using namespace bcos;
using namespace ppc::rpc;
using namespace ppc::tools;

Rpc::Ptr RpcFactory::buildRpc(ppc::tools::PPCConfig::ConstPtr _config)
Rpc::Ptr RpcFactory::buildRpc(
ppc::tools::PPCConfig::ConstPtr _config, ppc::gateway::IGateway::Ptr gateway)
{
auto wsConfig = initConfig(_config);
// create the wsConfig
Expand All @@ -39,7 +40,7 @@ Rpc::Ptr RpcFactory::buildRpc(ppc::tools::PPCConfig::ConstPtr _config)
initializer->initWsService(wsService);

auto rpc = std::make_shared<Rpc>(
wsService, m_selfPartyID, _config->rpcConfig().token, _config->dataLocation());
wsService, gateway, m_selfPartyID, _config->rpcConfig().token, _config->dataLocation());
rpc->setMinNeededMemory(_config->rpcConfig().minNeededMemoryGB);
return rpc;
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/wedpr-transport/ppc-rpc/src/RpcFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#pragma once
#include "Rpc.h"
#include "ppc-framework/gateway/IGateway.h"
#include <memory>
namespace bcos::boostssl::ws
{
Expand All @@ -37,8 +38,8 @@ class RpcFactory
RpcFactory(std::string const& _selfPartyID) : m_selfPartyID(_selfPartyID) {}
virtual ~RpcFactory() = default;

Rpc::Ptr buildRpc(std::shared_ptr<ppc::tools::PPCConfig const> _config);

Rpc::Ptr buildRpc(
std::shared_ptr<ppc::tools::PPCConfig const> _config, ppc::gateway::IGateway::Ptr gateway);

private:
std::shared_ptr<bcos::boostssl::ws::WsConfig> initConfig(
Expand Down
1 change: 0 additions & 1 deletion cpp/wedpr-transport/sdk/ProTransportImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class ProTransportImpl : public Transport, public std::enable_shared_from_this<P

protected:
ppc::front::FrontConfig::Ptr m_config;
ppc::gateway::IGateway::Ptr m_gateway;
std::shared_ptr<ppc::protocol::GrpcServer> m_server;
int m_keepAlivePeriodMs;
std::shared_ptr<bcos::Timer> m_timer;
Expand Down
6 changes: 5 additions & 1 deletion cpp/wedpr-transport/sdk/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#pragma once
#include "ppc-framework/front/IFront.h"
#include "ppc-framework/gateway/IGateway.h"
namespace ppc::sdk
{
class Transport
Expand All @@ -31,9 +32,12 @@ class Transport
virtual void start() { m_front->start(); }
virtual void stop() { m_front->stop(); }

virtual ppc::front::IFront::Ptr const& getFront() { return m_front; }
virtual ppc::front::IFront::Ptr const& getFront() const { return m_front; }

virtual ppc::gateway::IGateway::Ptr const& gateway() const { return m_gateway; }

protected:
ppc::front::IFront::Ptr m_front;
ppc::gateway::IGateway::Ptr m_gateway;
};
} // namespace ppc::sdk
1 change: 1 addition & 0 deletions cpp/wedpr-transport/sdk/TransportImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TransportImpl : public Transport
TransportImpl(ppc::front::FrontConfig::Ptr config, ppc::gateway::IGateway::Ptr const& gateway)
: m_config(std::move(config))
{
m_gateway = gateway;
ppc::front::FrontFactory frontFactory;
m_front = frontFactory.build(std::make_shared<ppc::protocol::NodeInfoFactory>(),
std::make_shared<ppc::protocol::MessagePayloadBuilderImpl>(),
Expand Down

0 comments on commit cb0f6e7

Please sign in to comment.