From 84cf2eb5702b7448cf1d6a8b5adc5c4a0f001f08 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Sat, 19 Oct 2024 23:09:31 +0800 Subject: [PATCH] Sync logs from leader to follower and learner (#2067) ### What problem does this PR solve? - Sync log to follower and async learner when a new WAL entry is written. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai --- src/main/cluster_manager.cpp | 187 ++++++++++++++---- src/main/cluster_manager.cppm | 37 ++-- .../peer_server_thrift/peer_server_types.cpp | 156 ++++++++------- .../peer_server_thrift/peer_server_types.h | 18 +- src/network/peer_server_thrift_service.cpp | 7 +- src/network/peer_task.cpp | 4 +- src/network/peer_task.cppm | 8 +- src/network/peer_thrift_client.cpp | 82 +++++--- src/network/peer_thrift_client.cppm | 13 +- src/storage/wal/wal_manager.cpp | 42 +++- thrift/peer_server.thrift | 2 + 11 files changed, 383 insertions(+), 173 deletions(-) diff --git a/src/main/cluster_manager.cpp b/src/main/cluster_manager.cpp index fb5a8fc51a..719e6e887a 100644 --- a/src/main/cluster_manager.cpp +++ b/src/main/cluster_manager.cpp @@ -32,7 +32,7 @@ ClusterManager::~ClusterManager() { other_node_map_.clear(); this_node_.reset(); if (client_to_leader_.get() != nullptr) { - client_to_leader_->UnInit(); + client_to_leader_->UnInit(true); } client_to_leader_.reset(); } @@ -79,7 +79,7 @@ Status ClusterManager::InitAsFollower(const String &node_name, const String &lea leader_node_->port_ = leader_port; Status client_status = Status::OK(); - std::tie(client_to_leader_, client_status) = ClusterManager::ConnectToServerNoLock(leader_ip, leader_port); + std::tie(client_to_leader_, client_status) = ClusterManager::ConnectToServerNoLock(node_name, leader_ip, leader_port); if (!client_status.ok()) { return client_status; } @@ -110,7 +110,7 @@ Status ClusterManager::InitAsLearner(const String &node_name, const String &lead leader_node_->port_ = leader_port; Status client_status = Status::OK(); - std::tie(client_to_leader_, client_status) = ClusterManager::ConnectToServerNoLock(leader_ip, leader_port); + std::tie(client_to_leader_, client_status) = ClusterManager::ConnectToServerNoLock(node_name, leader_ip, leader_port); if (!client_status.ok()) { return client_status; } @@ -131,12 +131,12 @@ Status ClusterManager::UnInit() { } std::unique_lock lock(mutex_); - Status status = UnregisterFromLeaderNoLock(); + Status status = UnregisterToLeaderNoLock(); other_node_map_.clear(); this_node_.reset(); if (client_to_leader_.get() != nullptr) { - client_to_leader_->UnInit(); + client_to_leader_->UnInit(true); } client_to_leader_.reset(); @@ -216,6 +216,9 @@ void ClusterManager::HeartBeatToLeader() { // Update the non-leader node info from leader UpdateNodeInfoNoLock(hb_task->other_nodes_); + + // Check if leader can't send message to this reader node + this_node_->node_status_ = hb_task->sender_status_; } return; } @@ -291,7 +294,7 @@ Status ClusterManager::RegisterToLeaderNoLock() { return status; } -Status ClusterManager::UnregisterFromLeaderNoLock() { +Status ClusterManager::UnregisterToLeaderNoLock() { Status status = Status::OK(); if (this_node_->node_role_ == NodeRole::kFollower or this_node_->node_role_ == NodeRole::kLearner) { if (leader_node_->node_status_ == NodeStatus::kAlive) { @@ -309,8 +312,9 @@ Status ClusterManager::UnregisterFromLeaderNoLock() { return status; } -Tuple, Status> ClusterManager::ConnectToServerNoLock(const String &server_ip, i64 server_port) { - SharedPtr client = MakeShared(server_ip, server_port); +Tuple, Status> +ClusterManager::ConnectToServerNoLock(const String &sending_node_name, const String &server_ip, i64 server_port) { + SharedPtr client = MakeShared(sending_node_name, server_ip, server_port); Status client_status = client->Init(); if (!client_status.ok()) { return {nullptr, client_status}; @@ -318,15 +322,18 @@ Tuple, Status> ClusterManager::ConnectToServerNoLock(const return {client, client_status}; } -Status ClusterManager::SendLogs(const String &node_name, const SharedPtr &peer_client, const Vector> &logs, bool synchronize) { +Status +ClusterManager::SendLogs(const String &node_name, const SharedPtr &peer_client, const Vector> &logs, bool synchronize) { SharedPtr sync_log_task = MakeShared(node_name, logs); peer_client->Send(sync_log_task); - if(synchronize) { + Status status = Status::OK(); + if (synchronize) { sync_log_task->Wait(); + } else { + return status; } - Status status = Status::OK(); if (sync_log_task->error_code_ != 0) { LOG_ERROR(fmt::format("Fail to send log follower: {}, error message: {}", node_name, sync_log_task->error_message_)); status.code_ = static_cast(sync_log_task->error_code_); @@ -335,6 +342,28 @@ Status ClusterManager::SendLogs(const String &node_name, const SharedPtr> &followers, + Vector> &follower_clients, + Vector> &learners, + Vector> &learner_clients) { + if (this_node_->node_role_ != NodeRole::kLeader) { + return Status::InvalidNodeRole("Expect leader node"); + } + std::unique_lock lock(mutex_); + for (const auto &node_info_pair : other_node_map_) { + if (node_info_pair.second->node_role_ == NodeRole::kFollower && node_info_pair.second->node_status_ == NodeStatus::kAlive) { + followers.emplace_back(node_info_pair.second); + follower_clients.emplace_back(reader_client_map_[node_info_pair.first]); + } + if (node_info_pair.second->node_role_ == NodeRole::kLearner && node_info_pair.second->node_status_ == NodeStatus::kAlive) { + learners.emplace_back(node_info_pair.second); + learner_clients.emplace_back(reader_client_map_[node_info_pair.first]); + } + } + + return Status::OK(); +} + Status ClusterManager::AddNodeInfo(const SharedPtr &node_info) { // Only used by Leader on follower/learner registration. std::unique_lock lock(mutex_); @@ -354,7 +383,8 @@ Status ClusterManager::AddNodeInfo(const SharedPtr &node_info) { } // Connect to follower/learner server. - auto [client_to_follower, client_status] = ClusterManager::ConnectToServerNoLock(node_info->ip_address_, node_info->port_); + auto [client_to_follower, client_status] = + ClusterManager::ConnectToServerNoLock(this_node_->node_name_, node_info->ip_address_, node_info->port_); if (!client_status.ok()) { return client_status; } @@ -366,7 +396,7 @@ Status ClusterManager::AddNodeInfo(const SharedPtr &node_info) { return Status::OK(); } -Status ClusterManager::RemoveNode(const String &node_name) { +Status ClusterManager::UpdateNodeByLeader(const String &node_name, UpdateNodeOp update_node_op) { // Only used in leader mode. std::unique_lock lock(mutex_); @@ -374,15 +404,35 @@ Status ClusterManager::RemoveNode(const String &node_name) { if (iter == other_node_map_.end()) { return Status::NotExistNode(fmt::format("Attempt to remove non-exist node: {}", node_name)); } else { - other_node_map_.erase(node_name); + switch (update_node_op) { + case UpdateNodeOp::kRemove: { + other_node_map_.erase(node_name); + break; + } + case UpdateNodeOp::kLostConnection: { + // Can't connect to the node + iter->second->node_status_ = NodeStatus::kLostConnection; + break; + } + } } auto client_iter = reader_client_map_.find(node_name); - if(client_iter == reader_client_map_.end()) { + if (client_iter == reader_client_map_.end()) { return Status::NotExistNode(fmt::format("Attempt to disconnect from non-exist node: {}", node_name)); } else { - client_iter->second->UnInit(); - reader_client_map_.erase(node_name); + switch (update_node_op) { + case UpdateNodeOp::kRemove: { + client_iter->second->UnInit(true); + reader_client_map_.erase(node_name); + break; + } + case UpdateNodeOp::kLostConnection: { + client_iter->second->UnInit(false); + reader_client_map_.erase(node_name); + break; + } + } } return Status::OK(); @@ -390,7 +440,8 @@ Status ClusterManager::RemoveNode(const String &node_name) { Status ClusterManager::UpdateNodeInfoByHeartBeat(const SharedPtr &non_leader_node, Vector &other_nodes, - i64 &leader_term) { + i64 &leader_term, + infinity_peer_server::NodeStatus::type &sender_status) { // Used by leader std::unique_lock lock(mutex_); if (this_node_->node_role_ != NodeRole::kLeader) { @@ -411,12 +462,32 @@ Status ClusterManager::UpdateNodeInfoByHeartBeat(const SharedPtr &non_ other_node->port_); return Status::NodeInfoUpdated(ToString(other_node->node_role_)); } - // Found the node, just update the timestamp - other_node->txn_timestamp_ = non_leader_node->txn_timestamp_; - auto now = std::chrono::system_clock::now(); - auto time_since_epoch = now.time_since_epoch(); - other_node->last_update_ts_ = std::chrono::duration_cast(time_since_epoch).count(); - ++other_node->heartbeat_count_; + // Found the node + switch (other_node->node_status_) { + case NodeStatus::kAlive: + case NodeStatus::kTimeout: { + // just update the timestamp + other_node->txn_timestamp_ = non_leader_node->txn_timestamp_; + auto now = std::chrono::system_clock::now(); + auto time_since_epoch = now.time_since_epoch(); + other_node->last_update_ts_ = std::chrono::duration_cast(time_since_epoch).count(); + ++other_node->heartbeat_count_; + sender_status = infinity_peer_server::NodeStatus::type::kAlive; + break; + } + case NodeStatus::kLostConnection: { + LOG_ERROR(fmt::format("Node {} from {}:{} cant' connected, but still can receive heartbeat.", + other_node->node_name_, + other_node->ip_address_, + other_node->port_)); + sender_status = infinity_peer_server::NodeStatus::type::kLostConnection; + break; + } + case NodeStatus::kInvalid: { + UnrecoverableError("Invalid node status"); + } + } + non_leader_node_found = true; } else { infinity_peer_server::NodeInfo thrift_node_info; @@ -481,18 +552,62 @@ Status ClusterManager::SyncLogsOnRegistration(const SharedPtr &non_lea return SendLogs(non_leader_node->node_name_, peer_client, wal_strings, true); } -Status ClusterManager::SyncLogsToFollower() { - // Used by leader to sync logs when get HB from follower - return Status::OK(); -} +void ClusterManager::PrepareLogs(const SharedPtr &log_string) { logs_to_sync_.emplace_back(log_string); } + +Status ClusterManager::SyncLogs() { + LOG_TRACE("Sync logs to follower and async logs to learner"); + Set sent_nodes; + while (true) { + // Get follower and learner node + Vector> followers; + Vector> follower_clients; + Vector> learners; + Vector> learner_clients; + + Status status = GetReadersInfo(followers, follower_clients, learners, learner_clients); + if (!status.ok()) { + return status; + } + + SizeT follower_count = followers.size(); + SizeT learner_count = learners.size(); + + if (follower_count != follower_clients.size() && learner_count != learner_clients.size()) { + return Status::UnexpectedError("Node info and node client count isn't match"); + } -Status ClusterManager::AsyncLogsToLearner() { - // Used by leader to async logs when get HB from learner + // Replicate logs to follower + for (SizeT idx = 0; idx < follower_count; ++idx) { + const String &follower_name = followers[idx]->node_name_; + if (!sent_nodes.contains(follower_name)) { + status = SendLogs(follower_name, follower_clients[idx], logs_to_sync_, true); + if (status.ok()) { + sent_nodes.insert(follower_name); + } + } + } + + // Replicate logs to learner + for (SizeT idx = 0; idx < learner_count; ++idx) { + const String &learner_name = learners[idx]->node_name_; + if (!sent_nodes.contains(learner_name)) { + status = SendLogs(learner_name, learner_clients[idx], logs_to_sync_, false); + if (status.ok()) { + sent_nodes.insert(learner_name); + } + } + } + + if (sent_nodes.size() == follower_count + learner_count) { + logs_to_sync_.clear(); + break; + } + } return Status::OK(); } Status ClusterManager::SetFollowerNumber(SizeT new_follower_number) { - if(new_follower_number > 5) { + if (new_follower_number > 5) { return Status::NotSupport("Attempt to set follower count larger than 5."); } @@ -501,9 +616,7 @@ Status ClusterManager::SetFollowerNumber(SizeT new_follower_number) { return Status::OK(); } -SizeT ClusterManager::GetFollowerNumber() const { - return follower_count_; -} +SizeT ClusterManager::GetFollowerNumber() const { return follower_count_; } Status ClusterManager::UpdateNodeInfoNoLock(const Vector> &info_of_nodes) { // Only follower and learner will use this function. @@ -542,12 +655,14 @@ Status ClusterManager::UpdateNodeInfoNoLock(const Vector> &i return Status::OK(); } -Status ClusterManager::ApplySyncedLogNolock(const Vector& synced_logs) { - for(auto& log_str: synced_logs) { +Status ClusterManager::ApplySyncedLogNolock(const Vector &synced_logs) { + // WalManager *wal_manager = txn_manager_->wal_manager(); + for (auto &log_str : synced_logs) { const i32 entry_size = log_str.size(); const char *ptr = log_str.data(); SharedPtr entry = WalEntry::ReadAdv(ptr, entry_size); LOG_DEBUG(fmt::format("WAL Entry: {}", entry->ToString())); + // wal_manager->ReplayWalEntry(*entry); } return Status::OK(); } diff --git a/src/main/cluster_manager.cppm b/src/main/cluster_manager.cppm index 91101d37ae..80ee0c5974 100644 --- a/src/main/cluster_manager.cppm +++ b/src/main/cluster_manager.cppm @@ -26,6 +26,8 @@ import txn_manager; namespace infinity { +export enum class UpdateNodeOp { kRemove, kLostConnection }; + export class ClusterManager { public: explicit ClusterManager(TxnManager *txn_manager) : txn_manager_(txn_manager) {} @@ -45,29 +47,33 @@ public: private: void CheckHeartBeatInner(); Status RegisterToLeaderNoLock(); - Status UnregisterFromLeaderNoLock(); - Tuple, Status> ConnectToServerNoLock(const String &server_ip, i64 server_port); - Status SendLogs(const String &node_name, const SharedPtr& peer_client, const Vector>& logs, bool synchronize); + Status UnregisterToLeaderNoLock(); + Tuple, Status> ConnectToServerNoLock(const String &sending_node_name, const String &server_ip, i64 server_port); + Status SendLogs(const String &node_name, const SharedPtr &peer_client, const Vector> &logs, bool synchronize); + + Status GetReadersInfo(Vector> &followers, + Vector> &follower_clients, + Vector> &learners, + Vector> &learner_clients); public: // Used by leader to add non-leader node in register phase Status AddNodeInfo(const SharedPtr &new_node); // Used by leader to remove unregister node - Status RemoveNode(const String &node_name); + Status UpdateNodeByLeader(const String &node_name, UpdateNodeOp update_node_op); // Used by leader when get HB request - Status - UpdateNodeInfoByHeartBeat(const SharedPtr &non_leader_node, Vector &other_nodes, i64 &leader_term); + Status UpdateNodeInfoByHeartBeat(const SharedPtr &non_leader_node, + Vector &other_nodes, + i64 &leader_term, + infinity_peer_server::NodeStatus::type &sender_status); // Used by leader to notify leader to synchronize logs to the follower and learner, during registration - Status SyncLogsOnRegistration(const SharedPtr &non_leader_node, const SharedPtr& peer_client); + Status SyncLogsOnRegistration(const SharedPtr &non_leader_node, const SharedPtr &peer_client); - // Used by leader to notify to synchronize logs to follower, during txn bottom phase - Status SyncLogsToFollower(); - - // Used by leader to notify to asynchronize logs to learner, after txn; - Status AsyncLogsToLearner(); + void PrepareLogs(const SharedPtr &log_string); + Status SyncLogs(); // Used by leader to control the number of follower Status SetFollowerNumber(SizeT new_follower_number); @@ -76,7 +82,7 @@ public: // Use by follower / learner to update all node info when get HB response from leader Status UpdateNodeInfoNoLock(const Vector> &info_of_nodes); - Status ApplySyncedLogNolock(const Vector& synced_logs); + Status ApplySyncedLogNolock(const Vector &synced_logs); // Used by all nodes ADMIN SHOW NODES Vector> ListNodes() const; @@ -95,15 +101,14 @@ private: SharedPtr this_node_; // Used by leader and follower/learner Map> other_node_map_; // Used by leader and follower/learner - // Vector> other_nodes_; // Used by leader and follower/learner + // Vector> other_nodes_; // Used by leader and follower/learner // Leader clients to followers and learners Map> reader_client_map_{}; // Used by leader; + Vector> logs_to_sync_{}; SharedPtr client_to_leader_{}; // Used by follower and learner to connect leader server; - Map> clients_to_follower_{}; // Used by leader to connect follower / learner server; - SharedPtr hb_periodic_thread_{}; std::mutex hb_mutex_; std::condition_variable hb_cv_; diff --git a/src/network/peer_server_thrift/peer_server_types.cpp b/src/network/peer_server_thrift/peer_server_types.cpp index 62b6cf8152..2ec54cc7bf 100644 --- a/src/network/peer_server_thrift/peer_server_types.cpp +++ b/src/network/peer_server_thrift/peer_server_types.cpp @@ -49,14 +49,16 @@ std::string to_string(const NodeType::type& val) { int _kNodeStatusValues[] = { NodeStatus::kInvalid, NodeStatus::kAlive, + NodeStatus::kLostConnection, NodeStatus::kTimeout }; const char* _kNodeStatusNames[] = { "kInvalid", "kAlive", + "kLostConnection", "kTimeout" }; -const std::map _NodeStatus_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(3, _kNodeStatusValues, _kNodeStatusNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); +const std::map _NodeStatus_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kNodeStatusValues, _kNodeStatusNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); std::ostream& operator<<(std::ostream& out, const NodeStatus::type& val) { std::map::const_iterator it = _NodeStatus_VALUES_TO_NAMES.find(val); @@ -1037,6 +1039,10 @@ void HeartBeatResponse::__set_leader_term(const int64_t val) { void HeartBeatResponse::__set_other_nodes(const std::vector & val) { this->other_nodes = val; } + +void HeartBeatResponse::__set_sender_status(const NodeStatus::type val) { + this->sender_status = val; +} std::ostream& operator<<(std::ostream& out, const HeartBeatResponse& obj) { obj.printTo(out); @@ -1109,6 +1115,16 @@ uint32_t HeartBeatResponse::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 5: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast21; + xfer += iprot->readI32(ecast21); + this->sender_status = static_cast(ecast21); + this->__isset.sender_status = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -1141,15 +1157,19 @@ uint32_t HeartBeatResponse::write(::apache::thrift::protocol::TProtocol* oprot) xfer += oprot->writeFieldBegin("other_nodes", ::apache::thrift::protocol::T_LIST, 4); { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->other_nodes.size())); - std::vector ::const_iterator _iter21; - for (_iter21 = this->other_nodes.begin(); _iter21 != this->other_nodes.end(); ++_iter21) + std::vector ::const_iterator _iter22; + for (_iter22 = this->other_nodes.begin(); _iter22 != this->other_nodes.end(); ++_iter22) { - xfer += (*_iter21).write(oprot); + xfer += (*_iter22).write(oprot); } xfer += oprot->writeListEnd(); } xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("sender_status", ::apache::thrift::protocol::T_I32, 5); + xfer += oprot->writeI32(static_cast(this->sender_status)); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -1161,22 +1181,25 @@ void swap(HeartBeatResponse &a, HeartBeatResponse &b) { swap(a.error_message, b.error_message); swap(a.leader_term, b.leader_term); swap(a.other_nodes, b.other_nodes); + swap(a.sender_status, b.sender_status); swap(a.__isset, b.__isset); } -HeartBeatResponse::HeartBeatResponse(const HeartBeatResponse& other22) { - error_code = other22.error_code; - error_message = other22.error_message; - leader_term = other22.leader_term; - other_nodes = other22.other_nodes; - __isset = other22.__isset; -} -HeartBeatResponse& HeartBeatResponse::operator=(const HeartBeatResponse& other23) { +HeartBeatResponse::HeartBeatResponse(const HeartBeatResponse& other23) { error_code = other23.error_code; error_message = other23.error_message; leader_term = other23.leader_term; other_nodes = other23.other_nodes; + sender_status = other23.sender_status; __isset = other23.__isset; +} +HeartBeatResponse& HeartBeatResponse::operator=(const HeartBeatResponse& other24) { + error_code = other24.error_code; + error_message = other24.error_message; + leader_term = other24.leader_term; + other_nodes = other24.other_nodes; + sender_status = other24.sender_status; + __isset = other24.__isset; return *this; } void HeartBeatResponse::printTo(std::ostream& out) const { @@ -1186,6 +1209,7 @@ void HeartBeatResponse::printTo(std::ostream& out) const { out << ", " << "error_message=" << to_string(error_message); out << ", " << "leader_term=" << to_string(leader_term); out << ", " << "other_nodes=" << to_string(other_nodes); + out << ", " << "sender_status=" << to_string(sender_status); out << ")"; } @@ -1241,14 +1265,14 @@ uint32_t SyncLogRequest::read(::apache::thrift::protocol::TProtocol* iprot) { if (ftype == ::apache::thrift::protocol::T_LIST) { { this->log_entries.clear(); - uint32_t _size24; - ::apache::thrift::protocol::TType _etype27; - xfer += iprot->readListBegin(_etype27, _size24); - this->log_entries.resize(_size24); - uint32_t _i28; - for (_i28 = 0; _i28 < _size24; ++_i28) + uint32_t _size25; + ::apache::thrift::protocol::TType _etype28; + xfer += iprot->readListBegin(_etype28, _size25); + this->log_entries.resize(_size25); + uint32_t _i29; + for (_i29 = 0; _i29 < _size25; ++_i29) { - xfer += iprot->readBinary(this->log_entries[_i28]); + xfer += iprot->readBinary(this->log_entries[_i29]); } xfer += iprot->readListEnd(); } @@ -1281,10 +1305,10 @@ uint32_t SyncLogRequest::write(::apache::thrift::protocol::TProtocol* oprot) con xfer += oprot->writeFieldBegin("log_entries", ::apache::thrift::protocol::T_LIST, 2); { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast(this->log_entries.size())); - std::vector ::const_iterator _iter29; - for (_iter29 = this->log_entries.begin(); _iter29 != this->log_entries.end(); ++_iter29) + std::vector ::const_iterator _iter30; + for (_iter30 = this->log_entries.begin(); _iter30 != this->log_entries.end(); ++_iter30) { - xfer += oprot->writeBinary((*_iter29)); + xfer += oprot->writeBinary((*_iter30)); } xfer += oprot->writeListEnd(); } @@ -1302,15 +1326,15 @@ void swap(SyncLogRequest &a, SyncLogRequest &b) { swap(a.__isset, b.__isset); } -SyncLogRequest::SyncLogRequest(const SyncLogRequest& other30) { - node_name = other30.node_name; - log_entries = other30.log_entries; - __isset = other30.__isset; -} -SyncLogRequest& SyncLogRequest::operator=(const SyncLogRequest& other31) { +SyncLogRequest::SyncLogRequest(const SyncLogRequest& other31) { node_name = other31.node_name; log_entries = other31.log_entries; __isset = other31.__isset; +} +SyncLogRequest& SyncLogRequest::operator=(const SyncLogRequest& other32) { + node_name = other32.node_name; + log_entries = other32.log_entries; + __isset = other32.__isset; return *this; } void SyncLogRequest::printTo(std::ostream& out) const { @@ -1431,17 +1455,17 @@ void swap(SyncLogResponse &a, SyncLogResponse &b) { swap(a.__isset, b.__isset); } -SyncLogResponse::SyncLogResponse(const SyncLogResponse& other32) { - error_code = other32.error_code; - error_message = other32.error_message; - txn_timestamp = other32.txn_timestamp; - __isset = other32.__isset; -} -SyncLogResponse& SyncLogResponse::operator=(const SyncLogResponse& other33) { +SyncLogResponse::SyncLogResponse(const SyncLogResponse& other33) { error_code = other33.error_code; error_message = other33.error_message; txn_timestamp = other33.txn_timestamp; __isset = other33.__isset; +} +SyncLogResponse& SyncLogResponse::operator=(const SyncLogResponse& other34) { + error_code = other34.error_code; + error_message = other34.error_message; + txn_timestamp = other34.txn_timestamp; + __isset = other34.__isset; return *this; } void SyncLogResponse::printTo(std::ostream& out) const { @@ -1503,9 +1527,9 @@ uint32_t ChangeRoleRequest::read(::apache::thrift::protocol::TProtocol* iprot) { break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast34; - xfer += iprot->readI32(ecast34); - this->node_type = static_cast(ecast34); + int32_t ecast35; + xfer += iprot->readI32(ecast35); + this->node_type = static_cast(ecast35); this->__isset.node_type = true; } else { xfer += iprot->skip(ftype); @@ -1548,15 +1572,15 @@ void swap(ChangeRoleRequest &a, ChangeRoleRequest &b) { swap(a.__isset, b.__isset); } -ChangeRoleRequest::ChangeRoleRequest(const ChangeRoleRequest& other35) { - node_name = other35.node_name; - node_type = other35.node_type; - __isset = other35.__isset; -} -ChangeRoleRequest& ChangeRoleRequest::operator=(const ChangeRoleRequest& other36) { +ChangeRoleRequest::ChangeRoleRequest(const ChangeRoleRequest& other36) { node_name = other36.node_name; node_type = other36.node_type; __isset = other36.__isset; +} +ChangeRoleRequest& ChangeRoleRequest::operator=(const ChangeRoleRequest& other37) { + node_name = other37.node_name; + node_type = other37.node_type; + __isset = other37.__isset; return *this; } void ChangeRoleRequest::printTo(std::ostream& out) const { @@ -1643,13 +1667,13 @@ void swap(ChangeRoleResponse &a, ChangeRoleResponse &b) { swap(a.__isset, b.__isset); } -ChangeRoleResponse::ChangeRoleResponse(const ChangeRoleResponse& other37) { - node_name = other37.node_name; - __isset = other37.__isset; -} -ChangeRoleResponse& ChangeRoleResponse::operator=(const ChangeRoleResponse& other38) { +ChangeRoleResponse::ChangeRoleResponse(const ChangeRoleResponse& other38) { node_name = other38.node_name; __isset = other38.__isset; +} +ChangeRoleResponse& ChangeRoleResponse::operator=(const ChangeRoleResponse& other39) { + node_name = other39.node_name; + __isset = other39.__isset; return *this; } void ChangeRoleResponse::printTo(std::ostream& out) const { @@ -1737,9 +1761,9 @@ uint32_t NewLeaderRequest::read(::apache::thrift::protocol::TProtocol* iprot) { break; case 4: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast39; - xfer += iprot->readI32(ecast39); - this->new_node_type = static_cast(ecast39); + int32_t ecast40; + xfer += iprot->readI32(ecast40); + this->new_node_type = static_cast(ecast40); this->__isset.new_node_type = true; } else { xfer += iprot->skip(ftype); @@ -1805,21 +1829,21 @@ void swap(NewLeaderRequest &a, NewLeaderRequest &b) { swap(a.__isset, b.__isset); } -NewLeaderRequest::NewLeaderRequest(const NewLeaderRequest& other40) { - node_name = other40.node_name; - node_ip = other40.node_ip; - node_port = other40.node_port; - new_node_type = other40.new_node_type; - new_leader_term = other40.new_leader_term; - __isset = other40.__isset; -} -NewLeaderRequest& NewLeaderRequest::operator=(const NewLeaderRequest& other41) { +NewLeaderRequest::NewLeaderRequest(const NewLeaderRequest& other41) { node_name = other41.node_name; node_ip = other41.node_ip; node_port = other41.node_port; new_node_type = other41.new_node_type; new_leader_term = other41.new_leader_term; __isset = other41.__isset; +} +NewLeaderRequest& NewLeaderRequest::operator=(const NewLeaderRequest& other42) { + node_name = other42.node_name; + node_ip = other42.node_ip; + node_port = other42.node_port; + new_node_type = other42.new_node_type; + new_leader_term = other42.new_leader_term; + __isset = other42.__isset; return *this; } void NewLeaderRequest::printTo(std::ostream& out) const { @@ -1909,13 +1933,13 @@ void swap(NewLeaderResponse &a, NewLeaderResponse &b) { swap(a.__isset, b.__isset); } -NewLeaderResponse::NewLeaderResponse(const NewLeaderResponse& other42) { - node_name = other42.node_name; - __isset = other42.__isset; -} -NewLeaderResponse& NewLeaderResponse::operator=(const NewLeaderResponse& other43) { +NewLeaderResponse::NewLeaderResponse(const NewLeaderResponse& other43) { node_name = other43.node_name; __isset = other43.__isset; +} +NewLeaderResponse& NewLeaderResponse::operator=(const NewLeaderResponse& other44) { + node_name = other44.node_name; + __isset = other44.__isset; return *this; } void NewLeaderResponse::printTo(std::ostream& out) const { diff --git a/src/network/peer_server_thrift/peer_server_types.h b/src/network/peer_server_thrift/peer_server_types.h index bb888f4e9d..3a15186999 100644 --- a/src/network/peer_server_thrift/peer_server_types.h +++ b/src/network/peer_server_thrift/peer_server_types.h @@ -40,7 +40,8 @@ struct NodeStatus { enum type { kInvalid = 0, kAlive = 1, - kTimeout = 2 + kLostConnection = 2, + kTimeout = 3 }; }; @@ -484,11 +485,12 @@ void swap(HeartBeatRequest &a, HeartBeatRequest &b); std::ostream& operator<<(std::ostream& out, const HeartBeatRequest& obj); typedef struct _HeartBeatResponse__isset { - _HeartBeatResponse__isset() : error_code(false), error_message(false), leader_term(false), other_nodes(false) {} + _HeartBeatResponse__isset() : error_code(false), error_message(false), leader_term(false), other_nodes(false), sender_status(false) {} bool error_code :1; bool error_message :1; bool leader_term :1; bool other_nodes :1; + bool sender_status :1; } _HeartBeatResponse__isset; class HeartBeatResponse : public virtual ::apache::thrift::TBase { @@ -499,7 +501,8 @@ class HeartBeatResponse : public virtual ::apache::thrift::TBase { HeartBeatResponse() noexcept : error_code(0), error_message(), - leader_term(0) { + leader_term(0), + sender_status(static_cast(0)) { } virtual ~HeartBeatResponse() noexcept; @@ -507,6 +510,11 @@ class HeartBeatResponse : public virtual ::apache::thrift::TBase { std::string error_message; int64_t leader_term; std::vector other_nodes; + /** + * + * @see NodeStatus + */ + NodeStatus::type sender_status; _HeartBeatResponse__isset __isset; @@ -518,6 +526,8 @@ class HeartBeatResponse : public virtual ::apache::thrift::TBase { void __set_other_nodes(const std::vector & val); + void __set_sender_status(const NodeStatus::type val); + bool operator == (const HeartBeatResponse & rhs) const { if (!(error_code == rhs.error_code)) @@ -528,6 +538,8 @@ class HeartBeatResponse : public virtual ::apache::thrift::TBase { return false; if (!(other_nodes == rhs.other_nodes)) return false; + if (!(sender_status == rhs.sender_status)) + return false; return true; } bool operator != (const HeartBeatResponse &rhs) const { diff --git a/src/network/peer_server_thrift_service.cpp b/src/network/peer_server_thrift_service.cpp index 746558657e..02f0cc2066 100644 --- a/src/network/peer_server_thrift_service.cpp +++ b/src/network/peer_server_thrift_service.cpp @@ -80,7 +80,7 @@ void PeerServerThriftService::Unregister(infinity_peer_server::UnregisterRespons LOG_TRACE("Get Unregister request"); NodeInfo *leader_node = InfinityContext::instance().cluster_manager()->ThisNode().get(); if (leader_node->node_role_ == NodeRole::kLeader) { - Status status = InfinityContext::instance().cluster_manager()->RemoveNode(request.node_name); + Status status = InfinityContext::instance().cluster_manager()->UpdateNodeByLeader(request.node_name, UpdateNodeOp::kRemove); if (!status.ok()) { response.error_code = static_cast(status.code_); response.error_message = status.message(); @@ -128,14 +128,15 @@ void PeerServerThriftService::HeartBeat(infinity_peer_server::HeartBeatResponse Status status = InfinityContext::instance().cluster_manager()->UpdateNodeInfoByHeartBeat(non_leader_node_info, response.other_nodes, - response.leader_term); + response.leader_term, + response.sender_status); if (!status.ok()) { response.error_code = static_cast(status.code()); response.error_message = status.message(); } } else { response.error_code = static_cast(ErrorCode::kInvalidNodeRole); - response.error_message = fmt::format("Attempt to heatbeat from a non-leader node: {}", ToString(leader_node->node_role_)); + response.error_message = fmt::format("Attempt to heartbeat from a non-leader node: {}", ToString(leader_node->node_role_)); } return; } diff --git a/src/network/peer_task.cpp b/src/network/peer_task.cpp index 782d2436ae..cde0a3651d 100644 --- a/src/network/peer_task.cpp +++ b/src/network/peer_task.cpp @@ -45,7 +45,9 @@ String ToString(NodeStatus status) { case NodeStatus::kTimeout: return "timeout"; case NodeStatus::kInvalid: - return "kInvalid"; + return "invalid"; + case NodeStatus::kLostConnection: + return "lost connection"; } } diff --git a/src/network/peer_task.cppm b/src/network/peer_task.cppm index 4edb07e0f2..5dc510204e 100644 --- a/src/network/peer_task.cppm +++ b/src/network/peer_task.cppm @@ -42,7 +42,7 @@ export enum class PeerTaskType { kNewLeader, }; -export enum class NodeStatus { kAlive, kTimeout, kInvalid }; +export enum class NodeStatus { kAlive, kTimeout, kLostConnection, kInvalid }; export String ToString(NodeStatus); @@ -98,7 +98,7 @@ protected: export class TerminatePeerTask final : public PeerTask { public: - TerminatePeerTask() : PeerTask(PeerTaskType::kTerminate) {} + TerminatePeerTask(bool force_async) : PeerTask(PeerTaskType::kTerminate, force_async) {} String ToString() const final; }; @@ -157,11 +157,13 @@ public: String error_message_{}; i64 leader_term_{}; Vector> other_nodes_{}; + NodeStatus sender_status_{NodeStatus::kInvalid}; }; export class SyncLogTask final : public PeerTask { public: - SyncLogTask(const String& node_name, const Vector>& log_strings) : PeerTask(PeerTaskType::kLogSync), node_name_(node_name), log_strings_(log_strings) {} + SyncLogTask(const String &node_name, const Vector> &log_strings) + : PeerTask(PeerTaskType::kLogSync), node_name_(node_name), log_strings_(log_strings) {} String ToString() const final; diff --git a/src/network/peer_thrift_client.cpp b/src/network/peer_thrift_client.cpp index fc28b82831..4dad69be86 100644 --- a/src/network/peer_thrift_client.cpp +++ b/src/network/peer_thrift_client.cpp @@ -24,12 +24,14 @@ import status; import thrift; import infinity_exception; import peer_task; +import infinity_context; +import cluster_manager; namespace infinity { PeerClient::~PeerClient() { if (running_) { - UnInit(); + UnInit(false); } server_connected_ = false; } @@ -45,16 +47,21 @@ Status PeerClient::Init() { } /// TODO: comment -Status PeerClient::UnInit() { - LOG_INFO(fmt::format("Peer client: {} is stopping.", this_node_name_)); +Status PeerClient::UnInit(bool sync) { + LOG_INFO(fmt::format("Peer client: {} is stopping.", sending_node_name_)); + if (processor_thread_.get() != nullptr) { - SharedPtr terminate_task = MakeShared(); + SharedPtr terminate_task = MakeShared(!sync); peer_task_queue_.Enqueue(terminate_task); terminate_task->Wait(); - processor_thread_->join(); + if (sync) { + processor_thread_->join(); + } else { + processor_thread_->detach(); + } } - LOG_INFO(fmt::format("Peer client: {} is stopped.", this_node_name_)); + LOG_INFO(fmt::format("Peer client: {} is stopped.", sending_node_name_)); return Disconnect(); } @@ -65,7 +72,7 @@ Status PeerClient::Reconnect() { } try { - socket_ = MakeShared(node_info_.ip_address_, node_info_.port_); + socket_ = MakeShared(ip_address_, port_); TSocket *socket = static_cast(socket_.get()); socket->setConnTimeout(2000); // 2s to timeout @@ -75,14 +82,14 @@ Status PeerClient::Reconnect() { transport_->open(); server_connected_ = true; } catch (const std::exception &e) { - status = Status::CantConnectServer(node_info_.ip_address_, node_info_.port_, e.what()); + status = Status::CantConnectServer(ip_address_, port_, e.what()); } return status; } Status PeerClient::Disconnect() { Status status = Status::OK(); - if(server_connected_ == false) { + if (server_connected_ == false) { return status; } @@ -94,7 +101,7 @@ Status PeerClient::Disconnect() { transport_->close(); server_connected_ = false; } catch (const std::exception &e) { - status = Status::CantConnectServer(node_info_.ip_address_, node_info_.port_, e.what()); + status = Status::CantConnectServer(ip_address_, port_, e.what()); } return status; } @@ -136,7 +143,7 @@ void PeerClient::Process() { } case PeerTaskType::kLogSync: { LOG_TRACE(peer_task->ToString()); - SyncLogTask* sync_log_task = static_cast(peer_task.get()); + SyncLogTask *sync_log_task = static_cast(peer_task.get()); SyncLogs(sync_log_task); break; } @@ -194,7 +201,6 @@ void PeerClient::Register(RegisterPeerTask *peer_task) { } } - if (response.error_code != 0) { // Error peer_task->error_code_ = response.error_code; @@ -279,13 +285,32 @@ void PeerClient::HeartBeat(HeartBeatPeerTask *peer_task) { peer_task->error_code_ = response.error_code; peer_task->error_message_ = response.error_message; } else { + switch (response.sender_status) { + case infinity_peer_server::NodeStatus::type::kAlive: { + peer_task->sender_status_ = NodeStatus::kAlive; + break; + } + case infinity_peer_server::NodeStatus::type::kTimeout: { + peer_task->sender_status_ = NodeStatus::kTimeout; + break; + } + case infinity_peer_server::NodeStatus::type::kLostConnection: { + peer_task->sender_status_ = NodeStatus::kLostConnection; + break; + } + default: { + String error_message = "Invalid sender status"; + UnrecoverableError(error_message); + } + } + peer_task->leader_term_ = response.leader_term; SizeT node_count = response.other_nodes.size(); peer_task->other_nodes_.reserve(node_count); for (SizeT idx = 0; idx < node_count; ++idx) { SharedPtr node_info = MakeShared(); auto &other_node = response.other_nodes[idx]; - if (this_node_name_ == other_node.node_name) { + if (sending_node_name_ == other_node.node_name) { continue; } node_info->node_name_ = other_node.node_name; @@ -336,32 +361,27 @@ void PeerClient::SyncLogs(SyncLogTask *peer_task) { request.node_name = peer_task->node_name_; SizeT log_count = peer_task->log_strings_.size(); request.log_entries.reserve(log_count); - for(SizeT i = 0; i < log_count; ++ i) { + for (SizeT i = 0; i < log_count; ++i) { request.log_entries.emplace_back(*peer_task->log_strings_[i]); } try { client_->SyncLog(response, request); + if (response.error_code != 0) { + // Error + peer_task->error_code_ = response.error_code; + peer_task->error_message_ = response.error_message; + LOG_ERROR(fmt::format("Sync log to node: {}, error: {}", peer_task->node_name_, peer_task->error_message_)); + } } catch (apache::thrift::transport::TTransportException &thrift_exception) { - server_connected_ = false; - switch (thrift_exception.getType()) { - case TTransportExceptionType::END_OF_FILE: { - peer_task->error_message_ = thrift_exception.what(); - peer_task->error_code_ = static_cast(ErrorCode::kCantConnectServer); - return; - } - default: { - String error_message = "Synlog: error in data transfer to follower or learner"; - UnrecoverableError(error_message); - } + peer_task->error_message_ = thrift_exception.what(); + peer_task->error_code_ = static_cast(ErrorCode::kCantConnectServer); + LOG_ERROR(fmt::format("Sync log to node: {}, error: {}", peer_task->node_name_, peer_task->error_message_)); + Status status = InfinityContext::instance().cluster_manager()->UpdateNodeByLeader(peer_task->node_name_, UpdateNodeOp::kLostConnection); + if (!status.ok()) { + LOG_ERROR(status.message()); } } - - if (response.error_code != 0) { - // Error - peer_task->error_code_ = response.error_code; - peer_task->error_message_ = response.error_message; - } } } // namespace infinity diff --git a/src/network/peer_thrift_client.cppm b/src/network/peer_thrift_client.cppm index a0cea7b169..23f04e0313 100644 --- a/src/network/peer_thrift_client.cppm +++ b/src/network/peer_thrift_client.cppm @@ -32,10 +32,8 @@ using namespace infinity_peer_server; export class PeerClient { public: - PeerClient(const String &ip_addr, i64 port, const String &node_name = {}) : this_node_name_(node_name) { - node_info_.ip_address_ = ip_addr; - node_info_.port_ = port; - } + PeerClient(const String &sending_node_name, const String &ip_addr, i64 port) + : sending_node_name_(sending_node_name), ip_address_(ip_addr), port_(port) {} ~PeerClient(); // void SetPeerNode(NodeRole role, const String& node_name, i64 update_ts) { @@ -47,7 +45,7 @@ public: // Status Init(); - Status UnInit(); + Status UnInit(bool sync); Status Reconnect(); Status Disconnect(); @@ -63,8 +61,9 @@ private: void SyncLogs(SyncLogTask *peer_task); private: - String this_node_name_; - NodeInfo node_info_; + String sending_node_name_{}; + String ip_address_{}; + i64 port_{}; // For message transportation SharedPtr socket_{}; diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index 9274b4de6e..57f6a50a60 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -55,6 +55,8 @@ import default_values; import defer_op; import index_base; import base_table_ref; +import cluster_manager; +import peer_task; module wal_manager; @@ -174,7 +176,7 @@ Vector> WalManager::GetDiffWalEntryString(TxnTimeStamp start_t Vector> log_entries; - TxnTimeStamp max_commit_ts = 0; // the max commit ts that has be checkpointed + TxnTimeStamp max_commit_ts = 0; // the max commit ts that has been checkpointed String catalog_dir = ""; { @@ -194,9 +196,12 @@ Vector> WalManager::GetDiffWalEntryString(TxnTimeStamp start_t max_commit_ts = checkpoint_cmd->max_commit_ts_; std::string catalog_path = fmt::format("{}/{}", data_path_, "catalog"); catalog_dir = Path(fmt::format("{}/{}", catalog_path, checkpoint_cmd->catalog_name_)).parent_path().string(); - break; + log_entries.push_back(wal_entry); + if(checkpoint_cmd->is_full_checkpoint_) { + // Full checkpoint, OK + break; + } } - log_entries.push_back(wal_entry); } LOG_INFO(fmt::format("Find checkpoint max commit ts: {}", max_commit_ts)); @@ -214,6 +219,11 @@ Vector> WalManager::GetDiffWalEntryString(TxnTimeStamp start_t LOG_TRACE(wal_entry->ToString()); if (wal_entry->commit_ts_ >= max_commit_ts and wal_entry->commit_ts_ > start_timestamp) { + WalCmdCheckpoint *checkpoint_cmd = nullptr; + if (wal_entry->IsCheckPoint(checkpoint_cmd)) { + // Ignore other CKPs + continue; + } log_entries.push_back(wal_entry); } else { break; @@ -256,6 +266,7 @@ void WalManager::Flush() { Deque log_batch{}; TxnManager *txn_mgr = storage_->txn_manager(); + ClusterManager *cluster_manager = nullptr; while (running_.load()) { wait_flush_.DequeueBulk(log_batch); if (log_batch.empty()) { @@ -284,10 +295,10 @@ void WalManager::Flush() { } i32 exp_size = entry->GetSizeInBytes(); - Vector buf(exp_size); - char *ptr = buf.data(); + SharedPtr buf = MakeShared(exp_size, 0); + char *ptr = buf->data(); entry->WriteAdv(ptr); - i32 act_size = ptr - buf.data(); + i32 act_size = ptr - buf->data(); if (exp_size != act_size) { String error_message = fmt::format("WalManager::Flush WalEntry estimated size {} differ with the actual one {}, entry {}", exp_size, @@ -295,7 +306,15 @@ void WalManager::Flush() { entry->ToString()); UnrecoverableError(error_message); } - ofs_.write(buf.data(), ptr - buf.data()); + ofs_.write(buf->data(), ptr - buf->data()); + + if (InfinityContext::instance().GetServerRole() == NodeRole::kLeader) { + if(cluster_manager == nullptr) { + cluster_manager = InfinityContext::instance().cluster_manager(); + } + cluster_manager->PrepareLogs(buf); + } + LOG_TRACE(fmt::format("WalManager::Flush done writing wal for txn_id {}, commit_ts {}", entry->txn_id_, entry->commit_ts_)); UpdateCommitState(entry->commit_ts_, wal_size_ + act_size); @@ -320,6 +339,10 @@ void WalManager::Flush() { } } + if (InfinityContext::instance().GetServerRole() == NodeRole::kLeader) { + cluster_manager->SyncLogs(); + } + for (const auto &entry : log_batch) { Txn *txn = txn_mgr->GetTxn(entry->txn_id_); if (txn != nullptr) { @@ -901,6 +924,11 @@ void WalManager::ReplayWalEntry(const WalEntry &entry) { // entry.commit_ts_); // break; case WalCommandType::CHECKPOINT: { + if(storage_->GetStorageMode() == StorageMode::kReadable) { + LOG_DEBUG("Load the checkpoint"); + } else { + UnrecoverableError("Checkpoint"); + } break; } case WalCommandType::COMPACT: { diff --git a/thrift/peer_server.thrift b/thrift/peer_server.thrift index cff9219d7d..1fa32c2c16 100644 --- a/thrift/peer_server.thrift +++ b/thrift/peer_server.thrift @@ -12,6 +12,7 @@ kInvalid, enum NodeStatus { kInvalid, kAlive, +kLostConnection, kTimeout } @@ -63,6 +64,7 @@ struct HeartBeatResponse { 2: string error_message, 3: i64 leader_term, 4: list other_nodes, +5: NodeStatus sender_status } struct SyncLogRequest {