Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

新增raft的getLastLogTerm函数,并修改了相关的小bug #50

Merged
merged 2 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions bin/test.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
node0ip=127.0.1.1
node0port=29016
node1ip=127.0.1.1
node1port=29017
node2ip=127.0.1.1
node2port=29018
node1ip=127.0.1.1
node1port=7788
1 change: 1 addition & 0 deletions src/raftCore/include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class Raft : public raftRpcProctoc::raftRpc {
void RequestVote(const raftRpcProctoc::RequestVoteArgs *args, raftRpcProctoc::RequestVoteReply *reply);
bool UpToDate(int index, int term);
int getLastLogIndex();
int getLastLogTerm();
void getLastLogIndexAndTerm(int *lastLogIndex, int *lastLogTerm);
int getLogTermFromLogIndex(int logIndex);
int GetRaftStateSize();
Expand Down
24 changes: 20 additions & 4 deletions src/raftCore/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,9 @@ void Raft::RequestVote(const raftRpcProctoc::RequestVoteArgs* args, raftRpcProct
}
myAssert(args->term() == m_currentTerm,
format("[func--rf{%d}] 前面校验过args.Term==rf.currentTerm,这里却不等", m_me));
// 现在节点任期都是相同的(任期小的也已经更新到新的args的term了)
// ,要检查log的term和index是不是匹配的了
int lastLogTerm = getLastLogIndex();
// 现在节点任期都是相同的(任期小的也已经更新到新的args的term了),还需要检查log的term和index是不是匹配的了

int lastLogTerm = getLastLogTerm();
//只有没投票,且candidate的日志的新的程度 ≥ 接受者的日志新的程度 才会授票
if (!UpToDate(args->lastlogindex(), args->lastlogterm())) {
// args.LastLogTerm < lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex < lastLogIndex) {
Expand Down Expand Up @@ -703,14 +703,30 @@ void Raft::getLastLogIndexAndTerm(int* lastLogIndex, int* lastLogTerm) {
return;
}
}

/**
*
* @return 最新的log的logindex,即log的逻辑index。区别于log在m_logs中的物理index
* 可见:getLastLogIndexAndTerm()
*/
int Raft::getLastLogIndex() {
int lastLogIndex = -1;
int _ = -1;
getLastLogIndexAndTerm(&lastLogIndex, &_);
return lastLogIndex;
}

int Raft::getLastLogTerm() {
int _ = -1;
int lastLogTerm = -1;
getLastLogIndexAndTerm(&_, &lastLogTerm);
return lastLogTerm;
}

/**
*
* @param logIndex log的逻辑index。注意区别于m_logs的物理index
* @return
*/
int Raft::getLogTermFromLogIndex(int logIndex) {
myAssert(logIndex >= m_lastSnapshotIncludeIndex,
format("[func-getSlicesIndexFromLogIndex-rf{%d}] index{%d} < rf.lastSnapshotIncludeIndex{%d}", m_me,
Expand Down
13 changes: 5 additions & 8 deletions src/rpc/mprpcchannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
}

const google::protobuf::ServiceDescriptor* sd = method->service();
std::string service_name = sd->name(); // service_name
std::string method_name = method->name(); // method_name
std::string service_name = sd->name(); // service_name
std::string method_name = method->name(); // method_name

// 获取参数的序列化字符串长度 args_size
uint32_t args_size{};
Expand All @@ -55,7 +55,7 @@ void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
}

// 使用protobuf的CodedOutputStream来构建发送的数据流
std::string send_rpc_str; // 用来存储最终发送的数据
std::string send_rpc_str; // 用来存储最终发送的数据
{
// 创建一个StringOutputStream用于写入send_rpc_str
google::protobuf::io::StringOutputStream string_output(&send_rpc_str);
Expand Down Expand Up @@ -151,10 +151,7 @@ bool MprpcChannel::newConnect(const char* ip, uint16_t port, string* errMsg) {
return true;
}

MprpcChannel::MprpcChannel(string ip, short port, bool connectNow)
: m_ip(ip),
m_port(port),
m_clientFd(-1) {
MprpcChannel::MprpcChannel(string ip, short port, bool connectNow) : m_ip(ip), m_port(port), m_clientFd(-1) {
// 使用tcp编程,完成rpc方法的远程调用,使用的是短连接,因此每次都要重新连接上去,待改成长连接。
// 没有连接或者连接已经断开,那么就要重新连接呢,会一直不断地重试
// 读取配置文件rpcserver的信息
Expand All @@ -164,7 +161,7 @@ MprpcChannel::MprpcChannel(string ip, short port, bool connectNow)
// /UserServiceRpc/Login
if (!connectNow) {
return;
} //可以允许延迟连接
} //可以允许延迟连接
std::string errMsg;
auto rt = newConnect(ip.c_str(), port, &errMsg);
int tryCount = 3;
Expand Down
93 changes: 45 additions & 48 deletions src/rpc/rpcprovider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,57 +134,54 @@ void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net
// 网络上接收的远程rpc调用请求的字符流 Login args
std::string recv_buf = buffer->retrieveAllAsString();

// 使用protobuf的CodedInputStream来解析数据流
google::protobuf::io::ArrayInputStream array_input(recv_buf.data(), recv_buf.size());
google::protobuf::io::CodedInputStream coded_input(&array_input);
uint32_t header_size{};

coded_input.ReadVarint32(&header_size); // 解析header_size

// 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
std::string rpc_header_str;
RPC::RpcHeader rpcHeader;
std::string service_name;
std::string method_name;

// 设置读取限制,不必担心数据读多
google::protobuf::io::CodedInputStream::Limit msg_limit = coded_input.PushLimit(header_size);
coded_input.ReadString(&rpc_header_str, header_size);
// 恢复之前的限制,以便安全地继续读取其他数据
coded_input.PopLimit(msg_limit);
uint32_t args_size{};
if (rpcHeader.ParseFromString(rpc_header_str))
{
// 数据头反序列化成功
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
}
else
{
// 数据头反序列化失败
std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
return;
}
// 使用protobuf的CodedInputStream来解析数据流
google::protobuf::io::ArrayInputStream array_input(recv_buf.data(), recv_buf.size());
google::protobuf::io::CodedInputStream coded_input(&array_input);
uint32_t header_size{};

coded_input.ReadVarint32(&header_size); // 解析header_size

// 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
std::string rpc_header_str;
RPC::RpcHeader rpcHeader;
std::string service_name;
std::string method_name;

// 设置读取限制,不必担心数据读多
google::protobuf::io::CodedInputStream::Limit msg_limit = coded_input.PushLimit(header_size);
coded_input.ReadString(&rpc_header_str, header_size);
// 恢复之前的限制,以便安全地继续读取其他数据
coded_input.PopLimit(msg_limit);
uint32_t args_size{};
if (rpcHeader.ParseFromString(rpc_header_str)) {
// 数据头反序列化成功
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
} else {
// 数据头反序列化失败
std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
return;
}

// 获取rpc方法参数的字符流数据
std::string args_str;
// 直接读取args_size长度的字符串数据
bool read_args_success = coded_input.ReadString(&args_str, args_size);
// 获取rpc方法参数的字符流数据
std::string args_str;
// 直接读取args_size长度的字符串数据
bool read_args_success = coded_input.ReadString(&args_str, args_size);

if (!read_args_success) {
// 处理错误:参数数据读取失败
return;
}
if (!read_args_success) {
// 处理错误:参数数据读取失败
return;
}

// 打印调试信息
// std::cout << "============================================" << std::endl;
// std::cout << "header_size: " << header_size << std::endl;
// std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
// std::cout << "service_name: " << service_name << std::endl;
// std::cout << "method_name: " << method_name << std::endl;
// std::cout << "args_str: " << args_str << std::endl;
// std::cout << "============================================" << std::endl;
// 打印调试信息
// std::cout << "============================================" << std::endl;
// std::cout << "header_size: " << header_size << std::endl;
// std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
// std::cout << "service_name: " << service_name << std::endl;
// std::cout << "method_name: " << method_name << std::endl;
// std::cout << "args_str: " << args_str << std::endl;
// std::cout << "============================================" << std::endl;

// 获取service对象和method对象
auto it = m_serviceMap.find(service_name);
Expand Down