Skip to content

Commit

Permalink
Sync logs from leader to follower and learner (#2067)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

- Sync logs to followers synchronously, and to learners asynchronously, when a
new WAL entry is written.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Oct 19, 2024
1 parent 59e2611 commit 84cf2eb
Show file tree
Hide file tree
Showing 11 changed files with 383 additions and 173 deletions.
187 changes: 151 additions & 36 deletions src/main/cluster_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ClusterManager::~ClusterManager() {
other_node_map_.clear();
this_node_.reset();
if (client_to_leader_.get() != nullptr) {
client_to_leader_->UnInit();
client_to_leader_->UnInit(true);
}
client_to_leader_.reset();
}
Expand Down Expand Up @@ -79,7 +79,7 @@ Status ClusterManager::InitAsFollower(const String &node_name, const String &lea
leader_node_->port_ = leader_port;

Status client_status = Status::OK();
std::tie(client_to_leader_, client_status) = ClusterManager::ConnectToServerNoLock(leader_ip, leader_port);
std::tie(client_to_leader_, client_status) = ClusterManager::ConnectToServerNoLock(node_name, leader_ip, leader_port);
if (!client_status.ok()) {
return client_status;
}
Expand Down Expand Up @@ -110,7 +110,7 @@ Status ClusterManager::InitAsLearner(const String &node_name, const String &lead
leader_node_->port_ = leader_port;

Status client_status = Status::OK();
std::tie(client_to_leader_, client_status) = ClusterManager::ConnectToServerNoLock(leader_ip, leader_port);
std::tie(client_to_leader_, client_status) = ClusterManager::ConnectToServerNoLock(node_name, leader_ip, leader_port);
if (!client_status.ok()) {
return client_status;
}
Expand All @@ -131,12 +131,12 @@ Status ClusterManager::UnInit() {
}

std::unique_lock<std::mutex> lock(mutex_);
Status status = UnregisterFromLeaderNoLock();
Status status = UnregisterToLeaderNoLock();

other_node_map_.clear();
this_node_.reset();
if (client_to_leader_.get() != nullptr) {
client_to_leader_->UnInit();
client_to_leader_->UnInit(true);
}
client_to_leader_.reset();

Expand Down Expand Up @@ -216,6 +216,9 @@ void ClusterManager::HeartBeatToLeader() {

// Update the non-leader node info from leader
UpdateNodeInfoNoLock(hb_task->other_nodes_);

// Record whether the leader reported that it cannot reach this reader node
this_node_->node_status_ = hb_task->sender_status_;
}
return;
}
Expand Down Expand Up @@ -291,7 +294,7 @@ Status ClusterManager::RegisterToLeaderNoLock() {
return status;
}

Status ClusterManager::UnregisterFromLeaderNoLock() {
Status ClusterManager::UnregisterToLeaderNoLock() {
Status status = Status::OK();
if (this_node_->node_role_ == NodeRole::kFollower or this_node_->node_role_ == NodeRole::kLearner) {
if (leader_node_->node_status_ == NodeStatus::kAlive) {
Expand All @@ -309,24 +312,28 @@ Status ClusterManager::UnregisterFromLeaderNoLock() {
return status;
}

Tuple<SharedPtr<PeerClient>, Status> ClusterManager::ConnectToServerNoLock(const String &server_ip, i64 server_port) {
SharedPtr<PeerClient> client = MakeShared<PeerClient>(server_ip, server_port);
Tuple<SharedPtr<PeerClient>, Status>
ClusterManager::ConnectToServerNoLock(const String &sending_node_name, const String &server_ip, i64 server_port) {
SharedPtr<PeerClient> client = MakeShared<PeerClient>(sending_node_name, server_ip, server_port);
Status client_status = client->Init();
if (!client_status.ok()) {
return {nullptr, client_status};
}
return {client, client_status};
}

Status ClusterManager::SendLogs(const String &node_name, const SharedPtr<PeerClient> &peer_client, const Vector<SharedPtr<String>> &logs, bool synchronize) {
Status
ClusterManager::SendLogs(const String &node_name, const SharedPtr<PeerClient> &peer_client, const Vector<SharedPtr<String>> &logs, bool synchronize) {
SharedPtr<SyncLogTask> sync_log_task = MakeShared<SyncLogTask>(node_name, logs);
peer_client->Send(sync_log_task);

if(synchronize) {
Status status = Status::OK();
if (synchronize) {
sync_log_task->Wait();
} else {
return status;
}

Status status = Status::OK();
if (sync_log_task->error_code_ != 0) {
LOG_ERROR(fmt::format("Fail to send log follower: {}, error message: {}", node_name, sync_log_task->error_message_));
status.code_ = static_cast<ErrorCode>(sync_log_task->error_code_);
Expand All @@ -335,6 +342,28 @@ Status ClusterManager::SendLogs(const String &node_name, const SharedPtr<PeerCli
return status;
}

// Collect the alive follower and learner nodes, together with their peer
// clients, so the leader can replicate WAL logs to them.
// Leader-only: returns InvalidNodeRole when called on a non-leader node.
// @param followers        [out] alive follower node infos
// @param follower_clients [out] clients for `followers`, same order
// @param learners         [out] alive learner node infos
// @param learner_clients  [out] clients for `learners`, same order
// @return OK on success; InvalidNodeRole if this node is not the leader.
Status ClusterManager::GetReadersInfo(Vector<SharedPtr<NodeInfo>> &followers,
                                      Vector<SharedPtr<PeerClient>> &follower_clients,
                                      Vector<SharedPtr<NodeInfo>> &learners,
                                      Vector<SharedPtr<PeerClient>> &learner_clients) {
    // Acquire the lock before reading any shared node state, consistent with
    // the other member functions (e.g. UpdateNodeInfoByHeartBeat).
    std::unique_lock<std::mutex> lock(mutex_);
    if (this_node_->node_role_ != NodeRole::kLeader) {
        return Status::InvalidNodeRole("Expect leader node");
    }
    for (const auto &[node_name, node_info] : other_node_map_) {
        if (node_info->node_status_ != NodeStatus::kAlive) {
            continue;
        }
        // Use find() rather than operator[]: operator[] would default-insert
        // a null PeerClient entry for a node that has no registered client,
        // and the caller would later dereference that nullptr.
        auto client_iter = reader_client_map_.find(node_name);
        if (client_iter == reader_client_map_.end()) {
            LOG_ERROR(fmt::format("No peer client found for alive node: {}", node_name));
            continue;
        }
        if (node_info->node_role_ == NodeRole::kFollower) {
            followers.emplace_back(node_info);
            follower_clients.emplace_back(client_iter->second);
        } else if (node_info->node_role_ == NodeRole::kLearner) {
            learners.emplace_back(node_info);
            learner_clients.emplace_back(client_iter->second);
        }
    }

    return Status::OK();
}

Status ClusterManager::AddNodeInfo(const SharedPtr<NodeInfo> &node_info) {
// Only used by Leader on follower/learner registration.
std::unique_lock<std::mutex> lock(mutex_);
Expand All @@ -354,7 +383,8 @@ Status ClusterManager::AddNodeInfo(const SharedPtr<NodeInfo> &node_info) {
}

// Connect to follower/learner server.
auto [client_to_follower, client_status] = ClusterManager::ConnectToServerNoLock(node_info->ip_address_, node_info->port_);
auto [client_to_follower, client_status] =
ClusterManager::ConnectToServerNoLock(this_node_->node_name_, node_info->ip_address_, node_info->port_);
if (!client_status.ok()) {
return client_status;
}
Expand All @@ -366,31 +396,52 @@ Status ClusterManager::AddNodeInfo(const SharedPtr<NodeInfo> &node_info) {
return Status::OK();
}

Status ClusterManager::RemoveNode(const String &node_name) {
Status ClusterManager::UpdateNodeByLeader(const String &node_name, UpdateNodeOp update_node_op) {
// Only used in leader mode.
std::unique_lock<std::mutex> lock(mutex_);

auto iter = other_node_map_.find(node_name);
if (iter == other_node_map_.end()) {
return Status::NotExistNode(fmt::format("Attempt to remove non-exist node: {}", node_name));
} else {
other_node_map_.erase(node_name);
switch (update_node_op) {
case UpdateNodeOp::kRemove: {
other_node_map_.erase(node_name);
break;
}
case UpdateNodeOp::kLostConnection: {
// Can't connect to the node
iter->second->node_status_ = NodeStatus::kLostConnection;
break;
}
}
}

auto client_iter = reader_client_map_.find(node_name);
if(client_iter == reader_client_map_.end()) {
if (client_iter == reader_client_map_.end()) {
return Status::NotExistNode(fmt::format("Attempt to disconnect from non-exist node: {}", node_name));
} else {
client_iter->second->UnInit();
reader_client_map_.erase(node_name);
switch (update_node_op) {
case UpdateNodeOp::kRemove: {
client_iter->second->UnInit(true);
reader_client_map_.erase(node_name);
break;
}
case UpdateNodeOp::kLostConnection: {
client_iter->second->UnInit(false);
reader_client_map_.erase(node_name);
break;
}
}
}

return Status::OK();
}

Status ClusterManager::UpdateNodeInfoByHeartBeat(const SharedPtr<NodeInfo> &non_leader_node,
Vector<infinity_peer_server::NodeInfo> &other_nodes,
i64 &leader_term) {
i64 &leader_term,
infinity_peer_server::NodeStatus::type &sender_status) {
// Used by leader
std::unique_lock<std::mutex> lock(mutex_);
if (this_node_->node_role_ != NodeRole::kLeader) {
Expand All @@ -411,12 +462,32 @@ Status ClusterManager::UpdateNodeInfoByHeartBeat(const SharedPtr<NodeInfo> &non_
other_node->port_);
return Status::NodeInfoUpdated(ToString(other_node->node_role_));
}
// Found the node, just update the timestamp
other_node->txn_timestamp_ = non_leader_node->txn_timestamp_;
auto now = std::chrono::system_clock::now();
auto time_since_epoch = now.time_since_epoch();
other_node->last_update_ts_ = std::chrono::duration_cast<std::chrono::seconds>(time_since_epoch).count();
++other_node->heartbeat_count_;
// Found the node
switch (other_node->node_status_) {
case NodeStatus::kAlive:
case NodeStatus::kTimeout: {
// just update the timestamp
other_node->txn_timestamp_ = non_leader_node->txn_timestamp_;
auto now = std::chrono::system_clock::now();
auto time_since_epoch = now.time_since_epoch();
other_node->last_update_ts_ = std::chrono::duration_cast<std::chrono::seconds>(time_since_epoch).count();
++other_node->heartbeat_count_;
sender_status = infinity_peer_server::NodeStatus::type::kAlive;
break;
}
case NodeStatus::kLostConnection: {
LOG_ERROR(fmt::format("Node {} from {}:{} cant' connected, but still can receive heartbeat.",
other_node->node_name_,
other_node->ip_address_,
other_node->port_));
sender_status = infinity_peer_server::NodeStatus::type::kLostConnection;
break;
}
case NodeStatus::kInvalid: {
UnrecoverableError("Invalid node status");
}
}

non_leader_node_found = true;
} else {
infinity_peer_server::NodeInfo thrift_node_info;
Expand Down Expand Up @@ -481,18 +552,62 @@ Status ClusterManager::SyncLogsOnRegistration(const SharedPtr<NodeInfo> &non_lea
return SendLogs(non_leader_node->node_name_, peer_client, wal_strings, true);
}

Status ClusterManager::SyncLogsToFollower() {
// Used by leader to sync logs when get HB from follower
return Status::OK();
}
void ClusterManager::PrepareLogs(const SharedPtr<String> &log_string) { logs_to_sync_.emplace_back(log_string); }

Status ClusterManager::SyncLogs() {
LOG_TRACE("Sync logs to follower and async logs to learner");
Set<String> sent_nodes;
while (true) {
// Get follower and learner node
Vector<SharedPtr<NodeInfo>> followers;
Vector<SharedPtr<PeerClient>> follower_clients;
Vector<SharedPtr<NodeInfo>> learners;
Vector<SharedPtr<PeerClient>> learner_clients;

Status status = GetReadersInfo(followers, follower_clients, learners, learner_clients);
if (!status.ok()) {
return status;
}

SizeT follower_count = followers.size();
SizeT learner_count = learners.size();

if (follower_count != follower_clients.size() && learner_count != learner_clients.size()) {
return Status::UnexpectedError("Node info and node client count isn't match");
}

Status ClusterManager::AsyncLogsToLearner() {
// Used by leader to async logs when get HB from learner
// Replicate logs to follower
for (SizeT idx = 0; idx < follower_count; ++idx) {
const String &follower_name = followers[idx]->node_name_;
if (!sent_nodes.contains(follower_name)) {
status = SendLogs(follower_name, follower_clients[idx], logs_to_sync_, true);
if (status.ok()) {
sent_nodes.insert(follower_name);
}
}
}

// Replicate logs to learner
for (SizeT idx = 0; idx < learner_count; ++idx) {
const String &learner_name = learners[idx]->node_name_;
if (!sent_nodes.contains(learner_name)) {
status = SendLogs(learner_name, learner_clients[idx], logs_to_sync_, false);
if (status.ok()) {
sent_nodes.insert(learner_name);
}
}
}

if (sent_nodes.size() == follower_count + learner_count) {
logs_to_sync_.clear();
break;
}
}
return Status::OK();
}

Status ClusterManager::SetFollowerNumber(SizeT new_follower_number) {
if(new_follower_number > 5) {
if (new_follower_number > 5) {
return Status::NotSupport("Attempt to set follower count larger than 5.");
}

Expand All @@ -501,9 +616,7 @@ Status ClusterManager::SetFollowerNumber(SizeT new_follower_number) {
return Status::OK();
}

SizeT ClusterManager::GetFollowerNumber() const {
return follower_count_;
}
SizeT ClusterManager::GetFollowerNumber() const { return follower_count_; }

Status ClusterManager::UpdateNodeInfoNoLock(const Vector<SharedPtr<NodeInfo>> &info_of_nodes) {
// Only follower and learner will use this function.
Expand Down Expand Up @@ -542,12 +655,14 @@ Status ClusterManager::UpdateNodeInfoNoLock(const Vector<SharedPtr<NodeInfo>> &i
return Status::OK();
}

Status ClusterManager::ApplySyncedLogNolock(const Vector<String>& synced_logs) {
for(auto& log_str: synced_logs) {
Status ClusterManager::ApplySyncedLogNolock(const Vector<String> &synced_logs) {
// WalManager *wal_manager = txn_manager_->wal_manager();
for (auto &log_str : synced_logs) {
const i32 entry_size = log_str.size();
const char *ptr = log_str.data();
SharedPtr<WalEntry> entry = WalEntry::ReadAdv(ptr, entry_size);
LOG_DEBUG(fmt::format("WAL Entry: {}", entry->ToString()));
// wal_manager->ReplayWalEntry(*entry);
}
return Status::OK();
}
Expand Down
Loading

0 comments on commit 84cf2eb

Please sign in to comment.