Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add asyncGetPeers and asyncGetAgencies #27

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/cmake/BuildInfo.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function(create_build_info)
# Generate header file containing useful build information
add_custom_target(BuildInfo.h ALL
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}"
COMMAND ${CMAKE_COMMAND} -DPPC_SOURCE_DIR="${PROJECT_SOURCE_DIR}/.."
-DPPC_BUILDINFO_IN="${CMAKE_CURRENT_SOURCE_DIR}/cmake/templates/BuildInfo.h.in"
-DPPC_DST_DIR="${PROJECT_BINARY_DIR}/include"
-DPPC_CMAKE_DIR="${CMAKE_CURRENT_SOURCE_DIR}/cmake"
Expand Down
22 changes: 0 additions & 22 deletions cpp/ppc-framework/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,6 @@ DERIVE_PPC_EXCEPTION(DataSchemaNotSetted);
DERIVE_PPC_EXCEPTION(UnsupportedDataSchema);
DERIVE_PPC_EXCEPTION(WeDPRException);

constexpr static int MAX_PORT = 65535;
constexpr static int DEFAULT_SECURITY_PARAM = 128;

constexpr static size_t RSA_PUBLIC_KEY_PREFIX = 18;
constexpr static size_t RSA_PUBLIC_KEY_TRUNC = 8;
constexpr static size_t RSA_PUBLIC_KEY_TRUNC_LENGTH = 26;

inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept
{
if (p2pId.length() < RSA_PUBLIC_KEY_TRUNC_LENGTH)
{
return p2pId;
}
return p2pId.substr(RSA_PUBLIC_KEY_PREFIX, RSA_PUBLIC_KEY_TRUNC);
}

template <typename T>
inline std::string printNodeID(T const& nodeID)
{
return std::string(nodeID.begin(), nodeID.begin() + 8);
}

#if ENABLE_CPU_FEATURES
#if X86
static const cpu_features::X86Features CPU_FEATURES = cpu_features::GetX86Info().features;
Expand Down
48 changes: 48 additions & 0 deletions cpp/ppc-framework/Helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2022 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file Common.h
* @author: yujiechen
* @date 2022-10-20
*/
#pragma once
#include <string>
#include <sstream>
namespace ppc
{
constexpr static int MAX_PORT = 65535;
constexpr static int DEFAULT_SECURITY_PARAM = 128;

constexpr static size_t RSA_PUBLIC_KEY_PREFIX = 18;
constexpr static size_t RSA_PUBLIC_KEY_TRUNC = 8;
constexpr static size_t RSA_PUBLIC_KEY_TRUNC_LENGTH = 26;

inline std::string_view printP2PIDElegantly(std::string_view p2pId) noexcept
{
if (p2pId.length() < RSA_PUBLIC_KEY_TRUNC_LENGTH)
{
return p2pId;
}
return p2pId.substr(RSA_PUBLIC_KEY_PREFIX, RSA_PUBLIC_KEY_TRUNC);
}


template <typename T>
inline std::string_view printNodeID(T const& nodeID)
{
size_t offset = nodeID.size() >= 8 ? 8 : nodeID.size();
return std::string_view((const char*)nodeID.data(), offset);
}
} // namespace ppc
4 changes: 4 additions & 0 deletions cpp/ppc-framework/front/FrontInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ class FrontInterface

virtual std::string const& selfEndPoint() const { return m_selfEndPoint; }

virtual std::vector<std::string> agencies() const = 0;
virtual void start() = 0;
virtual void stop() = 0;

protected:
// the selfEndPoint for the air-mode-node can be localhost
std::string m_selfEndPoint = "localhost";
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class IFront : virtual public IFrontClient
virtual void registerTopicHandler(
std::string const& topic, ppc::protocol::MessageDispatcherCallback callback) = 0;

virtual void registerMessageHandler(
std::string const& componentType, ppc::protocol::MessageDispatcherCallback callback) = 0;
/**
* @brief async send message
*
Expand Down Expand Up @@ -99,6 +101,8 @@ class IFront : virtual public IFrontClient
virtual ppc::protocol::Message::Ptr pop(std::string const& topic, long timeoutMs) = 0;
virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0;

virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;

/**
* @brief register the nodeInfo to the gateway
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-framework/gateway/IGateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class IGateway
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, std::string const& traceID,
bcos::bytes&& payload) = 0;

virtual void asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback) = 0;
virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;

virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0;
virtual bcos::Error::Ptr registerTopic(
Expand Down
5 changes: 4 additions & 1 deletion cpp/ppc-framework/protocol/INodeInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
* @date 2024-08-26
*/
#pragma once
#include "ppc-framework/Common.h"
#include "ppc-framework/Helper.h"
#include <bcos-utilities/Common.h>
#include <json/json.h>
#include <memory>
#include <set>
#include <sstream>
Expand Down Expand Up @@ -58,6 +59,8 @@ class INodeInfo
{
return (nodeID() == info->nodeID()) && (components() == info->components());
}

virtual void toJson(Json::Value& jsonObject) const = 0;
};
class INodeInfoFactory
{
Expand Down
10 changes: 5 additions & 5 deletions cpp/ppc-framework/protocol/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
* @date 2024-08-22
*/
#pragma once
#include "../Common.h"
#include "MessagePayload.h"
#include "RouteType.h"
#include "ppc-framework/Helper.h"
#include <bcos-boostssl/interfaces/MessageFace.h>
#include <bcos-utilities/Common.h>
#include <bcos-utilities/DataConvertUtility.h>
Expand Down Expand Up @@ -249,10 +249,10 @@ inline std::string printOptionalField(MessageOptionalHeader::Ptr optionalHeader)
std::ostringstream stringstream;
stringstream << LOG_KV("topic", optionalHeader->topic())
<< LOG_KV("componentType", optionalHeader->componentType())
<< LOG_KV("srcNode", *(bcos::toHexString(optionalHeader->srcNode())))
<< LOG_KV("dstNode", *(bcos::toHexString(optionalHeader->dstNode())))
<< LOG_KV("dstInst", optionalHeader->dstInst());

<< LOG_KV("srcNode", printNodeID(optionalHeader->srcNode()))
<< LOG_KV("dstNode", printNodeID(optionalHeader->dstNode()))
<< LOG_KV("srcInst", printNodeID(optionalHeader->srcInst()))
<< LOG_KV("dstInst", printNodeID(optionalHeader->dstInst()));
return stringstream.str();
}

Expand Down
1 change: 0 additions & 1 deletion cpp/ppc-framework/protocol/Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ inline std::ostream& operator<<(std::ostream& _out, HashImplName const& _type)

enum class MessageType : uint16_t
{
GatewayMessage = 0x0000,
RpcRequest = 0x1000, // the rpc request type
};

Expand Down
1 change: 1 addition & 0 deletions cpp/ppc-framework/rpc/RpcTypeDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum class RpcError : int32_t
std::string const RUN_TASK_METHOD = "runTask";
std::string const ASYNC_RUN_TASK_METHOD = "asyncRunTask";
std::string const GET_TASK_STATUS = "getTaskStatus";
std::string const GET_PEERS = "getPeers";

std::string const ASYNC_RUN_BS_MODE_TASK = "asyncRunBsModeTask";
std::string const FETCH_CIPHER = "fetchCipher";
Expand Down
19 changes: 19 additions & 0 deletions cpp/test-utils/FakeFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ class FakeFront : public FrontInterface
}
}

// for ut
void setAgencyList(std::vector<std::string> const& agencyList)
{
bcos::WriteGuard l(x_agencyList);
m_agencyList = agencyList;
}

std::vector<std::string> agencies() const override
{
bcos::ReadGuard l(x_agencyList);
return m_agencyList;
}

void start() override {}
void stop() override {}

private:
// the uuid to _callback
Expand Down Expand Up @@ -231,5 +246,9 @@ class FakeFront : public FrontInterface
std::map<std::string, CallbackFunc> m_uuidToCallback;
bcos::Mutex m_mutex;
std::atomic<int64_t> m_uuid = 0;

// the agency list, for task-sync
std::vector<std::string> m_agencyList;
mutable bcos::SharedMutex x_agencyList;
};
} // namespace ppc::test
18 changes: 1 addition & 17 deletions cpp/wedpr-computing/ppc-psi/src/PSIConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,7 @@ class PSIConfig
int taskExpireTime() const { return m_taskExpireTime; }
void setTaskExpireTime(int _taskExpireTime) { m_taskExpireTime = _taskExpireTime; }

std::vector<std::string> agencyList() const
{
bcos::ReadGuard l(x_agencyList);
return m_agencyList;
}

// for ut
void setAgencyList(std::vector<std::string> const& agencyList)
{
bcos::WriteGuard l(x_agencyList);
m_agencyList = agencyList;
}
std::vector<std::string> agencyList() const { return m_front->agencies(); }

protected:
ppc::front::PPCMessageFace::Ptr generatePPCMsg(
Expand Down Expand Up @@ -160,10 +149,5 @@ class PSIConfig

// the task-expire time
int m_taskExpireTime = 10000;

// the agency list, for task-sync
// TODO: fetch from the gateway
std::vector<std::string> m_agencyList;
mutable bcos::SharedMutex x_agencyList;
};
} // namespace ppc::psi
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ void testEcdhImplFunc(int64_t _dataBatchSize, std::string const& _serverPSIDataS
auto clientPSI = factory->createEcdhPSI(clientAgencyName, clientConfig);

std::vector<std::string> agencyList = {serverAgencyName, clientAgencyName};
serverPSI->psiConfig()->setAgencyList(agencyList);
clientPSI->psiConfig()->setAgencyList(agencyList);
auto serverFront = std::dynamic_pointer_cast<FakeFront>(serverPSI->psiConfig()->front());
serverFront->setAgencyList(agencyList);

auto clientFront = std::dynamic_pointer_cast<FakeFront>(clientPSI->psiConfig()->front());
clientFront->setAgencyList(agencyList);

// register the server-psi into the front
factory->front()->registerEcdhPSI(serverAgencyName, serverPSI);
Expand Down
13 changes: 12 additions & 1 deletion cpp/wedpr-helper/ppc-utilities/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
* @date 2024-08-23
*/
#pragma once

#include "ppc-framework/Common.h"
#include <boost/asio/detail/socket_ops.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <random>
#include <sstream>
#include <string>

namespace ppc
{
Expand Down Expand Up @@ -52,4 +53,14 @@ inline std::string generateUUID()
static thread_local auto uuid_gen = boost::uuids::basic_random_generator<std::random_device>();
return boost::uuids::to_string(uuid_gen());
}
template <typename T>
inline std::string printVector(std::vector<T> const& list)
{
std::stringstream oss;
for (auto const& it : list)
{
oss << it << ",";
}
return oss.str();
}
} // namespace ppc
12 changes: 6 additions & 6 deletions cpp/wedpr-initializer/Initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway)
m_protocolInitializer = std::make_shared<ProtocolInitializer>();
m_protocolInitializer->init(m_config);

auto ppcMessageFactory = std::make_shared<PPCMessageFactory>();
// init the frontService
INIT_LOG(INFO) << LOG_DESC("init the frontService") << LOG_KV("agency", m_config->agencyID());
auto frontThreadPool = std::make_shared<bcos::ThreadPool>("front", m_config->threadPoolSize());
Expand All @@ -89,7 +88,8 @@ void Initializer::init(ppc::gateway::IGateway::Ptr const& gateway)
{
m_transport = transportBuilder.build(SDKMode::PRO, m_config->frontConfig(), nullptr);
}
m_ppcFront = std::make_shared<Front>(m_transport->getFront());
m_ppcFront =
std::make_shared<Front>(std::make_shared<PPCMessageFactory>(), m_transport->getFront());

INIT_LOG(INFO) << LOG_DESC("init the frontService success")
<< LOG_KV("frontDetail", printFrontDesc(m_config->frontConfig()))
Expand Down Expand Up @@ -413,9 +413,9 @@ void Initializer::registerRpcHandler(ppc::rpc::RpcInterface::Ptr const& _rpc)

void Initializer::start()
{
if (m_transport)
if (m_ppcFront)
{
m_transport->start();
m_ppcFront->start();
}
/*if (m_ecdhConnPSI)
{
Expand Down Expand Up @@ -456,9 +456,9 @@ void Initializer::start()
void Initializer::stop()
{
// stop the network firstly
if (m_transport)
if (m_ppcFront)
{
m_transport->stop();
m_ppcFront->stop();
}
/*if (m_ecdhConnPSI)
{
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/air-node/AirNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void AirNodeInitializer::init(std::string const& _configPath)


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config(), m_gateway);
m_rpc->setRpcStorage(rpcStatusInterface);
m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi());
m_nodeInitializer->registerRpcHandler(m_rpc);
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/cem-node/CEMInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void CEMInitializer::init(std::string const& _configPath)
auto storageConfig = ppcConfig->storageConfig();
auto cemConfig = ppcConfig->cemConfig();
auto rpcFactory = std::make_shared<RpcFactory>(ppcConfig->agencyID());
m_rpc = rpcFactory->buildRpc(ppcConfig);
m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
auto cemService = std::make_shared<CEMService>();
cemService->setCEMConfig(cemConfig);
cemService->setStorageConfig(storageConfig);
Expand Down
4 changes: 2 additions & 2 deletions cpp/wedpr-main/common/NodeStarter.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ int startProgram(
}
printVersion();
std::cout << "[" << bcos::getCurrentDateTime() << "] ";
std::cout << "The " + binaryName + "is running..." << std::endl;
std::cout << "The " + binaryName + " is running..." << std::endl;
while (!exitHandler.shouldExit())
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
starter.reset();
std::cout << "[" << bcos::getCurrentDateTime() << "] ";
std::cout << "The" + binaryName + " program exit normally." << std::endl;
std::cout << "The " + binaryName + " program exit normally." << std::endl;
}
} // namespace ppc::node
2 changes: 1 addition & 1 deletion cpp/wedpr-main/mpc-node/MPCInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void MPCInitializer::init(std::string const& _configPath)
auto storageConfig = ppcConfig->storageConfig();
auto mpcConfig = ppcConfig->mpcConfig();
auto rpcFactory = std::make_shared<RpcFactory>(ppcConfig->agencyID());
m_rpc = rpcFactory->buildRpc(ppcConfig);
m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
auto mpcService = std::make_shared<MPCService>();
mpcService->setMPCConfig(mpcConfig);
mpcService->setStorageConfig(storageConfig);
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-main/pro-node/ProNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ void ProNodeInitializer::init(std::string const& _configPath)


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
m_rpc = rpcFactory->buildRpc(m_nodeInitializer->config());
m_rpc = rpcFactory->buildRpc(
m_nodeInitializer->config(), m_nodeInitializer->transport()->gateway());
m_rpc->setRpcStorage(rpcStatusInterface);
m_rpc->setBsEcdhPSI(m_nodeInitializer->bsEcdhPsi());
m_nodeInitializer->registerRpcHandler(m_rpc);
Expand Down
Loading
Loading