From af0f5d9c5e1748ebba550570a70b245a9b09b0ce Mon Sep 17 00:00:00 2001 From: siwuxie Date: Tue, 5 Mar 2024 08:22:29 +0800 Subject: [PATCH 1/2] add test.conf --- bin/test.conf | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 bin/test.conf diff --git a/bin/test.conf b/bin/test.conf new file mode 100644 index 0000000..2fc1ada --- /dev/null +++ b/bin/test.conf @@ -0,0 +1,8 @@ +node0ip=127.0.1.1 +node0port=29016 +node1ip=127.0.1.1 +node1port=29017 +node2ip=127.0.1.1 +node2port=29018 +node1ip=127.0.1.1 +node1port=7788 From 0451958544eea84ae7c00fa68e56124b4f2e966f Mon Sep 17 00:00:00 2001 From: siwuxie Date: Thu, 14 Mar 2024 15:06:20 +0800 Subject: [PATCH 2/2] bug fix --- src/raftCore/include/raft.h | 1 + src/raftCore/raft.cpp | 24 ++++++++-- src/rpc/mprpcchannel.cpp | 13 ++---- src/rpc/rpcprovider.cpp | 93 ++++++++++++++++++------------------- 4 files changed, 71 insertions(+), 60 deletions(-) diff --git a/src/raftCore/include/raft.h b/src/raftCore/include/raft.h index e9fb1c6..11b73d3 100644 --- a/src/raftCore/include/raft.h +++ b/src/raftCore/include/raft.h @@ -95,6 +95,7 @@ class Raft : public raftRpcProctoc::raftRpc { void RequestVote(const raftRpcProctoc::RequestVoteArgs *args, raftRpcProctoc::RequestVoteReply *reply); bool UpToDate(int index, int term); int getLastLogIndex(); + int getLastLogTerm(); void getLastLogIndexAndTerm(int *lastLogIndex, int *lastLogTerm); int getLogTermFromLogIndex(int logIndex); int GetRaftStateSize(); diff --git a/src/raftCore/raft.cpp b/src/raftCore/raft.cpp index e025c1e..b227dfd 100644 --- a/src/raftCore/raft.cpp +++ b/src/raftCore/raft.cpp @@ -639,9 +639,9 @@ void Raft::RequestVote(const raftRpcProctoc::RequestVoteArgs* args, raftRpcProct } myAssert(args->term() == m_currentTerm, format("[func--rf{%d}] 前面校验过args.Term==rf.currentTerm,这里却不等", m_me)); - // 现在节点任期都是相同的(任期小的也已经更新到新的args的term了) - // ,要检查log的term和index是不是匹配的了 - int lastLogTerm = getLastLogIndex(); + // 现在节点任期都是相同的(任期小的也已经更新到新的args的term了),还需要检查log的term和index是不是匹配的了 + + int lastLogTerm = getLastLogTerm(); //只有没投票,且candidate的日志的新的程度 ≥ 接受者的日志新的程度 才会授票 if (!UpToDate(args->lastlogindex(), args->lastlogterm())) { // args.LastLogTerm < lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex < lastLogIndex) { @@ -703,7 +703,11 @@ void Raft::getLastLogIndexAndTerm(int* lastLogIndex, int* lastLogTerm) { return; } } - +/** + * + * @return 最新的log的logindex,即log的逻辑index。区别于log在m_logs中的物理index + * 可见:getLastLogIndexAndTerm() + */ int Raft::getLastLogIndex() { int lastLogIndex = -1; int _ = -1; @@ -711,6 +715,18 @@ int Raft::getLastLogIndex() { return lastLogIndex; } +int Raft::getLastLogTerm() { + int _ = -1; + int lastLogTerm = -1; + getLastLogIndexAndTerm(&_, &lastLogTerm); + return lastLogTerm; +} + +/** + * + * @param logIndex log的逻辑index。注意区别于m_logs的物理index + * @return + */ int Raft::getLogTermFromLogIndex(int logIndex) { myAssert(logIndex >= m_lastSnapshotIncludeIndex, format("[func-getSlicesIndexFromLogIndex-rf{%d}] index{%d} < rf.lastSnapshotIncludeIndex{%d}", m_me, diff --git a/src/rpc/mprpcchannel.cpp b/src/rpc/mprpcchannel.cpp index a814ac7..1c86125 100644 --- a/src/rpc/mprpcchannel.cpp +++ b/src/rpc/mprpcchannel.cpp @@ -31,8 +31,8 @@ void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method, } const google::protobuf::ServiceDescriptor* sd = method->service(); - std::string service_name = sd->name(); // service_name - std::string method_name = method->name(); // method_name + std::string service_name = sd->name(); // service_name + std::string method_name = method->name(); // method_name // 获取参数的序列化字符串长度 args_size uint32_t args_size{}; @@ -55,7 +55,7 @@ void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method, } // 使用protobuf的CodedOutputStream来构建发送的数据流 - std::string send_rpc_str; // 用来存储最终发送的数据 + std::string send_rpc_str; // 用来存储最终发送的数据 { // 创建一个StringOutputStream用于写入send_rpc_str google::protobuf::io::StringOutputStream string_output(&send_rpc_str); @@ -151,10 +151,7 @@ bool MprpcChannel::newConnect(const char* ip, uint16_t port, string* errMsg) { return true; } -MprpcChannel::MprpcChannel(string ip, short port, bool connectNow) - : m_ip(ip), - m_port(port), - m_clientFd(-1) { +MprpcChannel::MprpcChannel(string ip, short port, bool connectNow) : m_ip(ip), m_port(port), m_clientFd(-1) { // 使用tcp编程,完成rpc方法的远程调用,使用的是短连接,因此每次都要重新连接上去,待改成长连接。 // 没有连接或者连接已经断开,那么就要重新连接呢,会一直不断地重试 // 读取配置文件rpcserver的信息 @@ -164,7 +161,7 @@ MprpcChannel::MprpcChannel(string ip, short port, bool connectNow) // /UserServiceRpc/Login if (!connectNow) { return; - } //可以允许延迟连接 + } //可以允许延迟连接 std::string errMsg; auto rt = newConnect(ip.c_str(), port, &errMsg); int tryCount = 3; diff --git a/src/rpc/rpcprovider.cpp b/src/rpc/rpcprovider.cpp index 8ae906e..06b0fb6 100644 --- a/src/rpc/rpcprovider.cpp +++ b/src/rpc/rpcprovider.cpp @@ -134,57 +134,54 @@ void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net // 网络上接收的远程rpc调用请求的字符流 Login args std::string recv_buf = buffer->retrieveAllAsString(); - // 使用protobuf的CodedInputStream来解析数据流 - google::protobuf::io::ArrayInputStream array_input(recv_buf.data(), recv_buf.size()); - google::protobuf::io::CodedInputStream coded_input(&array_input); - uint32_t header_size{}; - - coded_input.ReadVarint32(&header_size); // 解析header_size - - // 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息 - std::string rpc_header_str; - RPC::RpcHeader rpcHeader; - std::string service_name; - std::string method_name; - - // 设置读取限制,不必担心数据读多 - google::protobuf::io::CodedInputStream::Limit msg_limit = coded_input.PushLimit(header_size); - coded_input.ReadString(&rpc_header_str, header_size); - // 恢复之前的限制,以便安全地继续读取其他数据 - coded_input.PopLimit(msg_limit); - uint32_t args_size{}; - if (rpcHeader.ParseFromString(rpc_header_str)) - { - // 数据头反序列化成功 - service_name = rpcHeader.service_name(); - method_name = rpcHeader.method_name(); - args_size = rpcHeader.args_size(); - } - else - { - // 数据头反序列化失败 - std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl; - return; - } + // 使用protobuf的CodedInputStream来解析数据流 + google::protobuf::io::ArrayInputStream array_input(recv_buf.data(), recv_buf.size()); + google::protobuf::io::CodedInputStream coded_input(&array_input); + uint32_t header_size{}; + + coded_input.ReadVarint32(&header_size); // 解析header_size + + // 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息 + std::string rpc_header_str; + RPC::RpcHeader rpcHeader; + std::string service_name; + std::string method_name; + + // 设置读取限制,不必担心数据读多 + google::protobuf::io::CodedInputStream::Limit msg_limit = coded_input.PushLimit(header_size); + coded_input.ReadString(&rpc_header_str, header_size); + // 恢复之前的限制,以便安全地继续读取其他数据 + coded_input.PopLimit(msg_limit); + uint32_t args_size{}; + if (rpcHeader.ParseFromString(rpc_header_str)) { + // 数据头反序列化成功 + service_name = rpcHeader.service_name(); + method_name = rpcHeader.method_name(); + args_size = rpcHeader.args_size(); + } else { + // 数据头反序列化失败 + std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl; + return; + } - // 获取rpc方法参数的字符流数据 - std::string args_str; - // 直接读取args_size长度的字符串数据 - bool read_args_success = coded_input.ReadString(&args_str, args_size); + // 获取rpc方法参数的字符流数据 + std::string args_str; + // 直接读取args_size长度的字符串数据 + bool read_args_success = coded_input.ReadString(&args_str, args_size); - if (!read_args_success) { - // 处理错误:参数数据读取失败 - return; - } + if (!read_args_success) { + // 处理错误:参数数据读取失败 + return; + } - // 打印调试信息 -// std::cout << "============================================" << std::endl; -// std::cout << "header_size: " << header_size << std::endl; -// std::cout << "rpc_header_str: " << rpc_header_str << std::endl; -// std::cout << "service_name: " << service_name << std::endl; -// std::cout << "method_name: " << method_name << std::endl; -// std::cout << "args_str: " << args_str << std::endl; -// std::cout << "============================================" << std::endl; + // 打印调试信息 + // std::cout << "============================================" << std::endl; + // std::cout << "header_size: " << header_size << std::endl; + // std::cout << "rpc_header_str: " << rpc_header_str << std::endl; + // std::cout << "service_name: " << service_name << std::endl; + // std::cout << "method_name: " << method_name << std::endl; + // std::cout << "args_str: " << args_str << std::endl; + // std::cout << "============================================" << std::endl; // 获取service对象和method对象 auto it = m_serviceMap.find(service_name);