From 6e0a461df0e1e9c4c25f767fc0f18fe9217c31fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=91=E9=A9=B0?= Date: Mon, 23 Dec 2024 13:48:00 +0800 Subject: [PATCH] [16/N][VirtualCluster] Add virtual cluster failover logic --- src/ray/common/virtual_cluster_id.h | 2 +- src/ray/gcs/gcs_server/gcs_init_data.cc | 18 +- src/ray/gcs/gcs_server/gcs_init_data.h | 15 + src/ray/gcs/gcs_server/gcs_virtual_cluster.cc | 162 ++++++++-- src/ray/gcs/gcs_server/gcs_virtual_cluster.h | 74 ++++- .../gcs_server/gcs_virtual_cluster_manager.cc | 5 +- .../test/gcs_virtual_cluster_manager_test.cc | 301 ++++++++++++++++-- src/ray/protobuf/gcs.proto | 8 +- 8 files changed, 519 insertions(+), 66 deletions(-) diff --git a/src/ray/common/virtual_cluster_id.h b/src/ray/common/virtual_cluster_id.h index 04ac15e33482c..16f5ca315351d 100644 --- a/src/ray/common/virtual_cluster_id.h +++ b/src/ray/common/virtual_cluster_id.h @@ -33,7 +33,7 @@ class VirtualClusterID : public SimpleID { return id_.find(kJobClusterIDSeperator) != std::string::npos; } - VirtualClusterID OwnerID() const { + VirtualClusterID ParentID() const { auto pos = id_.find(kJobClusterIDSeperator); return pos == std::string::npos ? Nil() : VirtualClusterID::FromBinary(id_.substr(0, pos)); diff --git a/src/ray/gcs/gcs_server/gcs_init_data.cc b/src/ray/gcs/gcs_server/gcs_init_data.cc index 76bac0670648e..e440ac1487d25 100644 --- a/src/ray/gcs/gcs_server/gcs_init_data.cc +++ b/src/ray/gcs/gcs_server/gcs_init_data.cc @@ -18,7 +18,7 @@ namespace ray { namespace gcs { void GcsInitData::AsyncLoad(const EmptyCallback &on_done) { // There are 5 kinds of table data need to be loaded. - auto count_down = std::make_shared(5); + auto count_down = std::make_shared(6); auto on_load_finished = [count_down, on_done] { if (--(*count_down) == 0) { if (on_done) { @@ -36,6 +36,8 @@ void GcsInitData::AsyncLoad(const EmptyCallback &on_done) { AsyncLoadActorTaskSpecTableData(on_load_finished); AsyncLoadPlacementGroupTableData(on_load_finished); + + AsyncLoadVirtualClusterTableData(on_load_finished); } void GcsInitData::AsyncLoadJobTableData(const EmptyCallback &on_done) { @@ -102,5 +104,19 @@ void GcsInitData::AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done) load_actor_task_spec_table_data_callback)); } +void GcsInitData::AsyncLoadVirtualClusterTableData(const EmptyCallback &on_done) { + RAY_LOG(INFO) << "Loading virtual cluster table data."; + auto load_virtual_cluster_table_data_callback = + [this, + on_done](absl::flat_hash_map &&result) { + virtual_cluster_table_data_ = std::move(result); + RAY_LOG(INFO) << "Finished loading virtual cluster table data, size = " + << virtual_cluster_table_data_.size(); + on_done(); + }; + RAY_CHECK_OK(gcs_table_storage_.VirtualClusterTable().GetAll( + load_virtual_cluster_table_data_callback)); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_init_data.h b/src/ray/gcs/gcs_server/gcs_init_data.h index 96890aeee5ae6..d8c3b3edf966c 100644 --- a/src/ray/gcs/gcs_server/gcs_init_data.h +++ b/src/ray/gcs/gcs_server/gcs_init_data.h @@ -64,6 +64,12 @@ class GcsInitData { return placement_group_table_data_; } + /// Get virtual cluster metadata. + const absl::flat_hash_map + &VirtualClusters() const { + return virtual_cluster_table_data_; + } + private: /// Load job metadata from the store into memory asynchronously. /// @@ -87,6 +93,11 @@ class GcsInitData { void AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done); + /// Load virtual cluster metadata from the store into memory asynchronously. + /// + /// \param on_done The callback when virtual cluster metadata is loaded successfully. + void AsyncLoadVirtualClusterTableData(const EmptyCallback &on_done); + protected: /// The gcs table storage. gcs::GcsTableStorage &gcs_table_storage_; @@ -105,6 +116,10 @@ class GcsInitData { absl::flat_hash_map actor_table_data_; absl::flat_hash_map actor_task_spec_table_data_; + + /// Virtual cluster metadata. + absl::flat_hash_map + virtual_cluster_table_data_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc index 472f252bca91c..9f6664a34099e 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc @@ -78,8 +78,12 @@ void VirtualCluster::RemoveNodeInstances(ReplicaInstances replica_instances) { RAY_CHECK(job_cluster_iter != template_iter->second.end()); for (auto &[id, _] : node_instances) { - RAY_CHECK(job_cluster_iter->second.erase(id) == 1); - RAY_CHECK(--(replica_set_iter->second) >= 0); + if (job_cluster_iter->second.erase(id) == 0) { + RAY_LOG(WARNING) << "The node instance " << id << " is not found in cluster " + << GetID(); + } else { + RAY_CHECK(--(replica_set_iter->second) >= 0); + } } if (job_cluster_iter->second.empty()) { @@ -178,7 +182,6 @@ std::shared_ptr VirtualCluster::ToProto() const { data->set_id(GetID()); data->set_mode(GetMode()); data->set_revision(GetRevision()); - data->mutable_replica_sets()->insert(replica_sets_.begin(), replica_sets_.end()); for (auto &[template_id, job_node_instances] : visible_node_instances_) { for (auto &[job_cluster_id, node_instances] : job_node_instances) { for (auto &[id, node_instance] : node_instances) { @@ -205,19 +208,26 @@ std::string VirtualCluster::DebugString() const { } ///////////////////////// ExclusiveCluster ///////////////////////// -Status ExclusiveCluster::CreateJobCluster(const std::string &job_name, +void ExclusiveCluster::LoadJobCluster(const rpc::VirtualClusterTableData &data) { + RAY_CHECK(GetMode() == rpc::AllocationMode::EXCLUSIVE); + + const auto &job_cluster_id = data.id(); + RAY_CHECK(VirtualClusterID::FromBinary(job_cluster_id).IsJobClusterID()); + RAY_CHECK(job_clusters_.find(job_cluster_id) == job_clusters_.end()); + + DoCreateJobCluster(data.id(), toReplicaInstances(data.node_instances())); +} + +Status ExclusiveCluster::CreateJobCluster(const std::string &job_cluster_id, ReplicaSets replica_sets, CreateOrUpdateVirtualClusterCallback callback) { if (GetMode() != rpc::AllocationMode::EXCLUSIVE) { std::ostringstream ostr; ostr << "The job cluster can only be created in exclusive mode, virtual_cluster_id: " - << GetID() << ", job_name: " << job_name; + << GetID(); return Status::InvalidArgument(ostr.str()); } - auto job_cluster_id = - VirtualClusterID::FromBinary(GetID()).BuildJobClusterID(job_name).Binary(); - auto iter = job_clusters_.find(job_cluster_id); if (iter != job_clusters_.end()) { std::ostringstream ostr; @@ -236,6 +246,14 @@ Status ExclusiveCluster::CreateJobCluster(const std::string &job_name, return Status::OutOfResource(ostr.str()); } + auto job_cluster = + DoCreateJobCluster(job_cluster_id, std::move(replica_instances_to_add)); + // Flush and publish the job cluster data. + return async_data_flusher_(job_cluster->ToProto(), std::move(callback)); +} + +std::shared_ptr ExclusiveCluster::DoCreateJobCluster( + const std::string &job_cluster_id, ReplicaInstances replica_instances_to_add) { auto replica_instances_to_remove_from_current_cluster = replica_instances_to_add; auto replica_instances_to_add_to_current_cluster = replica_instances_to_add; for (auto &[template_id, job_node_instances] : @@ -247,27 +265,22 @@ Status ExclusiveCluster::CreateJobCluster(const std::string &job_name, UpdateNodeInstances(std::move(replica_instances_to_add_to_current_cluster), std::move(replica_instances_to_remove_from_current_cluster)); - // Create a job cluster. auto job_cluster = std::make_shared(job_cluster_id); job_cluster->UpdateNodeInstances(std::move(replica_instances_to_add), ReplicaInstances()); RAY_CHECK(job_clusters_.emplace(job_cluster_id, job_cluster).second); - - // Flush and publish the job cluster data. - return async_data_flusher_(job_cluster->ToProto(), std::move(callback)); + return job_cluster; } -Status ExclusiveCluster::RemoveJobCluster(const std::string &job_name, +Status ExclusiveCluster::RemoveJobCluster(const std::string &job_cluster_id, RemoveVirtualClusterCallback callback) { if (GetMode() != rpc::AllocationMode::EXCLUSIVE) { std::ostringstream ostr; ostr << "The job cluster can only be removed in exclusive mode, virtual_cluster_id: " - << GetID() << ", job_name: " << job_name; + << GetID(); return Status::InvalidArgument(ostr.str()); } - auto job_cluster_id = - VirtualClusterID::FromBinary(GetID()).BuildJobClusterID(job_name).Binary(); auto iter = job_clusters_.find(job_cluster_id); if (iter == job_clusters_.end()) { return Status::NotFound("The job cluster " + job_cluster_id + " does not exist."); @@ -300,9 +313,7 @@ Status ExclusiveCluster::RemoveJobCluster(const std::string &job_name, } std::shared_ptr ExclusiveCluster::GetJobCluster( - const std::string &job_id) const { - auto job_cluster_id = - VirtualClusterID::FromBinary(GetID()).BuildJobClusterID(job_id).Binary(); + const std::string &job_cluster_id) const { auto iter = job_clusters_.find(job_cluster_id); return iter != job_clusters_.end() ? iter->second : nullptr; } @@ -316,13 +327,12 @@ bool ExclusiveCluster::IsIdleNodeInstance(const std::string &job_cluster_id, } void ExclusiveCluster::ForeachJobCluster( - const std::function &)> - &fn) const { + const std::function &)> &fn) const { if (fn == nullptr) { return; } - for (const auto &[job_cluster_id, job_cluster] : job_clusters_) { - fn(job_cluster_id, job_cluster); + for (const auto &[_, job_cluster] : job_clusters_) { + fn(job_cluster); } } @@ -341,12 +351,104 @@ bool MixedCluster::InUse() const { } ///////////////////////// PrimaryCluster ///////////////////////// +void PrimaryCluster::Initialize(const GcsInitData &gcs_init_data) { + const auto &nodes = gcs_init_data.Nodes(); + absl::flat_hash_map dead_node_instances; + absl::flat_hash_map + logical_clusters_data; + absl::flat_hash_map + job_clusters_data; + for (const auto &[virtual_cluster_id, virtual_cluster_data] : + gcs_init_data.VirtualClusters()) { + if (virtual_cluster_id.IsJobClusterID()) { + job_clusters_data[virtual_cluster_id] = &virtual_cluster_data; + } else { + logical_clusters_data[virtual_cluster_id] = &virtual_cluster_data; + } + for (auto &[id, node_instance] : virtual_cluster_data.node_instances()) { + auto node_id = NodeID::FromHex(id); + if (!nodes.contains(node_id) || + nodes.at(node_id).state() == rpc::GcsNodeInfo::DEAD) { + dead_node_instances.emplace(id, node_instance.template_id()); + } + } + } + + for (const auto &[_, node] : nodes) { + if (node.state() == rpc::GcsNodeInfo::ALIVE) { + OnNodeAdd(node); + } + } + + for (const auto &[_, virtual_cluster_data] : logical_clusters_data) { + LoadLogicalCluster(*virtual_cluster_data); + } + + for (auto &[job_cluster_id, virtual_cluster_data] : job_clusters_data) { + auto parent_cluster_id = job_cluster_id.ParentID().Binary(); + if (parent_cluster_id == kPrimaryClusterID) { + LoadJobCluster(*virtual_cluster_data); + } else { + auto logical_cluster = std::dynamic_pointer_cast( + GetLogicalCluster(parent_cluster_id)); + RAY_CHECK(logical_cluster != nullptr && + logical_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE); + logical_cluster->LoadJobCluster(*virtual_cluster_data); + } + } + + for (const auto &[id, node_type_name] : dead_node_instances) { + OnNodeInstanceDead(id, node_type_name); + } +} + std::shared_ptr PrimaryCluster::GetLogicalCluster( const std::string &logical_cluster_id) const { auto iter = logical_clusters_.find(logical_cluster_id); return iter != logical_clusters_.end() ? iter->second : nullptr; } +void PrimaryCluster::ForeachVirtualCluster( + const std::function &)> &fn) const { + if (fn == nullptr) { + return; + } + for (const auto &[_, logical_cluster] : logical_clusters_) { + fn(logical_cluster); + if (logical_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) { + auto exclusive_cluster = + std::dynamic_pointer_cast(logical_cluster); + exclusive_cluster->ForeachJobCluster(fn); + } + } + ForeachJobCluster(fn); +} + +std::shared_ptr PrimaryCluster::LoadLogicalCluster( + const rpc::VirtualClusterTableData &data) { + const auto &logical_cluster_id = data.id(); + std::shared_ptr logical_cluster; + if (data.mode() == rpc::AllocationMode::EXCLUSIVE) { + logical_cluster = + std::make_shared(logical_cluster_id, async_data_flusher_); + } else { + logical_cluster = std::make_shared(logical_cluster_id); + } + RAY_CHECK(logical_clusters_.emplace(logical_cluster_id, logical_cluster).second); + + auto replica_instances_to_add_to_logical_cluster = + toReplicaInstances(data.node_instances()); + auto replica_instances_to_remove_from_primary_cluster = + replica_instances_to_add_to_logical_cluster; + UpdateNodeInstances(ReplicaInstances(), + std::move(replica_instances_to_remove_from_primary_cluster)); + + // Update the virtual cluster replica sets and node instances. + logical_cluster->UpdateNodeInstances( + std::move(replica_instances_to_add_to_logical_cluster), ReplicaInstances()); + return logical_cluster; +} + Status PrimaryCluster::CreateOrUpdateVirtualCluster( rpc::CreateOrUpdateVirtualClusterRequest request, CreateOrUpdateVirtualClusterCallback callback) { @@ -438,16 +540,20 @@ void PrimaryCluster::OnNodeAdd(const rpc::GcsNodeInfo &node) { } void PrimaryCluster::OnNodeDead(const rpc::GcsNodeInfo &node) { - const auto &template_id = node.node_type_name(); + const auto &node_type_name = node.node_type_name(); auto node_instance_id = NodeID::FromBinary(node.node_id()).Hex(); + OnNodeInstanceDead(node_instance_id, node_type_name); +} +void PrimaryCluster::OnNodeInstanceDead(const std::string &node_instance_id, + const std::string &node_type_name) { // TODO(Shanly): Build an index from node instance id to cluster id. - if (MarkNodeInstanceAsDead(template_id, node_instance_id)) { + if (MarkNodeInstanceAsDead(node_type_name, node_instance_id)) { return; } for (const auto &[_, logical_cluster] : logical_clusters_) { - if (logical_cluster->MarkNodeInstanceAsDead(template_id, node_instance_id)) { + if (logical_cluster->MarkNodeInstanceAsDead(node_type_name, node_instance_id)) { return; } } @@ -525,9 +631,7 @@ void PrimaryCluster::GetVirtualClustersData(rpc::GetVirtualClustersRequest reque if (include_job_clusters && cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) { auto exclusive_cluster = dynamic_cast(cluster); exclusive_cluster->ForeachJobCluster( - [&](const std::string &_, const auto &job_cluster) { - callback(job_cluster->ToProto()); - }); + [&](const auto &job_cluster) { callback(job_cluster->ToProto()); }); } if (only_include_mixed_cluster && cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) { diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h index 2f92e83a0a80d..f7b0188f12c66 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h @@ -18,6 +18,7 @@ #include "absl/container/flat_hash_set.h" #include "ray/common/status.h" #include "ray/common/virtual_cluster_id.h" +#include "ray/gcs/gcs_server/gcs_init_data.h" #include "src/ray/protobuf/gcs.pb.h" #include "src/ray/protobuf/gcs_service.pb.h" @@ -98,6 +99,17 @@ ReplicaSets ReplicasDifference(const T1 &left, const T2 &right) { return result; } +template +ReplicaInstances toReplicaInstances(const T &node_instances) { + ReplicaInstances result; + for (const auto &[id, node_instance] : node_instances) { + auto inst = std::make_shared(); + inst->set_hostname(node_instance.hostname()); + inst->set_template_id(node_instance.template_id()); + result[node_instance.template_id()][kEmptyJobClusterId].emplace(id, std::move(inst)); + } + return result; +} class VirtualCluster { public: VirtualCluster(const std::string &id) : id_(id) {} @@ -207,33 +219,45 @@ class ExclusiveCluster : public VirtualCluster { const std::string &GetID() const override { return id_; } rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::EXCLUSIVE; } + /// Load a job cluster to the exclusive cluster. + /// + /// \param data The data of the job cluster. + void LoadJobCluster(const rpc::VirtualClusterTableData &data); + + /// Build the job cluster id. + /// + /// \param job_name The name of the job. + /// \return The job cluster id. + std::string BuildJobClusterID(const std::string &job_name) { + return VirtualClusterID::FromBinary(GetID()).BuildJobClusterID(job_name).Binary(); + } + /// Create a job cluster. /// - /// \param job_name The name of job to create the job cluster. + /// \param job_cluster_id The id of the job cluster. /// \param replica_sets The replica sets of the job cluster. /// \return Status The status of the creation. - Status CreateJobCluster(const std::string &job_name, + Status CreateJobCluster(const std::string &job_cluster_id, ReplicaSets replica_sets, CreateOrUpdateVirtualClusterCallback callback); /// Remove a job cluster. /// - /// \param job_name The name of job to remove the job cluster. + /// \param job_cluster_id The id of the job cluster to be removed. /// \param callback The callback that will be called after the job cluster is removed. /// \return Status The status of the removal. - Status RemoveJobCluster(const std::string &job_name, + Status RemoveJobCluster(const std::string &job_cluster_id, RemoveVirtualClusterCallback callback); /// Get the job cluster by the job cluster id. /// - /// \param job_name The name of job to get the job cluster. + /// \param job_cluster_id The id of the job cluster. /// \return The job cluster if it exists, otherwise return nullptr. - std::shared_ptr GetJobCluster(const std::string &job_name) const; + std::shared_ptr GetJobCluster(const std::string &job_cluster_id) const; /// Iterate all job clusters. void ForeachJobCluster( - const std::function &)> - &fn) const; + const std::function &)> &fn) const; /// Check if the virtual cluster is in use. /// @@ -244,6 +268,14 @@ class ExclusiveCluster : public VirtualCluster { bool IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const override; + /// Create a job cluster to the exclusive cluster. + /// + /// \param job_cluster_id The id of the job cluster. + /// \param replica_instances_to_add The node instances to be added. + /// \return The created job cluster. + std::shared_ptr DoCreateJobCluster( + const std::string &job_cluster_id, ReplicaInstances replica_instances_to_add); + // The mapping from job cluster id to `JobCluster` instance. absl::flat_hash_map> job_clusters_; // The async data flusher. @@ -279,6 +311,19 @@ class PrimaryCluster : public ExclusiveCluster { : ExclusiveCluster(kPrimaryClusterID, async_data_flusher) {} PrimaryCluster &operator=(const PrimaryCluster &) = delete; + /// Initialize with the gcs tables data synchronously. + /// This should be called when GCS server restarts after a failure. + /// + /// \param gcs_init_data. + void Initialize(const GcsInitData &gcs_init_data); + + /// Load a logical cluster to the primary cluster. + /// + /// \param data The data of the logical cluster. + /// \return The loaded logical cluster. + std::shared_ptr LoadLogicalCluster( + const rpc::VirtualClusterTableData &data); + const std::string &GetID() const override { return kPrimaryClusterID; } rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::EXCLUSIVE; } @@ -297,6 +342,12 @@ class PrimaryCluster : public ExclusiveCluster { std::shared_ptr GetLogicalCluster( const std::string &logical_cluster_id) const; + /// Iterate all virtual clusters. + /// + /// \param fn The function to be called for each logical cluster. + void ForeachVirtualCluster( + const std::function &)> &fn) const; + /// Get virtual cluster by virtual cluster id /// /// \param virtual_cluster_id The id of virtual cluster @@ -331,6 +382,13 @@ class PrimaryCluster : public ExclusiveCluster { bool IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const override; + /// Handle the node dead event. + /// + /// \param node_instance_id The id of the node instance that is dead. + /// \param node_type_name The type name of the node instance that is dead. + void OnNodeInstanceDead(const std::string &node_instance_id, + const std::string &node_type_name); + private: /// Calculate the node instances that to be added and to be removed /// based on the demand final replica sets inside the request. diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc index c39ffc2547175..49fae62859df1 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc @@ -20,7 +20,7 @@ namespace ray { namespace gcs { void GcsVirtualClusterManager::Initialize(const GcsInitData &gcs_init_data) { - // TODO(Shanly): To be implement. + primary_cluster_->Initialize(gcs_init_data); } void GcsVirtualClusterManager::OnNodeAdd(const rpc::GcsNodeInfo &node) { @@ -219,6 +219,9 @@ Status GcsVirtualClusterManager::FlushAndPublish( if (data->mode() != rpc::AllocationMode::MIXED) { // Tasks can only be scheduled on the nodes in the mixed cluster, so we just need to // publish the mixed cluster data. + if (callback) { + callback(status, std::move(data)); + } return; } diff --git a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc index 72cbd8d47299f..3c65d37437e22 100644 --- a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc @@ -41,6 +41,76 @@ class GcsVirtualClusterManagerTest : public ::testing::Test { std::unique_ptr gcs_virtual_cluster_manager_; }; +class MockGcsInitData : public GcsInitData { + public: + using GcsInitData::GcsInitData; + + void SetNodes( + const absl::flat_hash_map> &nodes) { + for (const auto &[node_id, node] : nodes) { + node_table_data_[node_id] = *node; + } + } + + void SetVirtualClusters( + const absl::flat_hash_map> + &virtual_clusters) { + for (const auto &[virtual_cluster_id, virtual_cluster] : virtual_clusters) { + virtual_cluster_table_data_[VirtualClusterID::FromBinary(virtual_cluster_id)] = + *virtual_cluster; + } + } +}; + +bool ReplicaInstancesEquals(const ReplicaInstances &lhs, const ReplicaInstances &rhs) { + if (lhs.size() != rhs.size()) { + return false; + } + for (const auto &[template_id, job_node_instances] : lhs) { + if (!rhs.contains(template_id)) { + return false; + } + const auto &rhs_job_node_instances = rhs.at(template_id); + if (job_node_instances.size() != rhs_job_node_instances.size()) { + return false; + } + for (const auto &[job_cluster_id, node_instances] : job_node_instances) { + if (!rhs_job_node_instances.contains(job_cluster_id)) { + return false; + } + const auto &rhs_node_instances = rhs_job_node_instances.at(job_cluster_id); + if (node_instances.size() != rhs_node_instances.size()) { + return false; + } + for (const auto &[node_instance_id, node_instance] : node_instances) { + if (!rhs_node_instances.contains(node_instance_id)) { + return false; + } + const auto &rhs_node_instance = rhs_node_instances.at(node_instance_id); + if (node_instance->hostname() != rhs_node_instance->hostname() || + node_instance->template_id() != rhs_node_instance->template_id() || + node_instance->is_dead() != rhs_node_instance->is_dead()) { + return false; + } + } + } + } + return true; +} + +bool ReplicaSetsEquals(const ReplicaSets &lhs, const ReplicaSets &rhs) { + if (lhs.size() != rhs.size()) { + return false; + } + for (const auto &[template_id, count] : lhs) { + if (!rhs.contains(template_id) || rhs.at(template_id) != count) { + return false; + } + } + return true; +} + class VirtualClusterTest : public ::testing::Test { public: std::shared_ptr InitPrimaryCluster( @@ -50,7 +120,8 @@ class VirtualClusterTest : public ::testing::Test { absl::flat_hash_map>> *template_id_to_nodes = nullptr) { auto primary_cluster = - std::make_shared([](auto data, auto callback) { + std::make_shared([this](auto data, auto callback) { + virtual_clusters_data_[data->id()] = data; callback(Status::OK(), data); return Status::OK(); }); @@ -64,6 +135,7 @@ class VirtualClusterTest : public ::testing::Test { (*template_id_to_nodes)[template_id].emplace(NodeID::FromBinary(node->node_id()), node); } + nodes_.emplace(NodeID::FromBinary(node->node_id()), node); } return primary_cluster; } @@ -85,6 +157,10 @@ class VirtualClusterTest : public ::testing::Test { }); return status; } + + absl::flat_hash_map> nodes_; + absl::flat_hash_map> + virtual_clusters_data_; }; class PrimaryClusterTest : public VirtualClusterTest { @@ -294,12 +370,11 @@ TEST_F(PrimaryClusterTest, CreateJobCluster) { std::string template_id_1 = "1"; size_t node_count_per_template = node_count / template_count; - std::string job_id_0 = "job_0"; - + std::string job_cluster_id_0 = primary_cluster->BuildJobClusterID("job_0"); { // Create job_cluster_id_0 and check that the status is ok. auto status = primary_cluster->CreateJobCluster( - job_id_0, + job_cluster_id_0, {{template_id_0, 5}, {template_id_1, 10}}, [this](const Status &status, std::shared_ptr data) { ASSERT_TRUE(status.ok()); @@ -307,9 +382,8 @@ TEST_F(PrimaryClusterTest, CreateJobCluster) { ASSERT_TRUE(status.ok()); } - auto job_cluster_0 = primary_cluster->GetJobCluster(job_id_0); + auto job_cluster_0 = primary_cluster->GetJobCluster(job_cluster_id_0); ASSERT_NE(job_cluster_0, nullptr); - auto job_cluster_id_0 = job_cluster_0->GetID(); { // Check the job cluster job_cluster_id_0 visible node instances. const auto &visiable_node_instances = job_cluster_0->GetVisibleNodeInstances(); @@ -342,11 +416,11 @@ TEST_F(PrimaryClusterTest, CreateJobCluster) { node_count_per_template - 10); } - std::string job_id_1 = "job_1"; + std::string job_cluster_id_1 = primary_cluster->BuildJobClusterID("job_1"); { // Create job_cluster_id_1 and check that the status is ok. auto status = primary_cluster->CreateJobCluster( - job_id_1, + job_cluster_id_1, {{template_id_0, node_count_per_template - 5}, {template_id_1, node_count_per_template - 10}}, [this](const Status &status, std::shared_ptr data) { @@ -355,9 +429,8 @@ TEST_F(PrimaryClusterTest, CreateJobCluster) { ASSERT_TRUE(status.ok()); } - auto job_cluster_1 = primary_cluster->GetJobCluster(job_id_1); + auto job_cluster_1 = primary_cluster->GetJobCluster(job_cluster_id_1); ASSERT_NE(job_cluster_1, nullptr); - auto job_cluster_id_1 = job_cluster_1->GetID(); { // Check the job cluster job_cluster_id_1 visible node instances. const auto &visiable_node_instances = job_cluster_1->GetVisibleNodeInstances(); @@ -406,12 +479,11 @@ TEST_F(PrimaryClusterTest, RemoveJobCluster) { std::string template_id_1 = "1"; size_t node_count_per_template = node_count / template_count; - std::string job_id_0 = "job_0"; - + std::string job_cluster_id_0 = primary_cluster->BuildJobClusterID("job_0"); { // Create job_cluster_id_0 and check that the status is ok. auto status = primary_cluster->CreateJobCluster( - job_id_0, + job_cluster_id_0, {{template_id_0, 5}, {template_id_1, 10}}, [this](const Status &status, std::shared_ptr data) { ASSERT_TRUE(status.ok()); @@ -419,9 +491,8 @@ TEST_F(PrimaryClusterTest, RemoveJobCluster) { ASSERT_TRUE(status.ok()); } - auto job_cluster_0 = primary_cluster->GetJobCluster(job_id_0); + auto job_cluster_0 = primary_cluster->GetJobCluster(job_cluster_id_0); ASSERT_NE(job_cluster_0, nullptr); - auto job_cluster_id_0 = job_cluster_0->GetID(); { // Check the job cluster job_cluster_id_0 visible node instances. const auto &visiable_node_instances = job_cluster_0->GetVisibleNodeInstances(); @@ -456,13 +527,13 @@ TEST_F(PrimaryClusterTest, RemoveJobCluster) { { auto status = primary_cluster->RemoveJobCluster( - job_id_0, + job_cluster_id_0, [this](const Status &status, std::shared_ptr data) { ASSERT_TRUE(status.ok()); ASSERT_TRUE(data->is_removed()); }); ASSERT_TRUE(status.ok()); - ASSERT_EQ(primary_cluster->GetJobCluster(job_id_0), nullptr); + ASSERT_EQ(primary_cluster->GetJobCluster(job_cluster_id_0), nullptr); } { @@ -481,9 +552,10 @@ TEST_F(PrimaryClusterTest, RemoveJobCluster) { } { + std::string job_cluster_id_1 = primary_cluster->BuildJobClusterID("job_1"); // Remove the job cluster that does not exist. auto status = primary_cluster->RemoveJobCluster( - "job_1", + job_cluster_id_1, [this](const Status &status, std::shared_ptr data) { ASSERT_FALSE(true); }); @@ -591,6 +663,7 @@ TEST_F(PrimaryClusterTest, GetVirtualClusters) { std::string virtual_cluster_id_0 = "virtual_cluster_id_0"; std::string virtual_cluster_id_1 = "virtual_cluster_id_1"; + std::string job_cluster_id_0 = primary_cluster->BuildJobClusterID("job_0"); ASSERT_TRUE(CreateVirtualCluster(primary_cluster, virtual_cluster_id_0, {{template_id_0, 5}, {template_id_1, 5}}) @@ -602,7 +675,7 @@ TEST_F(PrimaryClusterTest, GetVirtualClusters) { .ok()); ASSERT_TRUE(primary_cluster - ->CreateJobCluster("job_0", + ->CreateJobCluster(job_cluster_id_0, {{template_id_0, 10}, {template_id_1, 10}}, [this](const Status &status, auto data) { ASSERT_TRUE(status.ok()); @@ -612,8 +685,10 @@ TEST_F(PrimaryClusterTest, GetVirtualClusters) { auto virtual_cluster_0 = std::dynamic_pointer_cast( primary_cluster->GetLogicalCluster(virtual_cluster_id_0)); ASSERT_TRUE(virtual_cluster_0 != nullptr); + + std::string job_cluster_id_1 = virtual_cluster_0->BuildJobClusterID("job_1"); ASSERT_TRUE(virtual_cluster_0 - ->CreateJobCluster("job_1", + ->CreateJobCluster(job_cluster_id_1, {{template_id_0, 2}, {template_id_1, 2}}, [this](const Status &status, auto data) { ASSERT_TRUE(status.ok()); @@ -642,7 +717,7 @@ TEST_F(PrimaryClusterTest, GetVirtualClusters) { ASSERT_EQ(virtual_clusters_data_map.size(), 2); ASSERT_TRUE(virtual_clusters_data_map.contains(virtual_cluster_id_0)); - auto job_cluster = virtual_cluster_0->GetJobCluster("job_1"); + auto job_cluster = virtual_cluster_0->GetJobCluster(job_cluster_id_1); ASSERT_TRUE(job_cluster != nullptr); ASSERT_TRUE(virtual_clusters_data_map.contains(job_cluster->GetID())); @@ -659,5 +734,189 @@ TEST_F(PrimaryClusterTest, GetVirtualClusters) { } } +// ┌───────────────────────────────────────────────────┐ +// │ ┌───────────────────┐ ┌─────────┐ │ +// │ │ virtual_cluster_1 │ │ │ Exclusive │ +// │ │ Exclusive │ │ │ │ +// │ │ ┌─────────┐ │ │ job_0 │ │ +// │ │ │ job_1 │ │ │ │ │ +// │ │ │ │ │ │ │ │ +// │ │ └─────────┘ │ │ │ │ +// │ └───────────────────┘ │ │ │ +// │ ┌───────────────────┐ │ │ │ +// │ │ virtual_cluster_2 │ │ │ │ +// │ │ Mixed │ │ │ │ +// │ │ │ │ │ │ +// │ │ │ │ │ │ +// │ └───────────────────┘ └─────────┘ │ +// └───────────────────────────────────────────────────┘ +class FailoverTest : public PrimaryClusterTest { + public: + using PrimaryClusterTest::PrimaryClusterTest; + + std::shared_ptr InitVirtualClusters(size_t node_count, + size_t template_count) { + auto primary_cluster = InitPrimaryCluster(node_count, template_count); + + RAY_CHECK_OK(CreateVirtualCluster(primary_cluster, + virtual_cluster_id_1_, + {{template_id_0_, 5}, {template_id_1_, 5}})); + + RAY_CHECK_OK(CreateVirtualCluster(primary_cluster, + virtual_cluster_id_2_, + {{template_id_0_, 5}, {template_id_1_, 5}}, + rpc::AllocationMode::MIXED)); + + job_cluster_id_0_ = primary_cluster->BuildJobClusterID("job_0"); + RAY_CHECK_OK(primary_cluster->CreateJobCluster( + job_cluster_id_0_, + {{template_id_0_, 10}, {template_id_1_, 10}}, + [this](const Status &status, auto data) { ASSERT_TRUE(status.ok()); })); + + auto virtual_cluster_1 = std::dynamic_pointer_cast( + primary_cluster->GetLogicalCluster(virtual_cluster_id_1_)); + RAY_CHECK(virtual_cluster_1 != nullptr); + + job_cluster_id_1_ = virtual_cluster_1->BuildJobClusterID("job_1"); + RAY_CHECK_OK(virtual_cluster_1->CreateJobCluster( + job_cluster_id_1_, + {{template_id_0_, 2}, {template_id_1_, 2}}, + [this](const Status &status, auto data) { ASSERT_TRUE(status.ok()); })); + return primary_cluster; + } + + std::string template_id_0_ = "0"; + std::string template_id_1_ = "1"; + std::string job_cluster_id_0_; + std::string job_cluster_id_1_; + std::string virtual_cluster_id_1_ = "virtual_cluster_id_1"; + std::string virtual_cluster_id_2_ = "virtual_cluster_id_2"; +}; + +TEST_F(FailoverTest, FailoverNormal) { + size_t node_count = 200; + size_t template_count = 10; + auto primary_cluster = InitVirtualClusters(node_count, template_count); + ASSERT_EQ(virtual_clusters_data_.size(), 4); + + // Mock a gcs_init_data. + instrumented_io_context io_service; + gcs::InMemoryGcsTableStorage gcs_table_storage(io_service); + MockGcsInitData gcs_init_data(gcs_table_storage); + gcs_init_data.SetNodes(nodes_); + gcs_init_data.SetVirtualClusters(virtual_clusters_data_); + + // Failover to a new primary cluster. + PrimaryCluster new_primary_cluster([this](auto data, auto callback) { + callback(Status::OK(), data); + return Status::OK(); + }); + new_primary_cluster.Initialize(gcs_init_data); + + // Check the visible node instances and replica sets of the primary cluster are the + // same. + ASSERT_TRUE(ReplicaInstancesEquals(new_primary_cluster.GetVisibleNodeInstances(), + primary_cluster->GetVisibleNodeInstances())); + ASSERT_TRUE(ReplicaSetsEquals(new_primary_cluster.GetReplicaSets(), + primary_cluster->GetReplicaSets())); + + // Check the visible node instances and replica sets of the virtual clusters are the + // same. + primary_cluster->ForeachVirtualCluster( + [this, &new_primary_cluster](const auto &logical_cluster) { + auto new_virtual_cluster = + new_primary_cluster.GetVirtualCluster(logical_cluster->GetID()); + ASSERT_TRUE(new_virtual_cluster != nullptr); + ASSERT_TRUE(ReplicaInstancesEquals(new_virtual_cluster->GetVisibleNodeInstances(), + logical_cluster->GetVisibleNodeInstances())); + ASSERT_TRUE(ReplicaSetsEquals(new_virtual_cluster->GetReplicaSets(), + logical_cluster->GetReplicaSets())); + }); +} + +TEST_F(FailoverTest, FailoverWithDeadNodes) { + size_t node_count = 200; + size_t template_count = 10; + auto primary_cluster = InitVirtualClusters(node_count, template_count); + ASSERT_EQ(virtual_clusters_data_.size(), 4); + + auto job_cluster_0 = primary_cluster->GetJobCluster(job_cluster_id_0_); + ASSERT_TRUE(job_cluster_0 != nullptr); + + auto virtual_cluster_1 = std::dynamic_pointer_cast( + primary_cluster->GetVirtualCluster(virtual_cluster_id_1_)); + ASSERT_TRUE(virtual_cluster_1 != nullptr); + + auto virtual_cluster_2 = primary_cluster->GetVirtualCluster(virtual_cluster_id_2_); + ASSERT_TRUE(virtual_cluster_2 != nullptr); + + auto job_cluster_1 = virtual_cluster_1->GetJobCluster(job_cluster_id_1_); + ASSERT_TRUE(job_cluster_1 != nullptr); + + absl::flat_hash_set dead_node_ids; + // Mock template_id_0_ nodes are dead in job_cluster_0 and virtual_cluster_1. + dead_node_ids.emplace(NodeID::FromHex(job_cluster_0->GetVisibleNodeInstances() + .at(template_id_0_) + .at(kEmptyJobClusterId) + .begin() + ->first)); + dead_node_ids.emplace(NodeID::FromHex(job_cluster_1->GetVisibleNodeInstances() + .at(template_id_0_) + .at(kEmptyJobClusterId) + .begin() + ->first)); + // Mock template_id_1_ nodes are dead in virtual_cluster_1 and virtual_cluster_2. + dead_node_ids.emplace(NodeID::FromHex(virtual_cluster_1->GetVisibleNodeInstances() + .at(template_id_1_) + .at(kEmptyJobClusterId) + .begin() + ->first)); + dead_node_ids.emplace(NodeID::FromHex(virtual_cluster_2->GetVisibleNodeInstances() + .at(template_id_1_) + .at(kEmptyJobClusterId) + .begin() + ->first)); + // Erase the dead nodes. + for (const auto &dead_node_id : dead_node_ids) { + const auto &dead_node = nodes_.at(dead_node_id); + primary_cluster->OnNodeDead(*dead_node); + nodes_.erase(dead_node_id); + } + + // Mock a gcs_init_data. + instrumented_io_context io_service; + gcs::InMemoryGcsTableStorage gcs_table_storage(io_service); + MockGcsInitData gcs_init_data(gcs_table_storage); + gcs_init_data.SetNodes(nodes_); + gcs_init_data.SetVirtualClusters(virtual_clusters_data_); + + // Failover to a new primary cluster. + PrimaryCluster new_primary_cluster([this](auto data, auto callback) { + callback(Status::OK(), data); + return Status::OK(); + }); + new_primary_cluster.Initialize(gcs_init_data); + + // Check the visible node instances and replica sets of the primary cluster are not the + // same. + // ASSERT_TRUE(ReplicaInstancesEquals(new_primary_cluster.GetVisibleNodeInstances(), + // primary_cluster->GetVisibleNodeInstances())); + // ASSERT_FALSE(ReplicaSetsEquals(new_primary_cluster.GetReplicaSets(), + // primary_cluster->GetReplicaSets())); + + // Check the visible node instances and replica sets of the virtual clusters are the + // same. + primary_cluster->ForeachVirtualCluster( + [this, &new_primary_cluster](const auto &logical_cluster) { + auto new_virtual_cluster = + new_primary_cluster.GetVirtualCluster(logical_cluster->GetID()); + ASSERT_TRUE(new_virtual_cluster != nullptr); + ASSERT_TRUE(ReplicaInstancesEquals(new_virtual_cluster->GetVisibleNodeInstances(), + logical_cluster->GetVisibleNodeInstances())); + ASSERT_TRUE(ReplicaSetsEquals(new_virtual_cluster->GetReplicaSets(), + logical_cluster->GetReplicaSets())); + }); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 944945f4f7bce..c484af57a9777 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -729,12 +729,10 @@ message VirtualClusterTableData { string id = 1; // The allocation mode of the virtual cluster. AllocationMode mode = 2; - // The replica set list of the virtual cluster. - map replica_sets = 3; // Mapping from node id to it's instance. - map node_instances = 4; + map node_instances = 3; // Whether this virtual cluster is removed. - bool is_removed = 5; + bool is_removed = 4; // Version number of the last modification to the virtual cluster. - uint64 revision = 6; + uint64 revision = 5; }