From b03f975138b6100b20bed24ba3e67739e3b88d97 Mon Sep 17 00:00:00 2001 From: ildnyy <2114829840@qq.com> Date: Tue, 20 Feb 2024 13:31:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:RPC=E8=87=AA=E5=AE=9A=E4=B9=89=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE=E7=9A=84=E5=A4=B4=E9=83=A8=E5=AD=97=E6=AE=B5=E9=95=BF?= =?UTF-8?q?=E5=BA=A6=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/rpc/mprpcchannel.cpp | 31 ++++++++++++++++++------------- src/rpc/rpcprovider.cpp | 25 +++++++++++++++++++++---- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/rpc/mprpcchannel.cpp b/src/rpc/mprpcchannel.cpp index 10a9e41..dc7eec8 100644 --- a/src/rpc/mprpcchannel.cpp +++ b/src/rpc/mprpcchannel.cpp @@ -49,29 +49,34 @@ void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method, return; } - // 定义rpc的请求header RPC::RpcHeader rpcHeader; rpcHeader.set_service_name(service_name); rpcHeader.set_method_name(method_name); rpcHeader.set_args_size(args_size); - uint32_t header_size = 0; std::string rpc_header_str; - if (rpcHeader.SerializeToString(&rpc_header_str)) - { - header_size = rpc_header_str.size(); - } - else - { + if (!rpcHeader.SerializeToString(&rpc_header_str)) { controller->SetFailed("serialize rpc header error!"); return; } - // 组织待发送的rpc请求的字符串 - std::string send_rpc_str; - send_rpc_str.insert(0, std::string((char *)&header_size, 4)); // header_size - send_rpc_str += rpc_header_str; // rpcheader - send_rpc_str += args_str; // args + // 使用protobuf的CodedOutputStream来构建发送的数据流 + std::string send_rpc_str; // 用来存储最终发送的数据 + { + // 创建一个StringOutputStream用于写入send_rpc_str + google::protobuf::io::StringOutputStream string_output(&send_rpc_str); + google::protobuf::io::CodedOutputStream coded_output(&string_output); + + // 先写入header的长度(变长编码) + coded_output.WriteVarint32(static_cast(rpc_header_str.size())); + + // 不需要手动写入header_size,因为上面的WriteVarint32已经包含了header的长度信息 + // 然后写入rpc_header本身 + coded_output.WriteString(rpc_header_str); + } + + // 最后,将请求参数附加到send_rpc_str后面 + send_rpc_str += args_str; // 打印调试信息 // std::cout << "============================================" << std::endl; diff --git a/src/rpc/rpcprovider.cpp b/src/rpc/rpcprovider.cpp index 47fa1d7..8091985 100644 --- a/src/rpc/rpcprovider.cpp +++ b/src/rpc/rpcprovider.cpp @@ -146,15 +146,25 @@ void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, // 网络上接收的远程rpc调用请求的字符流 Login args std::string recv_buf = buffer->retrieveAllAsString(); - // 从字符流中读取前4个字节的内容 + // 使用protobuf的CodedInputStream来解析数据流 + google::protobuf::io::ArrayInputStream array_input(recv_buf.data(), recv_buf.size()); + google::protobuf::io::CodedInputStream coded_input(&array_input); uint32_t header_size = 0; - recv_buf.copy((char *)&header_size, 4, 0); + + coded_input.ReadVarint32(&header_size); // 解析header_size // 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息 - std::string rpc_header_str = recv_buf.substr(4, header_size); + std::string rpc_header_str; RPC::RpcHeader rpcHeader; std::string service_name; std::string method_name; + + // 设置读取限制,不必担心数据读多 + google::protobuf::io::CodedInputStream::Limit msg_limit = coded_input.PushLimit(header_size); + coded_input.ReadString(&rpc_header_str, header_size); + // 恢复之前的限制,以便安全地继续读取其他数据 + coded_input.PopLimit(msg_limit); + uint32_t args_size; if (rpcHeader.ParseFromString(rpc_header_str)) { @@ -171,7 +181,14 @@ void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, } // 获取rpc方法参数的字符流数据 - std::string args_str = recv_buf.substr(4 + header_size, args_size); + std::string args_str; + // 直接读取args_size长度的字符串数据 + bool read_args_success = coded_input.ReadString(&args_str, args_size); + + if (!read_args_success) { + // 处理错误:参数数据读取失败 + return; + } // 打印调试信息 // std::cout << "============================================" << std::endl;