From 7ca6fa75555cdf9386c11736f5a3df37bec6f97a Mon Sep 17 00:00:00 2001 From: Chong Li Date: Thu, 9 Jan 2025 15:19:33 +0800 Subject: [PATCH] Fix Signed-off-by: Chong Li --- src/ray/gcs/gcs_client/accessor.ant.cc | 37 ++++++++-------- src/ray/gcs/gcs_client/accessor.h | 4 +- src/ray/raylet/virtual_cluster_manager.cc | 51 ++++++++++------------- 3 files changed, 43 insertions(+), 49 deletions(-) diff --git a/src/ray/gcs/gcs_client/accessor.ant.cc b/src/ray/gcs/gcs_client/accessor.ant.cc index 2c44d8425621f..186c88d9f311f 100644 --- a/src/ray/gcs/gcs_client/accessor.ant.cc +++ b/src/ray/gcs/gcs_client/accessor.ant.cc @@ -72,22 +72,20 @@ Status VirtualClusterInfoAccessor::AsyncSubscribeAll( const auto updated_subscribe = [this, subscribe](const VirtualClusterID &virtual_cluster_id, rpc::VirtualClusterTableData &&virtual_cluster_data) { - if (virtual_cluster_revisions_.contains(virtual_cluster_id)) { - if (virtual_cluster_data.revision() < - virtual_cluster_revisions_[virtual_cluster_id]) { + auto iter = virtual_clusters_.find(virtual_cluster_id); + if (iter != virtual_clusters_.end()) { + if (virtual_cluster_data.revision() < iter->second.revision()) { RAY_LOG(WARNING) << "The revision of the received virtual cluster (" << virtual_cluster_id << ") is outdated. 
Ignore it."; return; } if (virtual_cluster_data.is_removed()) { - virtual_cluster_revisions_.erase(virtual_cluster_id); + virtual_clusters_.erase(iter); } else { - virtual_cluster_revisions_[virtual_cluster_id] = - virtual_cluster_data.revision(); + iter->second = virtual_cluster_data; } } else { - virtual_cluster_revisions_[virtual_cluster_id] = - virtual_cluster_data.revision(); + virtual_clusters_[virtual_cluster_id] = virtual_cluster_data; } subscribe(virtual_cluster_id, std::move(virtual_cluster_data)); @@ -97,21 +95,24 @@ Status VirtualClusterInfoAccessor::AsyncSubscribeAll( [this, updated_subscribe, done]( const Status &status, std::vector &&virtual_cluster_info_list) { - auto virtual_cluster_revisions_copy = virtual_cluster_revisions_; + absl::flat_hash_set virtual_cluster_id_set; for (auto &virtual_cluster_info : virtual_cluster_info_list) { auto virtual_cluster_id = VirtualClusterID::FromBinary(virtual_cluster_info.id()); updated_subscribe(virtual_cluster_id, std::move(virtual_cluster_info)); - virtual_cluster_revisions_copy.erase(virtual_cluster_id); + virtual_cluster_id_set.emplace(virtual_cluster_id); } - for (const auto &[virtual_cluster_id, _] : virtual_cluster_revisions_copy) { - // If there is any left data in `virtual_cluster_revisions_copy`, it means the - // local node may miss the pub messages (when gcs removed virtual clusters) in - // the past. So we have to mock a `virtual_cluster_table_data` (specifying - // removed) and notify the subscriber to clean its local cache. - rpc::VirtualClusterTableData virtual_cluster_table_data; - virtual_cluster_table_data.set_is_removed(true); - updated_subscribe(virtual_cluster_id, std::move(virtual_cluster_table_data)); + for (auto iter = virtual_clusters_.begin(); iter != virtual_clusters_.end();) { + auto curr_iter = iter++; + // If there is any virtual cluster not in `virtual_cluster_id_set`, it means + // the local node may miss the pub messages (when gcs removed virtual + // clusters) in the past. 
So we have to explicitly notify the subscriber to + // clean its local cache. + if (!virtual_cluster_id_set.contains(curr_iter->first)) { + auto virtual_cluster_data = curr_iter->second; + virtual_cluster_data.set_is_removed(true); + updated_subscribe(curr_iter->first, std::move(virtual_cluster_data)); + } } if (done) { done(status); diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index 62de4310a3621..16bc1b957b650 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -1040,8 +1040,8 @@ class VirtualClusterInfoAccessor { GcsClient *client_impl_; - // The revision of each virtual cluster. - absl::flat_hash_map virtual_cluster_revisions_; + // Local cache of the virtual cluster data. It can be used for revision control. + absl::flat_hash_map virtual_clusters_; }; } // namespace gcs diff --git a/src/ray/raylet/virtual_cluster_manager.cc b/src/ray/raylet/virtual_cluster_manager.cc index 8b2ed49903432..978f406853cf3 100644 --- a/src/ray/raylet/virtual_cluster_manager.cc +++ b/src/ray/raylet/virtual_cluster_manager.cc @@ -29,44 +29,37 @@ bool VirtualClusterManager::UpdateVirtualCluster( return false; } - const auto &virtual_cluster_id = virtual_cluster_data.id(); - auto it = virtual_clusters_.find(virtual_cluster_id); - if (it == virtual_clusters_.end()) { - // Clean up the pending and running tasks at the local node. Make sure - // no leftover tasks moving to the new virtual cluster (it is a no-op in most cases). - if (virtual_cluster_data.node_instances().contains(local_node_instance_id_)) { - virtual_cluster_id_ = virtual_cluster_id; - local_node_cleanup_fn_(); - } - virtual_clusters_[virtual_cluster_id] = std::move(virtual_cluster_data); - } else { - if (virtual_cluster_data.is_removed()) { + // The virtual cluster id of the input data. 
+ const auto &input_virtual_cluster_id = virtual_cluster_data.id(); + + if (virtual_cluster_data.is_removed()) { + if (virtual_cluster_id_ == input_virtual_cluster_id) { + virtual_cluster_id_.clear(); // The virtual cluster is removed, we have to clean up // the local tasks (it is a no-op in most cases). - if (virtual_cluster_id_ == virtual_cluster_id) { - virtual_cluster_id_.clear(); - local_node_cleanup_fn_(); - } - virtual_clusters_.erase(it); - return true; + local_node_cleanup_fn_(); } + virtual_clusters_.erase(input_virtual_cluster_id); + } else { + // Whether the local node is in the input data. + bool local_node_in_data = + virtual_cluster_data.node_instances().contains(local_node_instance_id_); - if (it->second.node_instances().contains(local_node_instance_id_) && - !virtual_cluster_data.node_instances().contains(local_node_instance_id_)) { + // The local node is removed from its current virtual cluster. + if (virtual_cluster_id_ == input_virtual_cluster_id && !local_node_in_data) { virtual_cluster_id_.clear(); - // If local node is removed from a virtual cluster, we have to clean up - // the local tasks (it is a no-op in most cases). + // Clean up the local tasks (it is a no-op in most cases). local_node_cleanup_fn_(); - } else if (!it->second.node_instances().contains(local_node_instance_id_) && - virtual_cluster_data.node_instances().contains(local_node_instance_id_)) { - virtual_cluster_id_ = virtual_cluster_id; - // If the pub message (removing the local node from a virtual cluster) was lost - // (miss the chance to clean up the local node), we have to clean up when adding the - // local node to a virtual cluster (it is a no-op in most cases). + } else if (virtual_cluster_id_ != input_virtual_cluster_id && + local_node_in_data) { // The local node is added to a new virtual cluster. 
+ virtual_cluster_id_ = input_virtual_cluster_id; + // There are chances that the pub message (removing the local node from a virtual + // cluster) was lost in the past, so we also have to clean up when adding the local + // node to a new virtual cluster (it is a no-op in most cases). local_node_cleanup_fn_(); } - it->second = std::move(virtual_cluster_data); + virtual_clusters_[input_virtual_cluster_id] = std::move(virtual_cluster_data); } return true; }