Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add grpc server implementation #15

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cpp/cmake/TargetSettings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ set(TARS_PROTOCOL_TARGET "wedpr-tars-protocol")
# wedpr-protocol/protobuf
set(PB_PROTOCOL_TARGET "wedpr-pb-protocol")

# wedpr-protocol/grpc-client
set(SERVICE_CLIENT_TARGET "service-client")
set(SERVICE_CLIENT_PB_TARGET "service-client-pb")
# wedpr-protocol/grpc/client
set(SERVICE_CLIENT_TARGET "wedpr-client")
# wedpr-protocol/grpc/server
set(SERVICE_SERVER_TARGET "wedpr-server")
# wedpr-protocol/proto generated file
set(SERVICE_PB_TARGET "service-pb")

# ppc-front
SET(FRONT_TARGET "ppc-front")
Expand Down
10 changes: 6 additions & 4 deletions cpp/ppc-framework/gateway/IGateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ class IGateway

virtual void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) = 0;
virtual void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
virtual void unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0;
virtual void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) = 0;
virtual void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) = 0;
virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0;
virtual bcos::Error::Ptr registerTopic(
bcos::bytesConstRef nodeID, std::string const& topic) = 0;
virtual bcos::Error::Ptr unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic) = 0;
};

} // namespace ppc::gateway
2 changes: 2 additions & 0 deletions cpp/ppc-framework/protocol/INodeInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class INodeInfo

virtual std::string const& endPoint() const = 0;
virtual bcos::bytesConstRef nodeID() const = 0;
virtual void setNodeID(bcos::bytesConstRef nodeID) = 0;
virtual void setEndPoint(std::string const& endPoint) = 0;

// components
virtual void setComponents(std::set<std::string> const& components) = 0;
Expand Down
14 changes: 10 additions & 4 deletions cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,26 @@ void GatewayImpl::onReceiveBroadcastMessage(MessageFace::Ptr msg, WsSession::Ptr
m_localRouter->dispatcherMessage(p2pMessage, nullptr);
}

void GatewayImpl::registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo)
bcos::Error::Ptr GatewayImpl::registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo)
{
m_localRouter->registerNodeInfo(nodeInfo);
return nullptr;
}

void GatewayImpl::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
bcos::Error::Ptr GatewayImpl::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
{
m_localRouter->unRegisterNode(nodeID.toBytes());
return nullptr;
}
void GatewayImpl::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)

bcos::Error::Ptr GatewayImpl::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
m_localRouter->registerTopic(nodeID, topic);
return nullptr;
}
void GatewayImpl::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic)

bcos::Error::Ptr GatewayImpl::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
m_localRouter->unRegisterTopic(nodeID, topic);
return nullptr;
}
8 changes: 4 additions & 4 deletions cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_this<Gateway
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) override;


void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override;
void unRegisterNodeInfo(bcos::bytesConstRef nodeID) override;
void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override;
bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) override;
bcos::Error::Ptr registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
bcos::Error::Ptr unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;

protected:
virtual void onReceiveP2PMessage(
Expand Down
7 changes: 4 additions & 3 deletions cpp/wedpr-protocol/grpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ foreach(proto_file ${MESSAGES_PROTOS})
)
endforeach()

add_library(${SERVICE_CLIENT_PB_TARGET} ${GRPC_MESSAGES_SRCS})
target_link_libraries(${SERVICE_CLIENT_PB_TARGET} PUBLIC ${PB_PROTOCOL_TARGET} gRPC::grpc++_unsecure)
add_library(${SERVICE_PB_TARGET} ${GRPC_MESSAGES_SRCS})
target_link_libraries(${SERVICE_PB_TARGET} PUBLIC ${PB_PROTOCOL_TARGET} gRPC::grpc++_unsecure)

add_subdirectory(client)
add_subdirectory(client)
add_subdirectory(server)
2 changes: 1 addition & 1 deletion cpp/wedpr-protocol/grpc/client/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
file(GLOB_RECURSE SRCS *.cpp)
add_library(${SERVICE_CLIENT_TARGET} ${SRCS})
target_link_libraries(${SERVICE_CLIENT_TARGET} PUBLIC ${SERVICE_CLIENT_PB_TARGET} ${PB_PROTOCOL_TARGET})
target_link_libraries(${SERVICE_CLIENT_TARGET} PUBLIC ${SERVICE_PB_TARGET} ${PB_PROTOCOL_TARGET})
3 changes: 2 additions & 1 deletion cpp/wedpr-protocol/grpc/client/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
#pragma once
#include "ppc-framework/Common.h"

#define GRPC_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][CLIENT]"
#define GRPC_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][CLIENT]"
#define GATEWAY_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][CLIENT]"
22 changes: 6 additions & 16 deletions cpp/wedpr-protocol/grpc/client/FrontClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
* @date 2024-09-02
*/
#include "FrontClient.h"
#include "protobuf/RequestConverter.h"
#include "wedpr-protocol/protobuf/Common.h"


using namespace ppc::protocol;
using namespace ppc::proto;
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using namespace grpc;

void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, ReceiveMsgFunc callback)
{
Expand All @@ -37,15 +34,8 @@ void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, Recei
msg->encode(encodedData);
receivedMsg.set_data(encodedData.data(), encodedData.size());

auto grpcCallback = [callback](ClientContext const&, Status const& status, Error&& response) {
auto error = std::make_shared<bcos::Error>(response.errorcode(), response.errormessage());
callback(error);
};

auto call = std::make_shared<AsyncClientCall>(grpcCallback);
call->responseReader =
m_stub->PrepareAsynconReceiveMessage(&call->context, receivedMsg, &m_client->queue());
call->responseReader->StartCall();
// send request, upon completion of the RPC, "reply" be updated with the server's response
call->responseReader->Finish(&call->reply, &call->status, (void*)call.get());
ClientContext context;
auto response = std::make_shared<Error>();
m_stub->async()->onReceiveMessage(&context, &receivedMsg, response.get(),
[response, callback](Status status) { callback(toError(status, std::move(*response))); });
}
7 changes: 2 additions & 5 deletions cpp/wedpr-protocol/grpc/client/FrontClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@

namespace ppc::protocol
{
class FrontClient : public ppc::front::IFrontClient
class FrontClient : public ppc::front::IFrontClient, public GrpcClient
{
public:
using Ptr = std::shared_ptr<FrontClient>;
FrontClient(GrpcClient::Ptr client)
: m_client(std::move(client)), m_stub(ppc::proto::Front::NewStub(m_client->channel()))
{}
FrontClient(std::shared_ptr<grpc::Channel> channel) : GrpcClient(std::move(channel)) {}

~FrontClient() override = default;
void onReceiveMessage(
ppc::protocol::Message::Ptr const& _msg, ppc::protocol::ReceiveMsgFunc _callback) override;

private:
std::unique_ptr<ppc::proto::Front::Stub> m_stub;
GrpcClient::Ptr m_client;
};
} // namespace ppc::protocol
59 changes: 47 additions & 12 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,59 @@
* @date 2024-09-02
*/
#include "GatewayClient.h"
#include "Common.h"
#include "protobuf/RequestConverter.h"

using namespace ppc;
using namespace ppc::proto;
using namespace grpc;
using namespace ppc::gateway;
using namespace ppc::protocol;


void GatewayClient::start() {}
void GatewayClient::stop() {}

void GatewayClient::asyncSendMessage(RouteType routeType,
MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, long timeout,
ReceiveMsgFunc callback)
{}
{
auto request = generateRequest(routeType, routeInfo, std::move(payload), timeout);
ClientContext context;
auto response = std::make_shared<Error>();
m_stub->async()->asyncSendMessage(&context, request.get(), response.get(),
[callback, response](Status status) { callback(toError(status, std::move(*response))); });
}


bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo)
{
auto request = toNodeInfoRequest(nodeInfo);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->registerNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
}

bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID)
{
auto request = toNodeInfoRequest(nodeID, "");
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->unRegisterNodeInfo(&context, *request, response.get());
return toError(status, std::move(*response));
}
bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic)
{
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->registerTopic(&context, *request, response.get());
return toError(status, std::move(*response));
}

void GatewayClient::asyncSendbroadcastMessage(
RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload)
{}
void GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo) {}
void GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID) {}
void GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) {}
void GatewayClient::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) {}
bcos::Error::Ptr GatewayClient::unRegisterTopic(
bcos::bytesConstRef nodeID, std::string const& topic)
{
auto request = toNodeInfoRequest(nodeID, topic);
ClientContext context;
std::shared_ptr<ppc::proto::Error> response = std::make_shared<ppc::proto::Error>();
auto status = m_stub->unRegisterTopic(&context, *request, response.get());
return toError(status, std::move(*response));
}
26 changes: 17 additions & 9 deletions cpp/wedpr-protocol/grpc/client/GatewayClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
* @date 2024-09-02
*/
#pragma once
#include "GrpcClient.h"
#include "ppc-framework/gateway/IGateway.h"

namespace ppc::protocol
{
class GatewayClient : public ppc::gateway::IGateway
class GatewayClient : public ppc::gateway::IGateway, public GrpcClient
{
public:
using Ptr = std::shared_ptr<GatewayClient>;
GatewayClient() = default;
GatewayClient(std::shared_ptr<grpc::Channel> channel)
: GrpcClient(std::move(channel)), m_stub(ppc::proto::Gateway::NewStub(m_channel))
{}

~GatewayClient() override = default;


void start() override;
void stop() override;
void start() override {}
void stop() override {}

/**
* @brief send message to gateway
Expand All @@ -52,10 +56,14 @@ class GatewayClient : public ppc::gateway::IGateway
long timeout, ppc::protocol::ReceiveMsgFunc callback) override;

void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) override;
void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override;
void unRegisterNodeInfo(bcos::bytesConstRef nodeID) override;
void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) override
{}
bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override;
bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) override;
bcos::Error::Ptr registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;
bcos::Error::Ptr unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override;

private:
std::unique_ptr<ppc::proto::Gateway::Stub> m_stub;
};
} // namespace ppc::protocol
56 changes: 0 additions & 56 deletions cpp/wedpr-protocol/grpc/client/GrpcClient.cpp

This file was deleted.

Loading
Loading