Skip to content

Commit

Permalink
add grpc server implementation (#15)
Browse files Browse the repository at this point in the history
* implement GatewayClient

* update client implement

* add grpc server implementation
  • Loading branch information
cyjseagull authored Sep 4, 2024
1 parent 55da761 commit 58a61ac
Show file tree
Hide file tree
Showing 25 changed files with 700 additions and 150 deletions.
9 changes: 6 additions & 3 deletions cpp/cmake/TargetSettings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ set(TARS_PROTOCOL_TARGET "wedpr-tars-protocol")
# wedpr-protocol/protobuf
set(PB_PROTOCOL_TARGET "wedpr-pb-protocol")

# wedpr-protocol/grpc-client
set(SERVICE_CLIENT_TARGET "service-client")
set(SERVICE_CLIENT_PB_TARGET "service-client-pb")
# wedpr-protocol/grpc/client
set(SERVICE_CLIENT_TARGET "wedpr-client")
# wedpr-protocol/grpc/server
set(SERVICE_SERVER_TARGET "wedpr-server")
# wedpr-protocol/proto generated file
set(SERVICE_PB_TARGET "service-pb")

# ppc-front
SET(FRONT_TARGET "ppc-front")
Expand Down
10 changes: 6 additions & 4 deletions cpp/ppc-framework/gateway/IGateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ class IGateway

virtual void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) = 0;
virtual void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
virtual void unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0;
virtual void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) = 0;
virtual void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) = 0;
virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0;
virtual bcos::Error::Ptr registerTopic(
bcos::bytesConstRef nodeID, std::string const& topic) = 0;
virtual bcos::Error::Ptr unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic) = 0;
};

} // namespace ppc::gateway
2 changes: 2 additions & 0 deletions cpp/ppc-framework/protocol/INodeInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class INodeInfo

virtual std::string const& endPoint() const = 0;
virtual bcos::bytesConstRef nodeID() const = 0;
virtual void setNodeID(bcos::bytesConstRef nodeID) = 0;
virtual void setEndPoint(std::string const& endPoint) = 0;

// components
virtual void setComponents(std::set<std::string> const& components) = 0;
Expand Down
14 changes: 10 additions & 4 deletions cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,26 @@ void GatewayImpl::onReceiveBroadcastMessage(MessageFace::Ptr msg, WsSession::Ptr
m_localRouter->dispatcherMessage(p2pMessage, nullptr);
}

void GatewayImpl::registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo)
bcos::Error::Ptr GatewayImpl::registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo)
{
m_localRouter->registerNodeInfo(nodeInfo);
return nullptr;
}

void GatewayImpl::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
bcos::Error::Ptr GatewayImpl::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
{
m_localRouter->unRegisterNode(nodeID.toBytes());
return nullptr;
}
void GatewayImpl::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)

bcos::Error::Ptr GatewayImpl::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
m_localRouter->registerTopic(nodeID, topic);
return nullptr;
}
void GatewayImpl::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic)

bcos::Error::Ptr GatewayImpl::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
m_localRouter->unRegisterTopic(nodeID, topic);
return nullptr;
}
8 changes: 4 additions & 4 deletions cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_this<Gateway
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) override;


void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override;
void unRegisterNodeInfo(bcos::bytesConstRef nodeID) override;
void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override;
bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) override;
bcos::Error::Ptr registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
bcos::Error::Ptr unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;

protected:
virtual void onReceiveP2PMessage(
Expand Down
7 changes: 4 additions & 3 deletions cpp/wedpr-protocol/grpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ foreach(proto_file ${MESSAGES_PROTOS})
)
endforeach()

add_library(${SERVICE_CLIENT_PB_TARGET} ${GRPC_MESSAGES_SRCS})
target_link_libraries(${SERVICE_CLIENT_PB_TARGET} PUBLIC ${PB_PROTOCOL_TARGET} gRPC::grpc++_unsecure)
add_library(${SERVICE_PB_TARGET} ${GRPC_MESSAGES_SRCS})
target_link_libraries(${SERVICE_PB_TARGET} PUBLIC ${PB_PROTOCOL_TARGET} gRPC::grpc++_unsecure)

add_subdirectory(client)
add_subdirectory(client)
add_subdirectory(server)
2 changes: 1 addition & 1 deletion cpp/wedpr-protocol/grpc/client/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
file(GLOB_RECURSE SRCS *.cpp)
add_library(${SERVICE_CLIENT_TARGET} ${SRCS})
target_link_libraries(${SERVICE_CLIENT_TARGET} PUBLIC ${SERVICE_CLIENT_PB_TARGET} ${PB_PROTOCOL_TARGET})
target_link_libraries(${SERVICE_CLIENT_TARGET} PUBLIC ${SERVICE_PB_TARGET} ${PB_PROTOCOL_TARGET})
3 changes: 2 additions & 1 deletion cpp/wedpr-protocol/grpc/client/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
#pragma once
#include "ppc-framework/Common.h"

#define GRPC_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][CLIENT]"
#define GRPC_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][CLIENT]"
#define GATEWAY_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][CLIENT]"
22 changes: 6 additions & 16 deletions cpp/wedpr-protocol/grpc/client/FrontClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
* @date 2024-09-02
*/
#include "FrontClient.h"
#include "protobuf/RequestConverter.h"
#include "wedpr-protocol/protobuf/Common.h"


using namespace ppc::protocol;
using namespace ppc::proto;
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using namespace grpc;

void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, ReceiveMsgFunc callback)
{
Expand All @@ -37,15 +34,8 @@ void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, Recei
msg->encode(encodedData);
receivedMsg.set_data(encodedData.data(), encodedData.size());

auto grpcCallback = [callback](ClientContext const&, Status const& status, Error&& response) {
auto error = std::make_shared<bcos::Error>(response.errorcode(), response.errormessage());
callback(error);
};

auto call = std::make_shared<AsyncClientCall>(grpcCallback);
call->responseReader =
m_stub->PrepareAsynconReceiveMessage(&call->context, receivedMsg, &m_client->queue());
call->responseReader->StartCall();
// send request, upon completion of the RPC, "reply" be updated with the server's response
call->responseReader->Finish(&call->reply, &call->status, (void*)call.get());
ClientContext context;
auto response = std::make_shared<Error>();
m_stub->async()->onReceiveMessage(&context, &receivedMsg, response.get(),
[response, callback](Status status) { callback(toError(status, std::move(*response))); });
}
7 changes: 2 additions & 5 deletions cpp/wedpr-protocol/grpc/client/FrontClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@

namespace ppc::protocol
{
class FrontClient : public ppc::front::IFrontClient
class FrontClient : public ppc::front::IFrontClient, public GrpcClient
{
public:
using Ptr = std::shared_ptr<FrontClient>;
FrontClient(GrpcClient::Ptr client)
: m_client(std::move(client)), m_stub(ppc::proto::Front::NewStub(m_client->channel()))
{}
FrontClient(std::shared_ptr<grpc::Channel> channel) : GrpcClient(std::move(channel)) {}

~FrontClient() override = default;
void onReceiveMessage(
ppc::protocol::Message::Ptr const& _msg, ppc::protocol::ReceiveMsgFunc _callback) override;

private:
std::unique_ptr<ppc::proto::Front::Stub> m_stub;
GrpcClient::Ptr m_client;
};
} // namespace ppc::protocol
59 changes: 47 additions & 12 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,59 @@
* @date 2024-09-02
*/
#include "GatewayClient.h"
#include "Common.h"
#include "protobuf/RequestConverter.h"

using namespace ppc;
using namespace ppc::proto;
using namespace grpc;
using namespace ppc::gateway;
using namespace ppc::protocol;


void GatewayClient::start() {}
void GatewayClient::stop() {}

void GatewayClient::asyncSendMessage(RouteType routeType,
MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, long timeout,
ReceiveMsgFunc callback)
{}
{
auto request = generateRequest(routeType, routeInfo, std::move(payload), timeout);
ClientContext context;
auto response = std::make_shared<Error>();
m_stub->async()->asyncSendMessage(&context, request.get(), response.get(),
[callback, response](Status status) { callback(toError(status, std::move(*response))); });
}


bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo)
{
auto request = toNodeInfoRequest(nodeInfo);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->registerNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
}

bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
{
auto request = toNodeInfoRequest(nodeID, "");
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->unRegisterNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
}
bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->registerTopic(&context, *request, response.get());
return toError(status, std::move(*response));
}

void GatewayClient::asyncSendbroadcastMessage(
RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload)
{}
void GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo) {}
void GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID) {}
void GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) {}
void GatewayClient::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) {}
bcos::Error::Ptr GatewayClient::unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic)
{
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->unRegisterTopic(&context, *request, response.get());
return toError(status, std::move(*response));
}
26 changes: 17 additions & 9 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
* @date 2024-09-02
*/
#pragma once
#include "GrpcClient.h"
#include "ppc-framework/gateway/IGateway.h"

namespace ppc::protocol
{
class GatewayClient : public ppc::gateway::IGateway
class GatewayClient : public ppc::gateway::IGateway, public GrpcClient
{
public:
using Ptr = std::shared_ptr<GatewayClient>;
GatewayClient() = default;
GatewayClient(std::shared_ptr<grpc::Channel> channel)
: GrpcClient(std::move(channel)), m_stub(ppc::proto::Gateway::NewStub(m_channel))
{}

~GatewayClient() override = default;


void start() override;
void stop() override;
void start() override {}
void stop() override {}

/**
* @brief send message to gateway
Expand All @@ -52,10 +56,14 @@ class GatewayClient : public ppc::gateway::IGateway
long timeout, ppc::protocol::ReceiveMsgFunc callback) override;

void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) override;
void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override;
void unRegisterNodeInfo(bcos::bytesConstRef nodeID) override;
void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) override
{}
bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override;
bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) override;
bcos::Error::Ptr registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
bcos::Error::Ptr unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;

private:
std::unique_ptr<ppc::proto::Gateway::Stub> m_stub;
};
} // namespace ppc::protocol
56 changes: 0 additions & 56 deletions cpp/wedpr-protocol/grpc/client/GrpcClient.cpp

This file was deleted.

Loading

0 comments on commit 58a61ac

Please sign in to comment.