Skip to content

Commit

Permalink
add grpc server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 3, 2024
1 parent dcd5e57 commit 8733974
Show file tree
Hide file tree
Showing 16 changed files with 521 additions and 17 deletions.
9 changes: 6 additions & 3 deletions cpp/cmake/TargetSettings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ set(TARS_PROTOCOL_TARGET "wedpr-tars-protocol")
# wedpr-protocol/protobuf
set(PB_PROTOCOL_TARGET "wedpr-pb-protocol")

# wedpr-protocol/grpc-client
set(SERVICE_CLIENT_TARGET "service-client")
set(SERVICE_CLIENT_PB_TARGET "service-client-pb")
# wedpr-protocol/grpc/client
set(SERVICE_CLIENT_TARGET "wedpr-client")
# wedpr-protocol/grpc/server
set(SERVICE_SERVER_TARGET "wedpr-server")
# wedpr-protocol/proto generated file
set(SERVICE_PB_TARGET "service-pb")

# ppc-front
SET(FRONT_TARGET "ppc-front")
Expand Down
2 changes: 2 additions & 0 deletions cpp/ppc-framework/protocol/INodeInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class INodeInfo

virtual std::string const& endPoint() const = 0;
virtual bcos::bytesConstRef nodeID() const = 0;
virtual void setNodeID(bcos::bytesConstRef nodeID) = 0;
virtual void setEndPoint(std::string const& endPoint) = 0;

// components
virtual void setComponents(std::set<std::string> const& components) = 0;
Expand Down
7 changes: 4 additions & 3 deletions cpp/wedpr-protocol/grpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ foreach(proto_file ${MESSAGES_PROTOS})
)
endforeach()

add_library(${SERVICE_CLIENT_PB_TARGET} ${GRPC_MESSAGES_SRCS})
target_link_libraries(${SERVICE_CLIENT_PB_TARGET} PUBLIC ${PB_PROTOCOL_TARGET} gRPC::grpc++_unsecure)
add_library(${SERVICE_PB_TARGET} ${GRPC_MESSAGES_SRCS})
target_link_libraries(${SERVICE_PB_TARGET} PUBLIC ${PB_PROTOCOL_TARGET} gRPC::grpc++_unsecure)

add_subdirectory(client)
add_subdirectory(client)
add_subdirectory(server)
2 changes: 1 addition & 1 deletion cpp/wedpr-protocol/grpc/client/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
file(GLOB_RECURSE SRCS *.cpp)
add_library(${SERVICE_CLIENT_TARGET} ${SRCS})
target_link_libraries(${SERVICE_CLIENT_TARGET} PUBLIC ${SERVICE_CLIENT_PB_TARGET} ${PB_PROTOCOL_TARGET})
target_link_libraries(${SERVICE_CLIENT_TARGET} PUBLIC ${SERVICE_PB_TARGET} ${PB_PROTOCOL_TARGET})
1 change: 1 addition & 0 deletions cpp/wedpr-protocol/grpc/client/GrpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

namespace ppc::protocol
{
// refer to: https://grpc.io/docs/languages/cpp/callback/
class GrpcClient
{
public:
Expand Down
3 changes: 3 additions & 0 deletions cpp/wedpr-protocol/grpc/server/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
file(GLOB_RECURSE SRCS *.cpp)
add_library(${SERVICE_SERVER_TARGET} ${SRCS})
target_link_libraries(${SERVICE_SERVER_TARGET} PUBLIC ${SERVICE_PB_TARGET} ${PB_PROTOCOL_TARGET})
25 changes: 25 additions & 0 deletions cpp/wedpr-protocol/grpc/server/Common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Copyright (C) 2021 FISCO BCOS.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file Common.h
* @author: yujiechen
* @date 2021-04-12
*/
#pragma once
#include "ppc-framework/Common.h"

#define GRPC_SERVER_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][SERVER]"
#define FRONT_SERVER_LOG(LEVEL) BCOS_LOG(LEVEL) << "[FRONT][SERVER]"
#define GATEWAY_SERVER_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][SERVER]"
52 changes: 52 additions & 0 deletions cpp/wedpr-protocol/grpc/server/FrontServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file FrontServer.cpp
* @author: yujiechen
* @date 2024-09-03
*/
#include "FrontServer.h"
#include "Common.h"
#include "protobuf/RequestConverter.h"
#include <bcos-utilities/Common.h>

using namespace ppc::proto;
using namespace ppc::protocol;
using namespace grpc;

ServerUnaryReactor* FrontServer::onReceiveMessage(
CallbackServerContext* context, const ReceivedMessage* receivedMsg, ppc::proto::Error* reply)
{
std::shared_ptr<ServerUnaryReactor> reactor(context->DefaultReactor());
try
{
// decode the request
auto msg = m_msgBuilder->build(bcos::bytesConstRef(
(bcos::byte*)receivedMsg->data().data(), receivedMsg->data().size()));
m_front->onReceiveMessage(msg, [reactor, reply](bcos::Error::Ptr error) {
toSerializedError(reply, error);
reactor->Finish(Status::OK);
});
}
catch (std::exception const& e)
{
FRONT_SERVER_LOG(ERROR) << LOG_DESC("onReceiveMessage exception")
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply,
std::make_shared<bcos::Error>(-1, std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor.get();
}
44 changes: 44 additions & 0 deletions cpp/wedpr-protocol/grpc/server/FrontServer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file FrontServer.h
* @author: yujiechen
* @date 2024-09-03
*/
#pragma once
#include "Service.grpc.pb.h"
#include <ppc-framework/front/IFront.h>
#include <ppc-framework/protocol/Message.h>
#include <memory>

namespace ppc::protocol
{
class FrontServer : public ppc::proto::Front::CallbackService
{
public:
using Ptr = std::shared_ptr<FrontServer>;
FrontServer(ppc::protocol::MessageBuilder::Ptr msgBuilder, ppc::front::IFront::Ptr front)
: m_msgBuilder(std::move(msgBuilder)), m_front(std::move(front))
{}
~FrontServer() override = default;

grpc::ServerUnaryReactor* onReceiveMessage(grpc::CallbackServerContext* context,
const ppc::proto::ReceivedMessage* receivedMsg, ppc::proto::Error* reply) override;

private:
ppc::front::IFront::Ptr m_front;
ppc::protocol::MessageBuilder::Ptr m_msgBuilder;
};
} // namespace ppc::protocol
145 changes: 145 additions & 0 deletions cpp/wedpr-protocol/grpc/server/GatewayServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file GatewayServer.cpp
* @author: yujiechen
* @date 2024-09-03
*/
#include "GatewayServer.h"
#include "Common.h"
#include "protobuf/RequestConverter.h"
using namespace ppc::protocol;
using namespace grpc;

ServerUnaryReactor* GatewayServer::asyncSendMessage(CallbackServerContext* context,
const ppc::proto::SendedMessageRequest* sendedMsg, ppc::proto::Error* reply)
{
std::shared_ptr<ServerUnaryReactor> reactor(context->DefaultReactor());
try
{
// TODO: optimize here
bcos::bytes payloadData(sendedMsg->payload().begin(), sendedMsg->payload().end());
auto routeInfo = generateRouteInfo(m_routeInfoBuilder, sendedMsg->routeinfo());
m_gateway->asyncSendMessage((ppc::protocol::RouteType)sendedMsg->routetype(), routeInfo,
std::move(payloadData), sendedMsg->timeout(), [reactor, reply](bcos::Error::Ptr error) {
toSerializedError(reply, error);
reactor->Finish(Status::OK);
});
}
catch (std::exception const& e)
{
GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("asyncSendMessage exception")
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply,
std::make_shared<bcos::Error>(-1,
"handle message failed for : " + std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor.get();
}

ServerUnaryReactor* GatewayServer::registerNodeInfo(CallbackServerContext* context,
const ppc::proto::NodeInfo* serializedNodeInfo, ppc::proto::Error* reply)
{
std::shared_ptr<ServerUnaryReactor> reactor(context->DefaultReactor());
try
{
auto nodeInfo = toNodeInfo(m_nodeInfoFactory, *serializedNodeInfo);
auto result = m_gateway->registerNodeInfo(nodeInfo);
toSerializedError(reply, result);
reactor->Finish(Status::OK);
}
catch (std::exception const& e)
{
GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("registerNodeInfo exception")
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply,
std::make_shared<bcos::Error>(-1,
"registerNodeInfo failed for : " + std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor.get();
}

ServerUnaryReactor* GatewayServer::unRegisterNodeInfo(
CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply)
{
std::shared_ptr<ServerUnaryReactor> reactor(context->DefaultReactor());
try
{
auto result = m_gateway->unRegisterNodeInfo(
bcos::bytesConstRef((bcos::byte*)nodeInfo->nodeid().data(), nodeInfo->nodeid().size()));
toSerializedError(reply, result);
reactor->Finish(Status::OK);
}
catch (std::exception const& e)
{
GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("unRegisterNodeInfo exception")
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply,
std::make_shared<bcos::Error>(-1, "unRegisterNodeInfo failed for : " +
std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor.get();
}

ServerUnaryReactor* GatewayServer::registerTopic(
CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply)
{
std::shared_ptr<ServerUnaryReactor> reactor(context->DefaultReactor());
try
{
auto result = m_gateway->registerTopic(
bcos::bytesConstRef((bcos::byte*)nodeInfo->nodeid().data(), nodeInfo->nodeid().size()),
nodeInfo->topic());
toSerializedError(reply, result);
reactor->Finish(Status::OK);
}
catch (std::exception const& e)
{
GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("unRegisterNodeInfo exception")
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply,
std::make_shared<bcos::Error>(
-1, "registerTopic failed for : " + std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor.get();
}

ServerUnaryReactor* GatewayServer::unRegisterTopic(
CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply)
{
std::shared_ptr<ServerUnaryReactor> reactor(context->DefaultReactor());
try
{
auto result = m_gateway->unRegisterTopic(
bcos::bytesConstRef((bcos::byte*)nodeInfo->nodeid().data(), nodeInfo->nodeid().size()),
nodeInfo->topic());
toSerializedError(reply, result);
reactor->Finish(Status::OK);
}
catch (std::exception const& e)
{
GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("unRegisterTopic exception")
<< LOG_KV("error", boost::diagnostic_information(e));
toSerializedError(reply,
std::make_shared<bcos::Error>(-1,
"unRegisterTopic failed for : " + std::string(boost::diagnostic_information(e))));
reactor->Finish(Status::OK);
}
return reactor.get();
}
59 changes: 59 additions & 0 deletions cpp/wedpr-protocol/grpc/server/GatewayServer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright (C) 2023 WeDPR.
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file GatewayServer.h
* @author: yujiechen
* @date 2024-09-03
*/
#pragma once
#include "Service.grpc.pb.h"
#include "ppc-framework/gateway/IGateway.h"
#include <memory>

namespace ppc::protocol
{
class GatewayServer : public ppc::proto::Gateway::CallbackService
{
public:
using Ptr = std::shared_ptr<GatewayServer>;
GatewayServer(ppc::gateway::IGateway::Ptr gateway,
MessageOptionalHeaderBuilder::Ptr routeInfoBuilder, INodeInfoFactory::Ptr nodeInfoFactory)
: m_gateway(std::move(gateway)),
m_routeInfoBuilder(std::move(routeInfoBuilder)),
m_nodeInfoFactory(std::move(nodeInfoFactory))
{}
virtual ~GatewayServer() = default;

grpc::ServerUnaryReactor* asyncSendMessage(grpc::CallbackServerContext* context,
const ppc::proto::SendedMessageRequest* sendedMsg, ppc::proto::Error* reply) override;

grpc::ServerUnaryReactor* registerNodeInfo(grpc::CallbackServerContext* context,
const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override;

grpc::ServerUnaryReactor* unRegisterNodeInfo(grpc::CallbackServerContext* context,
const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override;

grpc::ServerUnaryReactor* registerTopic(grpc::CallbackServerContext* context,
const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override;

grpc::ServerUnaryReactor* unRegisterTopic(grpc::CallbackServerContext* context,
const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override;

private:
ppc::gateway::IGateway::Ptr m_gateway;
MessageOptionalHeaderBuilder::Ptr m_routeInfoBuilder;
INodeInfoFactory::Ptr m_nodeInfoFactory;
};
}; // namespace ppc::protocol
Loading

0 comments on commit 8733974

Please sign in to comment.