diff --git a/cpp/cmake/TargetSettings.cmake b/cpp/cmake/TargetSettings.cmake index 73882793..645cf931 100644 --- a/cpp/cmake/TargetSettings.cmake +++ b/cpp/cmake/TargetSettings.cmake @@ -23,9 +23,12 @@ set(TARS_PROTOCOL_TARGET "wedpr-tars-protocol") # wedpr-protocol/protobuf set(PB_PROTOCOL_TARGET "wedpr-pb-protocol") -# wedpr-protocol/grpc-client -set(SERVICE_CLIENT_TARGET "service-client") -set(SERVICE_CLIENT_PB_TARGET "service-client-pb") +# wedpr-protocol/grpc/client +set(SERVICE_CLIENT_TARGET "wedpr-client") +# wedpr-protocol/grpc/server +set(SERVICE_SERVER_TARGET "wedpr-server") +# wedpr-protocol/proto generated file +set(SERVICE_PB_TARGET "service-pb") # ppc-front SET(FRONT_TARGET "ppc-front") diff --git a/cpp/ppc-framework/gateway/IGateway.h b/cpp/ppc-framework/gateway/IGateway.h index d65171c0..28b2fcd8 100644 --- a/cpp/ppc-framework/gateway/IGateway.h +++ b/cpp/ppc-framework/gateway/IGateway.h @@ -63,10 +63,12 @@ class IGateway virtual void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) = 0; - virtual void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; - virtual void unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0; - virtual void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) = 0; - virtual void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) = 0; + virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; + virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0; + virtual bcos::Error::Ptr registerTopic( + bcos::bytesConstRef nodeID, std::string const& topic) = 0; + virtual bcos::Error::Ptr unRegisterTopic( + bcos::bytesConstRef nodeID, std::string const& topic) = 0; }; } // namespace ppc::gateway diff --git a/cpp/ppc-framework/protocol/INodeInfo.h b/cpp/ppc-framework/protocol/INodeInfo.h index 2d4f8e19..76f460b2 100644 --- a/cpp/ppc-framework/protocol/INodeInfo.h +++ b/cpp/ppc-framework/protocol/INodeInfo.h @@ -40,6 +40,8 @@ class INodeInfo virtual std::string const& endPoint() const = 0; virtual bcos::bytesConstRef nodeID() const = 0; + virtual void setNodeID(bcos::bytesConstRef nodeID) = 0; + virtual void setEndPoint(std::string const& endPoint) = 0; // components virtual void setComponents(std::set const& components) = 0; diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp index de92b96b..4abad7a4 100644 --- a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp +++ b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -183,20 +183,26 @@ void GatewayImpl::onReceiveBroadcastMessage(MessageFace::Ptr msg, WsSession::Ptr m_localRouter->dispatcherMessage(p2pMessage, nullptr); } -void GatewayImpl::registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) +bcos::Error::Ptr GatewayImpl::registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) { m_localRouter->registerNodeInfo(nodeInfo); + return nullptr; } -void GatewayImpl::unRegisterNodeInfo(bcos::bytesConstRef nodeID) +bcos::Error::Ptr GatewayImpl::unRegisterNodeInfo(bcos::bytesConstRef nodeID) { m_localRouter->unRegisterNode(nodeID.toBytes()); + return nullptr; } -void GatewayImpl::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) + +bcos::Error::Ptr GatewayImpl::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) { m_localRouter->registerTopic(nodeID, topic); + return nullptr; } -void GatewayImpl::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) + +bcos::Error::Ptr GatewayImpl::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) { m_localRouter->unRegisterTopic(nodeID, topic); + return nullptr; } \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h index 40f97fa7..66d692bf 100644 --- a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h +++ b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h @@ -61,10 +61,10 @@ class GatewayImpl : public IGateway, public std::enable_shared_from_thisencode(encodedData); receivedMsg.set_data(encodedData.data(), encodedData.size()); - auto grpcCallback = [callback](ClientContext const&, Status const& status, Error&& response) { - auto error = std::make_shared(response.errorcode(), response.errormessage()); - callback(error); - }; - - auto call = std::make_shared(grpcCallback); - call->responseReader = - m_stub->PrepareAsynconReceiveMessage(&call->context, receivedMsg, &m_client->queue()); - call->responseReader->StartCall(); - // send request, upon completion of the RPC, "reply" be updated with the server's response - call->responseReader->Finish(&call->reply, &call->status, (void*)call.get()); + ClientContext context; + auto response = std::make_shared(); + m_stub->async()->onReceiveMessage(&context, &receivedMsg, response.get(), + [response, callback](Status status) { callback(toError(status, std::move(*response))); }); } \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/FrontClient.h b/cpp/wedpr-protocol/grpc/client/FrontClient.h index 61205f7c..4f40b4bb 100644 --- a/cpp/wedpr-protocol/grpc/client/FrontClient.h +++ b/cpp/wedpr-protocol/grpc/client/FrontClient.h @@ -23,13 +23,11 @@ namespace ppc::protocol { -class FrontClient : public ppc::front::IFrontClient +class FrontClient : public ppc::front::IFrontClient, public GrpcClient { public: using Ptr = std::shared_ptr; - FrontClient(GrpcClient::Ptr client) - : m_client(std::move(client)), m_stub(ppc::proto::Front::NewStub(m_client->channel())) - {} + FrontClient(std::shared_ptr channel) : GrpcClient(std::move(channel)) {} ~FrontClient() override = default; void onReceiveMessage( @@ -37,6 +35,5 @@ class FrontClient : public ppc::front::IFrontClient private: std::unique_ptr m_stub; - GrpcClient::Ptr m_client; }; } // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp index bdcb43b9..50a445ae 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.cpp @@ -18,24 +18,59 @@ * @date 2024-09-02 */ #include "GatewayClient.h" +#include "Common.h" +#include "protobuf/RequestConverter.h" using namespace ppc; +using namespace ppc::proto; +using namespace grpc; using namespace ppc::gateway; using namespace ppc::protocol; - -void GatewayClient::start() {} -void GatewayClient::stop() {} - void GatewayClient::asyncSendMessage(RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, long timeout, ReceiveMsgFunc callback) -{} +{ + auto request = generateRequest(routeType, routeInfo, std::move(payload), timeout); + ClientContext context; + auto response = std::make_shared(); + m_stub->async()->asyncSendMessage(&context, request.get(), response.get(), + [callback, response](Status status) { callback(toError(status, std::move(*response))); }); +} + + +bcos::Error::Ptr GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo) +{ + auto request = toNodeInfoRequest(nodeInfo); + ClientContext context; + std::shared_ptr response = std::make_shared(); + auto status = m_stub->registerNodeInfo(&context, *request, response.get()); + return toError(status, std::move(*response)); +} + +bcos::Error::Ptr GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID) +{ + auto request = toNodeInfoRequest(nodeID, ""); + ClientContext context; + std::shared_ptr response = std::make_shared(); + auto status = m_stub->unRegisterNodeInfo(&context, *request, response.get()); + return toError(status, std::move(*response)); +} +bcos::Error::Ptr GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) +{ + auto request = toNodeInfoRequest(nodeID, topic); + ClientContext context; + std::shared_ptr response = std::make_shared(); + auto status = m_stub->registerTopic(&context, *request, response.get()); + return toError(status, std::move(*response)); +} -void GatewayClient::asyncSendbroadcastMessage( - RouteType routeType, MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) -{} -void GatewayClient::registerNodeInfo(INodeInfo::Ptr const& nodeInfo) {} -void GatewayClient::unRegisterNodeInfo(bcos::bytesConstRef nodeID) {} -void GatewayClient::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) {} -void GatewayClient::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) {} \ No newline at end of file +bcos::Error::Ptr GatewayClient::unRegisterTopic( + bcos::bytesConstRef nodeID, std::string const& topic) +{ + auto request = toNodeInfoRequest(nodeID, topic); + ClientContext context; + std::shared_ptr response = std::make_shared(); + auto status = m_stub->unRegisterTopic(&context, *request, response.get()); + return toError(status, std::move(*response)); +} \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/GatewayClient.h b/cpp/wedpr-protocol/grpc/client/GatewayClient.h index f1e9a012..a6635ccc 100644 --- a/cpp/wedpr-protocol/grpc/client/GatewayClient.h +++ b/cpp/wedpr-protocol/grpc/client/GatewayClient.h @@ -18,20 +18,24 @@ * @date 2024-09-02 */ #pragma once +#include "GrpcClient.h" #include "ppc-framework/gateway/IGateway.h" namespace ppc::protocol { -class GatewayClient : public ppc::gateway::IGateway +class GatewayClient : public ppc::gateway::IGateway, public GrpcClient { public: using Ptr = std::shared_ptr; - GatewayClient() = default; + GatewayClient(std::shared_ptr channel) + : GrpcClient(std::move(channel)), m_stub(ppc::proto::Gateway::NewStub(m_channel)) + {} + ~GatewayClient() override = default; - void start() override; - void stop() override; + void start() override {} + void stop() override {} /** * @brief send message to gateway @@ -52,10 +56,14 @@ class GatewayClient : public ppc::gateway::IGateway long timeout, ppc::protocol::ReceiveMsgFunc callback) override; void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, - ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) override; - void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override; - void unRegisterNodeInfo(bcos::bytesConstRef nodeID) override; - void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override; - void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override; + ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload) override + {} + bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override; + bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) override; + bcos::Error::Ptr registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override; + bcos::Error::Ptr unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override; + +private: + std::unique_ptr m_stub; }; } // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp b/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp deleted file mode 100644 index 019e5c10..00000000 --- a/cpp/wedpr-protocol/grpc/client/GrpcClient.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (C) 2023 WeDPR. - * SPDX-License-Identifier: Apache-2.0 - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * @file GrpcClient.cpp - * @author: yujiechen - * @date 2024-09-02 - */ -#include "GrpcClient.h" -#include "Common.h" - - -using namespace ppc::protocol; -using namespace grpc; - -void GrpcClient::handleRpcResponse() -{ - void* callback; - bool ok = false; - // Block until the next result is available in the completion queue "m_queue". - while (m_queue.Next(&callback, &ok)) - { - try - { - // The tag in this example is the memory location of the call object - // Note: the should been managed by shared_ptr - AsyncClientCall* call = static_cast(callback); - - // Verify that the request was completed successfully. Note that "ok" - // corresponds solely to the request for updates introduced by Finish(). - if (!ok) - { - GRPC_CLIENT_LOG(WARNING) - << LOG_DESC("handleRpcResponse: receive response with unormal status"); - return; - } - call->callback(call->context, call->status, std::move(call->reply)); - } - catch (std::exception const& e) - { - GRPC_CLIENT_LOG(WARNING) << LOG_DESC("handleRpcResponse exception") - << LOG_KV("error", boost::diagnostic_information(e)); - } - } -} \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/client/GrpcClient.h b/cpp/wedpr-protocol/grpc/client/GrpcClient.h index b00437ac..028718a9 100644 --- a/cpp/wedpr-protocol/grpc/client/GrpcClient.h +++ b/cpp/wedpr-protocol/grpc/client/GrpcClient.h @@ -23,25 +23,7 @@ namespace ppc::protocol { -// struct for keeping state and data information -class AsyncClientCall -{ -public: - using CallbackDef = - std::function; - AsyncClientCall(CallbackDef _callback) : callback(std::move(_callback)) {} - - CallbackDef callback; - // Container for the data we expect from the server. - ppc::proto::Error reply; - // Context for the client. It could be used to convey extra information to - // the server and/or tweak certain RPC behaviors. - grpc::ClientContext context; - // Storage for the status of the RPC upon completion. - grpc::Status status; - std::unique_ptr> responseReader; -}; - +// refer to: https://grpc.io/docs/languages/cpp/callback/ class GrpcClient { public: @@ -51,15 +33,8 @@ class GrpcClient virtual ~GrpcClient() = default; std::shared_ptr const& channel() { return m_channel; } - grpc::CompletionQueue& queue() { return m_queue; } - - void handleRpcResponse(); -private: +protected: std::shared_ptr m_channel; - // The producer-consumer queue we use to communicate asynchronously with the - // gRPC runtime. - // TODO: check threadsafe - grpc::CompletionQueue m_queue; }; } // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/server/CMakeLists.txt b/cpp/wedpr-protocol/grpc/server/CMakeLists.txt new file mode 100644 index 00000000..1b795210 --- /dev/null +++ b/cpp/wedpr-protocol/grpc/server/CMakeLists.txt @@ -0,0 +1,3 @@ +file(GLOB_RECURSE SRCS *.cpp) +add_library(${SERVICE_SERVER_TARGET} ${SRCS}) +target_link_libraries(${SERVICE_SERVER_TARGET} PUBLIC ${SERVICE_PB_TARGET} ${PB_PROTOCOL_TARGET}) \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/server/Common.h b/cpp/wedpr-protocol/grpc/server/Common.h new file mode 100644 index 00000000..58fa79e2 --- /dev/null +++ b/cpp/wedpr-protocol/grpc/server/Common.h @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2021 FISCO BCOS. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file Common.h + * @author: yujiechen + * @date 2021-04-12 + */ +#pragma once +#include "ppc-framework/Common.h" + +#define GRPC_SERVER_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][SERVER]" +#define FRONT_SERVER_LOG(LEVEL) BCOS_LOG(LEVEL) << "[FRONT][SERVER]" +#define GATEWAY_SERVER_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][SERVER]" \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/server/FrontServer.cpp b/cpp/wedpr-protocol/grpc/server/FrontServer.cpp new file mode 100644 index 00000000..ee040e6f --- /dev/null +++ b/cpp/wedpr-protocol/grpc/server/FrontServer.cpp @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file FrontServer.cpp + * @author: yujiechen + * @date 2024-09-03 + */ +#include "FrontServer.h" +#include "Common.h" +#include "protobuf/RequestConverter.h" +#include + +using namespace ppc::proto; +using namespace ppc::protocol; +using namespace grpc; + +ServerUnaryReactor* FrontServer::onReceiveMessage( + CallbackServerContext* context, const ReceivedMessage* receivedMsg, ppc::proto::Error* reply) +{ + std::shared_ptr reactor(context->DefaultReactor()); + try + { + // decode the request + auto msg = m_msgBuilder->build(bcos::bytesConstRef( + (bcos::byte*)receivedMsg->data().data(), receivedMsg->data().size())); + m_front->onReceiveMessage(msg, [reactor, reply](bcos::Error::Ptr error) { + toSerializedError(reply, error); + reactor->Finish(Status::OK); + }); + } + catch (std::exception const& e) + { + FRONT_SERVER_LOG(ERROR) << LOG_DESC("onReceiveMessage exception") + << LOG_KV("error", boost::diagnostic_information(e)); + toSerializedError(reply, + std::make_shared(-1, std::string(boost::diagnostic_information(e)))); + reactor->Finish(Status::OK); + } + return reactor.get(); +} \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/server/FrontServer.h b/cpp/wedpr-protocol/grpc/server/FrontServer.h new file mode 100644 index 00000000..988c093c --- /dev/null +++ b/cpp/wedpr-protocol/grpc/server/FrontServer.h @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file FrontServer.h + * @author: yujiechen + * @date 2024-09-03 + */ +#pragma once +#include "Service.grpc.pb.h" +#include +#include +#include + +namespace ppc::protocol +{ +class FrontServer : public ppc::proto::Front::CallbackService +{ +public: + using Ptr = std::shared_ptr; + FrontServer(ppc::protocol::MessageBuilder::Ptr msgBuilder, ppc::front::IFront::Ptr front) + : m_msgBuilder(std::move(msgBuilder)), m_front(std::move(front)) + {} + ~FrontServer() override = default; + + grpc::ServerUnaryReactor* onReceiveMessage(grpc::CallbackServerContext* context, + const ppc::proto::ReceivedMessage* receivedMsg, ppc::proto::Error* reply) override; + +private: + ppc::front::IFront::Ptr m_front; + ppc::protocol::MessageBuilder::Ptr m_msgBuilder; +}; +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp new file mode 100644 index 00000000..91f690c4 --- /dev/null +++ b/cpp/wedpr-protocol/grpc/server/GatewayServer.cpp @@ -0,0 +1,145 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayServer.cpp + * @author: yujiechen + * @date 2024-09-03 + */ +#include "GatewayServer.h" +#include "Common.h" +#include "protobuf/RequestConverter.h" +using namespace ppc::protocol; +using namespace grpc; + +ServerUnaryReactor* GatewayServer::asyncSendMessage(CallbackServerContext* context, + const ppc::proto::SendedMessageRequest* sendedMsg, ppc::proto::Error* reply) +{ + std::shared_ptr reactor(context->DefaultReactor()); + try + { + // TODO: optimize here + bcos::bytes payloadData(sendedMsg->payload().begin(), sendedMsg->payload().end()); + auto routeInfo = generateRouteInfo(m_routeInfoBuilder, sendedMsg->routeinfo()); + m_gateway->asyncSendMessage((ppc::protocol::RouteType)sendedMsg->routetype(), routeInfo, + std::move(payloadData), sendedMsg->timeout(), [reactor, reply](bcos::Error::Ptr error) { + toSerializedError(reply, error); + reactor->Finish(Status::OK); + }); + } + catch (std::exception const& e) + { + GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("asyncSendMessage exception") + << LOG_KV("error", boost::diagnostic_information(e)); + toSerializedError(reply, + std::make_shared(-1, + "handle message failed for : " + std::string(boost::diagnostic_information(e)))); + reactor->Finish(Status::OK); + } + return reactor.get(); +} + +ServerUnaryReactor* GatewayServer::registerNodeInfo(CallbackServerContext* context, + const ppc::proto::NodeInfo* serializedNodeInfo, ppc::proto::Error* reply) +{ + std::shared_ptr reactor(context->DefaultReactor()); + try + { + auto nodeInfo = toNodeInfo(m_nodeInfoFactory, *serializedNodeInfo); + auto result = m_gateway->registerNodeInfo(nodeInfo); + toSerializedError(reply, result); + reactor->Finish(Status::OK); + } + catch (std::exception const& e) + { + GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("registerNodeInfo exception") + << LOG_KV("error", boost::diagnostic_information(e)); + toSerializedError(reply, + std::make_shared(-1, + "registerNodeInfo failed for : " + std::string(boost::diagnostic_information(e)))); + reactor->Finish(Status::OK); + } + return reactor.get(); +} + +ServerUnaryReactor* GatewayServer::unRegisterNodeInfo( + CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) +{ + std::shared_ptr reactor(context->DefaultReactor()); + try + { + auto result = m_gateway->unRegisterNodeInfo( + bcos::bytesConstRef((bcos::byte*)nodeInfo->nodeid().data(), nodeInfo->nodeid().size())); + toSerializedError(reply, result); + reactor->Finish(Status::OK); + } + catch (std::exception const& e) + { + GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("unRegisterNodeInfo exception") + << LOG_KV("error", boost::diagnostic_information(e)); + toSerializedError(reply, + std::make_shared(-1, "unRegisterNodeInfo failed for : " + + std::string(boost::diagnostic_information(e)))); + reactor->Finish(Status::OK); + } + return reactor.get(); +} + +ServerUnaryReactor* GatewayServer::registerTopic( + CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) +{ + std::shared_ptr reactor(context->DefaultReactor()); + try + { + auto result = m_gateway->registerTopic( + bcos::bytesConstRef((bcos::byte*)nodeInfo->nodeid().data(), nodeInfo->nodeid().size()), + nodeInfo->topic()); + toSerializedError(reply, result); + reactor->Finish(Status::OK); + } + catch (std::exception const& e) + { + GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("unRegisterNodeInfo exception") + << LOG_KV("error", boost::diagnostic_information(e)); + toSerializedError(reply, + std::make_shared( + -1, "registerTopic failed for : " + std::string(boost::diagnostic_information(e)))); + reactor->Finish(Status::OK); + } + return reactor.get(); +} + +ServerUnaryReactor* GatewayServer::unRegisterTopic( + CallbackServerContext* context, const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) +{ + std::shared_ptr reactor(context->DefaultReactor()); + try + { + auto result = m_gateway->unRegisterTopic( + bcos::bytesConstRef((bcos::byte*)nodeInfo->nodeid().data(), nodeInfo->nodeid().size()), + nodeInfo->topic()); + toSerializedError(reply, result); + reactor->Finish(Status::OK); + } + catch (std::exception const& e) + { + GATEWAY_SERVER_LOG(WARNING) << LOG_DESC("unRegisterTopic exception") + << LOG_KV("error", boost::diagnostic_information(e)); + toSerializedError(reply, + std::make_shared(-1, + "unRegisterTopic failed for : " + std::string(boost::diagnostic_information(e)))); + reactor->Finish(Status::OK); + } + return reactor.get(); +} \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/server/GatewayServer.h b/cpp/wedpr-protocol/grpc/server/GatewayServer.h new file mode 100644 index 00000000..d287a6c8 --- /dev/null +++ b/cpp/wedpr-protocol/grpc/server/GatewayServer.h @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayServer.h + * @author: yujiechen + * @date 2024-09-03 + */ +#pragma once +#include "Service.grpc.pb.h" +#include "ppc-framework/gateway/IGateway.h" +#include + +namespace ppc::protocol +{ +class GatewayServer : public ppc::proto::Gateway::CallbackService +{ +public: + using Ptr = std::shared_ptr; + GatewayServer(ppc::gateway::IGateway::Ptr gateway, + MessageOptionalHeaderBuilder::Ptr routeInfoBuilder, INodeInfoFactory::Ptr nodeInfoFactory) + : m_gateway(std::move(gateway)), + m_routeInfoBuilder(std::move(routeInfoBuilder)), + m_nodeInfoFactory(std::move(nodeInfoFactory)) + {} + virtual ~GatewayServer() = default; + + grpc::ServerUnaryReactor* asyncSendMessage(grpc::CallbackServerContext* context, + const ppc::proto::SendedMessageRequest* sendedMsg, ppc::proto::Error* reply) override; + + grpc::ServerUnaryReactor* registerNodeInfo(grpc::CallbackServerContext* context, + const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override; + + grpc::ServerUnaryReactor* unRegisterNodeInfo(grpc::CallbackServerContext* context, + const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override; + + grpc::ServerUnaryReactor* registerTopic(grpc::CallbackServerContext* context, + const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override; + + grpc::ServerUnaryReactor* unRegisterTopic(grpc::CallbackServerContext* context, + const ppc::proto::NodeInfo* nodeInfo, ppc::proto::Error* reply) override; + +private: + ppc::gateway::IGateway::Ptr m_gateway; + MessageOptionalHeaderBuilder::Ptr m_routeInfoBuilder; + INodeInfoFactory::Ptr m_nodeInfoFactory; +}; +}; // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/server/GrpcServer.cpp b/cpp/wedpr-protocol/grpc/server/GrpcServer.cpp new file mode 100644 index 00000000..9d612dfa --- /dev/null +++ b/cpp/wedpr-protocol/grpc/server/GrpcServer.cpp @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GrpcServer.cpp + * @author: yujiechen + * @date 2024-09-03 + */ +#include "GrpcServer.h" +#include "Common.h" +#include + +using namespace ppc::protocol; +using namespace grpc; + +void GrpcServer::start() +{ + if (m_running) + { + GRPC_SERVER_LOG(INFO) << LOG_DESC("GrpcServer has already been started!") + << LOG_KV("endPoint", m_config.endPoint()); + return; + } + m_running = true; + grpc::reflection::InitProtoReflectionServerBuilderPlugin(); + grpc::ServerBuilder builder; + // without authentication + builder.AddListeningPort(m_config.endPoint(), grpc::InsecureServerCredentials()); + // register the service + for (auto const& service : m_bindingServices) + { + builder.RegisterService(service.get()); + } + m_server = std::unique_ptr(builder.BuildAndStart()); + GRPC_SERVER_LOG(INFO) << LOG_DESC("GrpcServer start success!") + << LOG_KV("endPoint", m_config.endPoint()); +} + +void GrpcServer::stop() +{ + if (!m_running) + { + GRPC_SERVER_LOG(INFO) << LOG_DESC("GrpcServer has already been stopped!"); + return; + } + m_running = false; + if (m_server) + { + m_server->Shutdown(); + } + GRPC_SERVER_LOG(INFO) << LOG_DESC("GrpcServer stop success!"); +} \ No newline at end of file diff --git a/cpp/wedpr-protocol/grpc/server/GrpcServer.h b/cpp/wedpr-protocol/grpc/server/GrpcServer.h new file mode 100644 index 00000000..869dcc88 --- /dev/null +++ b/cpp/wedpr-protocol/grpc/server/GrpcServer.h @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GrpcServer.h + * @author: yujiechen + * @date 2024-09-03 + */ +#pragma once +#include +#include +#include + +namespace ppc::protocol +{ +// refer to: https://grpc.io/docs/languages/cpp/callback/ +struct GrpcServerConfig +{ + std::string listenIp; + int listenPort; + + std::string endPoint() const { return listenIp + ":" + std::to_string(listenPort); } +}; +class GrpcServer +{ +public: + using Ptr = std::shared_ptr; + GrpcServer(GrpcServerConfig const& config) : m_config(config) {} + virtual ~GrpcServer() = default; + + virtual void start(); + virtual void stop(); + + virtual void registerService(std::shared_ptr service) + { + m_bindingServices.emplace_back(std::move(service)); + } + +private: + bool m_running = false; + GrpcServerConfig m_config; + + std::unique_ptr m_server; + std::vector> m_bindingServices; +}; +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/wedpr-protocol/proto/pb/Service.proto b/cpp/wedpr-protocol/proto/pb/Service.proto index b41fd3c4..eb241af8 100644 --- a/cpp/wedpr-protocol/proto/pb/Service.proto +++ b/cpp/wedpr-protocol/proto/pb/Service.proto @@ -15,10 +15,6 @@ message ReceivedMessage{ bytes data = 1; }; -service Front { - rpc onReceiveMessage (ReceivedMessage) returns (Error) {} -} - message RouteInfo{ string topic = 1; string componentType = 2; @@ -33,7 +29,9 @@ message SendedMessageRequest{ bytes payload = 3; int64 timeout = 4; }; - +service Front { + rpc onReceiveMessage (ReceivedMessage) returns (Error) {} +} service Gateway{ rpc asyncSendMessage(SendedMessageRequest) returns(Error){} rpc registerNodeInfo(NodeInfo) returns(Error){} diff --git a/cpp/wedpr-protocol/protobuf/NodeInfoImpl.h b/cpp/wedpr-protocol/protobuf/NodeInfoImpl.h index 1a5a4000..b0164a58 100644 --- a/cpp/wedpr-protocol/protobuf/NodeInfoImpl.h +++ b/cpp/wedpr-protocol/protobuf/NodeInfoImpl.h @@ -43,6 +43,12 @@ class NodeInfoImpl : public INodeInfo } ~NodeInfoImpl() override = default; + void setNodeID(bcos::bytesConstRef nodeID) override + { + m_inner()->set_nodeid(nodeID.data(), nodeID.size()); + } + void setEndPoint(std::string const& endPoint) override { m_inner()->set_endpoint(endPoint); } + void setComponents(std::set const& components) override { m_components = components; diff --git a/cpp/wedpr-protocol/protobuf/RequestConverter.h b/cpp/wedpr-protocol/protobuf/RequestConverter.h new file mode 100644 index 00000000..faefc75d --- /dev/null +++ b/cpp/wedpr-protocol/protobuf/RequestConverter.h @@ -0,0 +1,134 @@ +/** + * Copyright (C) 2021 FISCO BCOS. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file Common.h + * @author: yujiechen + * @date 2021-04-12 + */ +#pragma once +#include "Service.pb.h" +#include "ppc-framework/protocol/INodeInfo.h" +#include "ppc-framework/protocol/Message.h" +#include "ppc-framework/protocol/Protocol.h" +#include +#include +#include + +namespace ppc::protocol +{ +inline MessageOptionalHeader::Ptr generateRouteInfo( + MessageOptionalHeaderBuilder::Ptr const& routeInfoBuilder, + ppc::proto::RouteInfo const& serializedRouteInfo) +{ + auto routeInfo = routeInfoBuilder->build(); + routeInfo->setComponentType(serializedRouteInfo.componenttype()); + routeInfo->setSrcNode( + bcos::bytes(serializedRouteInfo.srcnode().begin(), serializedRouteInfo.srcnode().end())); + routeInfo->setDstNode( + bcos::bytes(serializedRouteInfo.dstnode().begin(), serializedRouteInfo.dstnode().end())); + routeInfo->setDstInst(serializedRouteInfo.dstinst()); + return routeInfo; +} + +inline std::shared_ptr generateRequest(RouteType routeType, + MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, long timeout) +{ + auto request = std::make_shared(); + request->set_routetype(uint16_t(routeType)); + // set the route information + request->mutable_routeinfo()->set_topic(routeInfo->topic()); + request->mutable_routeinfo()->set_componenttype(routeInfo->componentType()); + request->mutable_routeinfo()->set_srcnode( + routeInfo->srcNode().data(), routeInfo->srcNode().size()); + request->mutable_routeinfo()->set_dstnode( + routeInfo->dstNode().data(), routeInfo->dstNode().size()); + request->mutable_routeinfo()->set_dstinst( + routeInfo->dstInst().data(), routeInfo->dstInst().size()); + // set the payload(TODO: optimize here) + request->set_payload(payload.data(), payload.size()); + request->set_timeout(timeout); + return request; +} + +inline std::shared_ptr toNodeInfoRequest( + bcos::bytesConstRef const& nodeID, std::string const& topic) +{ + auto request = std::make_shared(); + request->set_nodeid(nodeID.data(), nodeID.size()); + request->set_topic(topic); + return request; +} + +inline std::shared_ptr toNodeInfoRequest(INodeInfo::Ptr const& nodeInfo) +{ + auto request = std::make_shared(); + if (!nodeInfo) + { + return request; + }; + request->set_nodeid(nodeInfo->nodeID().data(), nodeInfo->nodeID().size()); + request->set_endpoint(nodeInfo->endPoint()); + auto const& components = nodeInfo->components(); + for (auto const& component : components) + { + request->add_components(component); + } + return request; +} + +inline INodeInfo::Ptr toNodeInfo( + INodeInfoFactory::Ptr const& nodeInfoFactory, ppc::proto::NodeInfo const& serializedNodeInfo) +{ + auto nodeInfo = nodeInfoFactory->build(); + nodeInfo->setNodeID(bcos::bytesConstRef( + (bcos::byte*)serializedNodeInfo.nodeid().data(), serializedNodeInfo.nodeid().size())); + nodeInfo->setEndPoint(serializedNodeInfo.endpoint()); + std::set componentTypeList; + for (int i = 0; i < serializedNodeInfo.components_size(); i++) + { + componentTypeList.insert(serializedNodeInfo.components(i)); + } + nodeInfo->setComponents(componentTypeList); + return nodeInfo; +} + +inline bcos::Error::Ptr toError(grpc::Status const& status, ppc::proto::Error&& error) +{ + if (!status.ok()) + { + return std::make_shared((int32_t)status.error_code(), status.error_message()); + } + if (error.errorcode() == 0) + { + return nullptr; + } + return std::make_shared(error.errorcode(), error.errormessage()); +} + +inline void toSerializedError(ppc::proto::Error* serializedError, bcos::Error::Ptr error) +{ + if (!serializedError) + { + return; + } + if (!error) + { + serializedError->set_errorcode(PPCRetCode::SUCCESS); + return; + } + serializedError->set_errorcode(error->errorCode()); + serializedError->set_errormessage(error->errorMessage()); +} +}; // namespace ppc::protocol \ No newline at end of file