Skip to content

Commit

Permalink
add asyncGetPeers and asyncGetAgencies (#27)
Browse files Browse the repository at this point in the history
* add asyncGetPeers and asyncGetAgencies

* add getPeers rpc implementation

* fix gateway bugs

* add registerMessageHandler to support dispatcher message by component
  • Loading branch information
cyjseagull authored Sep 9, 2024
1 parent 6d72991 commit f89c9bc
Show file tree
Hide file tree
Showing 58 changed files with 637 additions and 155 deletions.
2 changes: 1 addition & 1 deletion cpp/cmake/BuildInfo.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function(create_build_info)
# Generate header file containing useful build information
add_custom_target(BuildInfo.h ALL
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}"
COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}/.."
-DPPC_BUILDINFO_IN="${CMAKE_CURRENT_SOURCE_DIR}/cmake/templates/BuildInfo.h.in"
-DPPC_DST_DIR="${PROJECT_BINARY_DIR}/include"
-DPPC_CMAKE_DIR="${CMAKE_CURRENT_SOURCE_DIR}/cmake"
Expand Down
22 changes: 0 additions & 22 deletions cpp/ppc-framework/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,6 @@ DERIVE_PPC_EXCEPTION(DataSchemaNotSetted);
DERIVE_PPC_EXCEPTION(UnsupportedDataSchema);
DERIVE_PPC_EXCEPTION(WeDPRException);

constexpr static int MAX_PORT = 65535;
constexpr static int DEFAULT_SECURITY_PARAM = 128;

constexpr static size_t RSA_PUBLIC_KEY_PREFIX = 18;
constexpr static size_t RSA_PUBLIC_KEY_TRUNC = 8;
constexpr static size_t RSA_PUBLIC_KEY_TRUNC_LENGTH = 26;

inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept
{
if (p2pId.length() < RSA_PUBLIC_KEY_TRUNC_LENGTH)
{
return p2pId;
}
return p2pId.substr(RSA_PUBLIC_KEY_PREFIX, RSA_PUBLIC_KEY_TRUNC);
}

template <typename T>
inline std::string printNodeID(T const& nodeID)
{
return std::string(nodeID.begin(), nodeID.begin() + 8);
}

#if ENABLE_CPU_FEATURES
#if X86
static const cpu_features::X86Features CPU_FEATURES = cpu_features::GetX86Info().features;
Expand Down
48 changes: 48 additions & 0 deletions cpp/ppc-framework/Helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2022 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file Common.h
* @author: yujiechen
* @date 2022-10-20
*/
#pragma once
#include <string>
#include <sstream>
namespace ppc
{
constexpr static int MAX_PORT = 65535;
constexpr static int DEFAULT_SECURITY_PARAM = 128;

constexpr static size_t RSA_PUBLIC_KEY_PREFIX = 18;
constexpr static size_t RSA_PUBLIC_KEY_TRUNC = 8;
constexpr static size_t RSA_PUBLIC_KEY_TRUNC_LENGTH = 26;

inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept
{
if (p2pId.length() < RSA_PUBLIC_KEY_TRUNC_LENGTH)
{
return p2pId;
}
return p2pId.substr(RSA_PUBLIC_KEY_PREFIX, RSA_PUBLIC_KEY_TRUNC);
}


template <typename T>
inline std::string_view printNodeID(T const& nodeID)
{
size_t offset = nodeID.size() >= 8 ? 8 : nodeID.size();
return std::string_view((const char*)nodeID.data(), offset);
}
} // namespace ppc
4 changes: 4 additions & 0 deletions cpp/ppc-framework/front/FrontInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ class FrontInterface

virtual std::string const& selfEndPoint() const { return m_selfEndPoint; }

virtual std::vector<std::string> agencies() const = 0;
virtual void start() = 0;
virtual void stop() = 0;

protected:
// the selfEndPoint for the air-mode-node can be localhost
std::string m_selfEndPoint = "localhost";
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class IFront : virtual public IFrontClient
virtual void registerTopicHandler(
std::string const& topic, ppc::protocol::MessageDispatcherCallback callback) = 0;

virtual void registerMessageHandler(
std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback) = 0;
/**
* @brief async send message
*
Expand Down Expand Up @@ -99,6 +101,8 @@ class IFront : virtual public IFrontClient
virtual ppc::protocol::Message::Ptr pop(std::string const& topic, long timeoutMs) = 0;
virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0;

virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;

/**
* @brief register the nodeInfo to the gateway
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-framework/gateway/IGateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class IGateway
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID,
bcos::bytes&& payload) = 0;

virtual void asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback) = 0;
virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;

virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0;
virtual bcos::Error::Ptr registerTopic(
Expand Down
5 changes: 4 additions & 1 deletion cpp/ppc-framework/protocol/INodeInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
* @date 2024-08-26
*/
#pragma once
#include "ppc-framework/Common.h"
#include "ppc-framework/Helper.h"
#include <bcos-utilities/Common.h>
#include <json/json.h>
#include <memory>
#include <set>
#include <sstream>
Expand Down Expand Up @@ -58,6 +59,8 @@ class INodeInfo
{
return (nodeID() == info->nodeID()) && (components() == info->components());
}

virtual void toJson(Json::Value& jsonObject) const = 0;
};
class INodeInfoFactory
{
Expand Down
10 changes: 5 additions & 5 deletions cpp/ppc-framework/protocol/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
* @date 2024-08-22
*/
#pragma once
#include "../Common.h"
#include "MessagePayload.h"
#include "RouteType.h"
#include "ppc-framework/Helper.h"
#include <bcos-boostssl/interfaces/MessageFace.h>
#include <bcos-utilities/Common.h>
#include <bcos-utilities/DataConvertUtility.h>
Expand Down Expand Up @@ -249,10 +249,10 @@ inline std::string printOptionalField(MessageOptionalHeader::Ptr optionalHeader)
std::ostringstream stringstream;
stringstream << LOG_KV("topic", optionalHeader->topic())
<< LOG_KV("componentType", optionalHeader->componentType())
<< LOG_KV("srcNode", *(bcos::toHexString(optionalHeader->srcNode())))
<< LOG_KV("dstNode", *(bcos::toHexString(optionalHeader->dstNode())))
<< LOG_KV("dstInst", optionalHeader->dstInst());

<< LOG_KV("srcNode", printNodeID(optionalHeader->srcNode()))
<< LOG_KV("dstNode", printNodeID(optionalHeader->dstNode()))
<< LOG_KV("srcInst", printNodeID(optionalHeader->srcInst()))
<< LOG_KV("dstInst", printNodeID(optionalHeader->dstInst()));
return stringstream.str();
}

Expand Down
1 change: 0 additions & 1 deletion cpp/ppc-framework/protocol/Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ inline std::ostream& operator<<(std::ostream& _out, HashImplName const& _type)

enum class MessageType : uint16_t
{
GatewayMessage = 0x0000,
RpcRequest = 0x1000, // the rpc request type
};

Expand Down
1 change: 1 addition & 0 deletions cpp/ppc-framework/rpc/RpcTypeDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum class RpcError : int32_t
std::string const RUN_TASK_METHOD = "runTask";
std::string const ASYNC_RUN_TASK_METHOD = "asyncRunTask";
std::string const GET_TASK_STATUS = "getTaskStatus";
std::string const GET_PEERS = "getPeers";

std::string const ASYNC_RUN_BS_MODE_TASK = "asyncRunBsModeTask";
std::string const FETCH_CIPHER = "fetchCipher";
Expand Down
19 changes: 19 additions & 0 deletions cpp/test-utils/FakeFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ class FakeFront : public FrontInterface
}
}

// for ut
void setAgencyList(std::vector<std::string> const& agencyList)
{
bcos::WriteGuard l(x_agencyList);
m_agencyList = agencyList;
}

std::vector<std::string> agencies() const override
{
bcos::ReadGuard l(x_agencyList);
return m_agencyList;
}

void start() override {}
void stop() override {}

private:
// the uuid to _callback
Expand Down Expand Up @@ -231,5 +246,9 @@ class FakeFront : public FrontInterface
std::map<std::string, CallbackFunc> m_uuidToCallback;
bcos::Mutex m_mutex;
std::atomic<int64_t> m_uuid = 0;

// the agency list, for task-sync
std::vector<std::string> m_agencyList;
mutable bcos::SharedMutex x_agencyList;
};
} // namespace ppc::test
18 changes: 1 addition & 17 deletions cpp/wedpr-computing/ppc-psi/src/PSIConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,7 @@ class PSIConfig
int taskExpireTime() const { return m_taskExpireTime; }
void setTaskExpireTime(int _taskExpireTime) { m_taskExpireTime = _taskExpireTime; }

std::vector<std::string> agencyList() const
{
bcos::ReadGuard l(x_agencyList);
return m_agencyList;
}

// for ut
void setAgencyList(std::vector<std::string> const& agencyList)
{
bcos::WriteGuard l(x_agencyList);
m_agencyList = agencyList;
}
std::vector<std::string> agencyList() const { return m_front->agencies(); }

protected:
ppc::front::PPCMessageFace::Ptr generatePPCMsg(
Expand Down Expand Up @@ -160,10 +149,5 @@ class PSIConfig

// the task-expire time
int m_taskExpireTime = 10000;

// the agency list, for task-sync
// TODO: fetch from the gateway
std::vector<std::string> m_agencyList;
mutable bcos::SharedMutex x_agencyList;
};
} // namespace ppc::psi
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ void testEcdhImplFunc(int64_t _dataBatchSize, std::string const& _serverPSIDataS
auto clientPSI = factory->createEcdhPSI(clientAgencyName, clientConfig);

std::vector<std::string> agencyList = {serverAgencyName, clientAgencyName};
serverPSI->psiConfig()->setAgencyList(agencyList);
clientPSI->psiConfig()->setAgencyList(agencyList);
auto serverFront = std::dynamic_pointer_cast<FakeFront>(serverPSI->psiConfig()->front());
serverFront->setAgencyList(agencyList);

auto clientFront = std::dynamic_pointer_cast<FakeFront>(clientPSI->psiConfig()->front());
clientFront->setAgencyList(agencyList);

// register the server-psi into the front
factory->front()->registerEcdhPSI(serverAgencyName, serverPSI);
Expand Down
13 changes: 12 additions & 1 deletion cpp/wedpr-helper/ppc-utilities/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
* @date 2024-08-23
*/
#pragma once

#include "ppc-framework/Common.h"
#include <boost/asio/detail/socket_ops.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <random>
#include <sstream>
#include <string>

namespace ppc
{
Expand Down Expand Up @@ -52,4 +53,14 @@ inline std::string generateUUID()
static thread_local auto uuid_gen = boost::uuids::basic_random_generator<std::random_device>();
return boost::uuids::to_string(uuid_gen());
}
template <typename T>
inline std::string printVector(std::vector<T> const& list)
{
std::stringstream oss;
for (auto const& it : list)
{
oss << it << ",";
}
return oss.str();
}
} // namespace ppc
12 changes: 6 additions & 6 deletions cpp/wedpr-initializer/Initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway)
m_protocolInitializer = std::make_shared<ProtocolInitializer>();
m_protocolInitializer->init(m_config);

auto ppcMessageFactory = std::make_shared<PPCMessageFactory>();
// init the frontService
INIT_LOG(INFO) << LOG_DESC("init the frontService") << LOG_KV("agency", m_config->agencyID());
auto frontThreadPool = std::make_shared<bcos::ThreadPool>("front", m_config->threadPoolSize());
Expand All @@ -89,7 +88,8 @@ void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway)
{
m_transport = transportBuilder.build(SDKMode::PRO, m_config->frontConfig(), nullptr);
}
m_ppcFront = std::make_shared<Front>(m_transport->getFront());
m_ppcFront =
std::make_shared<Front>(std::make_shared<PPCMessageFactory>(), m_transport->getFront());

INIT_LOG(INFO) << LOG_DESC("init the frontService success")
<< LOG_KV("frontDetail", printFrontDesc(m_config->frontConfig()))
Expand Down Expand Up @@ -413,9 +413,9 @@ void Initializer::registerRpcHandler(ppc::rpc::RpcInterface::Ptr const& _rpc)

void Initializer::start()
{
if (m_transport)
if (m_ppcFront)
{
m_transport->start();
m_ppcFront->start();
}
/*if (m_ecdhConnPSI)
{
Expand Down Expand Up @@ -456,9 +456,9 @@ void Initializer::start()
void Initializer::stop()
{
// stop the network firstly
if (m_transport)
if (m_ppcFront)
{
m_transport->stop();
m_ppcFront->stop();
}
/*if (m_ecdhConnPSI)
{
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/air-node/AirNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void AirNodeInitializer::init(std::string const& _configPath)


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config(), m_gateway);
m_rpc->setRpcStorage(rpcStatusInterface);
m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi());
m_nodeInitializer->registerRpcHandler(m_rpc);
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/cem-node/CEMInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void CEMInitializer::init(std::string const& _configPath)
auto storageConfig = ppcConfig->storageConfig();
auto cemConfig = ppcConfig->cemConfig();
auto rpcFactory = std::make_shared<RpcFactory>(ppcConfig->agencyID());
m_rpc = rpcFactory->buildRpc(ppcConfig);
m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
auto cemService = std::make_shared<CEMService>();
cemService->setCEMConfig(cemConfig);
cemService->setStorageConfig(storageConfig);
Expand Down
4 changes: 2 additions & 2 deletions cpp/wedpr-main/common/NodeStarter.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ int startProgram(
}
printVersion();
std::cout << "[" << bcos::getCurrentDateTime() << "] ";
std::cout << "The " + binaryName + "is running..." << std::endl;
std::cout << "The " + binaryName + " is running..." << std::endl;
while (!exitHandler.shouldExit())
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
starter.reset();
std::cout << "[" << bcos::getCurrentDateTime() << "] ";
std::cout << "The" + binaryName + " program exit normally." << std::endl;
std::cout << "The " + binaryName + " program exit normally." << std::endl;
}
} // namespace ppc::node
2 changes: 1 addition & 1 deletion cpp/wedpr-main/mpc-node/MPCInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void MPCInitializer::init(std::string const& _configPath)
auto storageConfig = ppcConfig->storageConfig();
auto mpcConfig = ppcConfig->mpcConfig();
auto rpcFactory = std::make_shared<RpcFactory>(ppcConfig->agencyID());
m_rpc = rpcFactory->buildRpc(ppcConfig);
m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
auto mpcService = std::make_shared<MPCService>();
mpcService->setMPCConfig(mpcConfig);
mpcService->setStorageConfig(storageConfig);
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-main/pro-node/ProNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ void ProNodeInitializer::init(std::string const& _configPath)


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config());
m_rpc = rpcFactory->buildRpc(
m_nodeInitializer->config(), m_nodeInitializer->transport()->gateway());
m_rpc->setRpcStorage(rpcStatusInterface);
m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi());
m_nodeInitializer->registerRpcHandler(m_rpc);
Expand Down
Loading

0 comments on commit f89c9bc

Please sign in to comment.