Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature milestone2 #26

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#pragma once
#include "FrontConfig.h"
#include "ppc-framework/protocol/Handler.h"
#include "ppc-framework/protocol/INodeInfo.h"
#include "ppc-framework/protocol/Message.h"
#include "ppc-framework/protocol/RouteType.h"
Expand Down Expand Up @@ -123,7 +124,6 @@ class IFront : virtual public IFrontClient
* @param topic the topic to unregister
*/
virtual void unRegisterTopic(std::string const& topic) = 0;

};

class IFrontBuilder
Expand All @@ -133,6 +133,7 @@ class IFrontBuilder
IFrontBuilder() = default;
virtual ~IFrontBuilder() = default;

virtual IFrontClient::Ptr buildClient(std::string endPoint) const = 0;
virtual IFrontClient::Ptr buildClient(std::string endPoint,
std::function<void()> onUnHealthHandler, bool removeHandlerOnUnhealth) const = 0;
};
} // namespace ppc::front
27 changes: 23 additions & 4 deletions cpp/ppc-framework/protocol/GrpcConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,26 @@

namespace ppc::protocol
{
struct GrpcServerConfig
class GrpcServerConfig
{
ppc::protocol::EndPoint endPoint;
public:
using Ptr = std::shared_ptr<GrpcServerConfig>;
GrpcServerConfig() = default;
GrpcServerConfig(EndPoint endPoint, bool enableHealthCheck)
: m_endPoint(std::move(endPoint)), m_enableHealthCheck(enableHealthCheck)
{}
std::string listenEndPoint() const { return m_endPoint.listenEndPoint(); }

void setEndPoint(EndPoint endPoint) { m_endPoint = endPoint; }
void setEnableHealthCheck(bool enableHealthCheck) { m_enableHealthCheck = enableHealthCheck; }

std::string listenEndPoint() const { return endPoint.listenEndPoint(); }
EndPoint const& endPoint() const { return m_endPoint; }
EndPoint& mutableEndPoint() { return m_endPoint; }
bool enableHealthCheck() const { return m_enableHealthCheck; }

protected:
ppc::protocol::EndPoint m_endPoint;
bool m_enableHealthCheck = false;
};
class GrpcConfig
{
Expand All @@ -43,7 +58,11 @@ class GrpcConfig
m_loadBalancePolicy = loadBalancePolicy;
}

private:
bool enableHealthCheck() const { return m_enableHealthCheck; }
void setEnableHealthCheck(bool enableHealthCheck) { m_enableHealthCheck = enableHealthCheck; }

protected:
bool m_enableHealthCheck = false;
std::string m_loadBalancePolicy = "round_robin";
};
} // namespace ppc::protocol
41 changes: 41 additions & 0 deletions cpp/ppc-framework/protocol/Handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file EndPoint.h
* @author: yujiechen
* @date 2024-08-22
*/

#pragma once
#include <memory>
#include <string>
#include <vector>

namespace ppc::protocol
{
struct HealthCheckHandler
{
using Ptr = std::shared_ptr<HealthCheckHandler>;
HealthCheckHandler(std::string const& _serviceName) : serviceName(_serviceName) {}

std::string serviceName;
// handler used to check the health
std::function<bool()> checkHealthHandler;
// handler called when the service un-health
std::function<void()> onUnHealthHandler;
// remove the handler when the service un-health
bool removeHandlerOnUnhealth = true;
};
} // namespace ppc::protocol
3 changes: 2 additions & 1 deletion cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ void PPCConfig::loadGatewayConfig(boost::property_tree::ptree const& _pt)
// load the grpcConfig
m_grpcConfig = loadGrpcConfig("transport", _pt);
// load the GrpcServerConfig
loadEndpointConfig(m_gatewayConfig.grpcServerConfig.endPoint, false, "transport", _pt);
loadEndpointConfig(
m_gatewayConfig.grpcServerConfig->mutableEndPoint(), false, "transport", _pt);
// the agencyID
m_agencyID = _pt.get<std::string>("agency.id", "");
if (m_agencyID.empty())
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ struct GatewayConfig
constexpr static int MinUnreachableDistance = 2;

NetworkConfig networkConfig;
ppc::protocol::GrpcServerConfig grpcServerConfig;
ppc::protocol::GrpcServerConfig::Ptr grpcServerConfig =
std::make_shared<ppc::protocol::GrpcServerConfig>();
// the file that configure the connected endpoint information
std::string nodeFileName;
// the dir that contains the connected endpoint information, e.g.nodes.json
Expand Down
16 changes: 14 additions & 2 deletions cpp/wedpr-main/gateway/GatewayInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "ppc-gateway/GatewayFactory.h"
#include "ppc-tools/src/config/PPCConfig.h"
#include "protobuf/src/NodeInfoImpl.h"
#include "wedpr-protocol/grpc/client/HealthCheckTimer.h"
#include "wedpr-protocol/grpc/client/RemoteFrontBuilder.h"
#include "wedpr-protocol/protocol/src/v1/MessageHeaderImpl.h"

Expand Down Expand Up @@ -50,7 +51,10 @@ void GatewayInitializer::init(std::string const& _configPath)
"gateway", config->gatewayConfig().networkConfig.threadPoolSize);

GatewayFactory gatewayFactory(config);
m_gateway = gatewayFactory.build(std::make_shared<RemoteFrontBuilder>(config->grpcConfig()));
// default 1min
m_healthChecker = std::make_shared<HealthCheckTimer>(60 * 1000);
m_gateway = gatewayFactory.build(
std::make_shared<RemoteFrontBuilder>(config->grpcConfig(), m_healthChecker));

m_server = std::make_shared<GrpcServer>(config->gatewayConfig().grpcServerConfig);
// register the gateway service
Expand All @@ -69,15 +73,23 @@ void GatewayInitializer::start()
{
m_server->start();
}
if (m_healthChecker)
{
m_healthChecker->start();
}
}
void GatewayInitializer::stop()
{
if (m_healthChecker)
{
m_healthChecker->stop();
}
if (m_server)
{
m_server->stop();
}
if (m_gateway)
{
m_gateway->start();
m_gateway->stop();
}
}
5 changes: 5 additions & 0 deletions cpp/wedpr-main/gateway/GatewayInitializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ namespace ppc::protocol
{
class GrpcServer;
}
namespace ppc::protocol
{
class HealthCheckTimer;
};
namespace ppc::gateway
{
class GatewayInitializer
Expand All @@ -44,6 +48,7 @@ class GatewayInitializer
protected:
bcos::BoostLogInitializer::Ptr m_logInitializer;
ppc::gateway::IGateway::Ptr m_gateway;
std::shared_ptr<ppc::protocol::HealthCheckTimer> m_healthChecker;
std::shared_ptr<ppc::protocol::GrpcServer> m_server;
};
} // namespace ppc::gateway
6 changes: 6 additions & 0 deletions cpp/wedpr-protocol/grpc/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr con
return args;
}
args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy());
if (grpcConfig->enableHealthCheck())
{
args.SetServiceConfigJSON(
"{\"healthCheckConfig\": "
"{\"serviceName\": \"\"}}");
}
return args;
}
} // namespace ppc::protocol
3 changes: 2 additions & 1 deletion cpp/wedpr-protocol/grpc/client/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@
#include "ppc-framework/Common.h"

#define GRPC_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][CLIENT]"
#define GATEWAY_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][CLIENT]"
#define GATEWAY_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][CLIENT]"
#define HEALTH_LOG(LEVEL) BCOS_LOG(LEVEL) << "[HEALTH]"
56 changes: 36 additions & 20 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,36 +42,52 @@ void GatewayClient::asyncSendMessage(RouteType routeType,

bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo)
{
auto request = toNodeInfoRequest(nodeInfo);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->registerNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
broadCast([nodeInfo](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeInfo);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
});
}

bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
{
auto request = toNodeInfoRequest(nodeID, "");
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->unRegisterNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
broadCast([nodeID](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, "");
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
});
}
bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->registerTopic(&context, *request, response.get());
return toError(status, std::move(*response));
broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->registerTopic(&context, *request, response.get());
return toError(status, std::move(*response));
});
}

bcos::Error::Ptr GatewayClient::unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic)
{
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->unRegisterTopic(&context, *request, response.get());
return toError(status, std::move(*response));
broadCast([nodeID, topic](ChannelInfo const& channel) {
std::unique_ptr<ppc::proto::Gateway::Stub> stub(
ppc::proto::Gateway::NewStub(channel.channel));
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = stub->unRegisterTopic(&context, *request, response.get());
return toError(status, std::move(*response));
});
}
101 changes: 101 additions & 0 deletions cpp/wedpr-protocol/grpc/client/GrpcClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file GrpcClient.cpp
* @author: yujiechen
* @date 2024-09-02
*/
#include "GrpcClient.h"
#include "Common.h"

using namespace ppc::protocol;
using namespace ppc::proto;
using namespace grpc;
using namespace grpc::health::v1;

GrpcClient::GrpcClient(
ppc::protocol::GrpcConfig::Ptr const& grpcConfig, std::string const& endPoints)
: m_grpcConfig(grpcConfig),
m_channel(grpc::CreateCustomChannel(
endPoints, grpc::InsecureChannelCredentials(), toChannelConfig(grpcConfig))),
m_healthCheckStub(grpc::health::v1::Health::NewStub(m_channel))
{
std::vector<std::string> endPointList;
boost::split(endPointList, endPoints, boost::is_any_of(","));
// create the broadcast channels
for (auto const& endPoint : endPointList)
{
GRPC_CLIENT_LOG(INFO) << LOG_DESC("create broacast-channel, endpoint: ") << endPoint;
m_broadcastChannels.push_back(
{endPoint, grpc::CreateCustomChannel(endPoint, grpc::InsecureChannelCredentials(),
toChannelConfig(grpcConfig))});
}
}

bool GrpcClient::checkHealth()
{
try
{
ClientContext context;
HealthCheckResponse response;
auto status =
m_healthCheckStub->Check(&context, HealthCheckRequest::default_instance(), &response);
if (!status.ok())
{
GRPC_CLIENT_LOG(WARNING)
<< LOG_DESC("GrpcClient check health failed") << LOG_KV("code", status.error_code())
<< LOG_KV("msg", status.error_message());
return false;
}
return true;
}
catch (std::exception const& e)
{
GRPC_CLIENT_LOG(WARNING) << LOG_DESC("GrpcClient check health exception")
<< LOG_KV("error", boost::diagnostic_information(e));
return false;
}
}

bcos::Error::Ptr GrpcClient::broadCast(
std::function<bcos::Error::Ptr(ChannelInfo const& channel)> callback)
{
auto result = std::make_shared<bcos::Error>(0, "");
for (auto const& channel : m_broadcastChannels)
{
try
{
if (channel.channel->GetState(false) == GRPC_CHANNEL_SHUTDOWN)
{
GRPC_CLIENT_LOG(INFO) << LOG_DESC("Ignore unconnected channel")
<< LOG_KV("endpoint", channel.endPoint);
continue;
}
auto error = callback(channel);
if (error && error->errorCode() != 0)
{
result->setErrorCode(error->errorCode());
result->setErrorMessage(result->errorMessage() + error->errorMessage() + "; ");
}
}
catch (std::exception const& e)
{
GRPC_CLIENT_LOG(WARNING)
<< LOG_DESC("registerNodeInfo exception") << LOG_KV("remote", channel.endPoint)
<< LOG_KV("error", boost::diagnostic_information(e));
}
}
return result;
}
Loading
Loading