Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
Signed-off-by: Chong Li <[email protected]>
  • Loading branch information
Chong-Li committed Jan 9, 2025
1 parent 6124603 commit 7ca6fa7
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 49 deletions.
37 changes: 19 additions & 18 deletions src/ray/gcs/gcs_client/accessor.ant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,20 @@ Status VirtualClusterInfoAccessor::AsyncSubscribeAll(
const auto updated_subscribe =
[this, subscribe](const VirtualClusterID &virtual_cluster_id,
rpc::VirtualClusterTableData &&virtual_cluster_data) {
if (virtual_cluster_revisions_.contains(virtual_cluster_id)) {
if (virtual_cluster_data.revision() <
virtual_cluster_revisions_[virtual_cluster_id]) {
auto iter = virtual_clusters_.find(virtual_cluster_id);
if (iter != virtual_clusters_.end()) {
if (virtual_cluster_data.revision() < iter->second.revision()) {
RAY_LOG(WARNING) << "The revision of the received virtual cluster ("
<< virtual_cluster_id << ") is outdated. Ignore it.";
return;
}
if (virtual_cluster_data.is_removed()) {
virtual_cluster_revisions_.erase(virtual_cluster_id);
virtual_clusters_.erase(iter);
} else {
virtual_cluster_revisions_[virtual_cluster_id] =
virtual_cluster_data.revision();
iter->second = virtual_cluster_data;
}
} else {
virtual_cluster_revisions_[virtual_cluster_id] =
virtual_cluster_data.revision();
virtual_clusters_[virtual_cluster_id] = virtual_cluster_data;
}

subscribe(virtual_cluster_id, std::move(virtual_cluster_data));
Expand All @@ -97,21 +95,24 @@ Status VirtualClusterInfoAccessor::AsyncSubscribeAll(
[this, updated_subscribe, done](
const Status &status,
std::vector<rpc::VirtualClusterTableData> &&virtual_cluster_info_list) {
auto virtual_cluster_revisions_copy = virtual_cluster_revisions_;
absl::flat_hash_set<VirtualClusterID> virtual_cluster_id_set;
for (auto &virtual_cluster_info : virtual_cluster_info_list) {
auto virtual_cluster_id =
VirtualClusterID::FromBinary(virtual_cluster_info.id());
updated_subscribe(virtual_cluster_id, std::move(virtual_cluster_info));
virtual_cluster_revisions_copy.erase(virtual_cluster_id);
virtual_cluster_id_set.emplace(virtual_cluster_id);
}
for (const auto &[virtual_cluster_id, _] : virtual_cluster_revisions_copy) {
// If there is any left data in `virtual_cluster_revisions_copy`, it means the
// local node may miss the pub messages (when gcs removed virtual clusters) in
// the past. So we have to mock a `virtual_cluster_table_data` (specifying
// removed) and notify the subscriber to clean its local cache.
rpc::VirtualClusterTableData virtual_cluster_table_data;
virtual_cluster_table_data.set_is_removed(true);
updated_subscribe(virtual_cluster_id, std::move(virtual_cluster_table_data));
for (auto iter = virtual_clusters_.begin(); iter != virtual_clusters_.end();) {
auto curr_iter = iter++;
// If there is any virtual cluster not in `virtual_cluster_id_set`, it means
// the local node may miss the pub messages (when gcs removed virtual
// clusters) in the past. So we have to explicitely notify the subscriber to
// clean its local cache.
if (!virtual_cluster_id_set.contains(curr_iter->first)) {
auto virtual_cluster_data = curr_iter->second;
virtual_cluster_data.set_is_removed(true);
updated_subscribe(curr_iter->first, std::move(virtual_cluster_data));
}
}
if (done) {
done(status);
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -1040,8 +1040,8 @@ class VirtualClusterInfoAccessor {

GcsClient *client_impl_;

// The revision of each virtual cluster.
absl::flat_hash_map<VirtualClusterID, uint64_t> virtual_cluster_revisions_;
// Local cache of the virtual cluster data. It can be used for revision control.
absl::flat_hash_map<VirtualClusterID, rpc::VirtualClusterTableData> virtual_clusters_;
};

} // namespace gcs
Expand Down
51 changes: 22 additions & 29 deletions src/ray/raylet/virtual_cluster_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,44 +29,37 @@ bool VirtualClusterManager::UpdateVirtualCluster(
return false;
}

const auto &virtual_cluster_id = virtual_cluster_data.id();
auto it = virtual_clusters_.find(virtual_cluster_id);
if (it == virtual_clusters_.end()) {
// Clean up the pending and running tasks at the local node. Make sure
// no leftover tasks moving to the new virtual cluster (it is a no-op in most cases).
if (virtual_cluster_data.node_instances().contains(local_node_instance_id_)) {
virtual_cluster_id_ = virtual_cluster_id;
local_node_cleanup_fn_();
}
virtual_clusters_[virtual_cluster_id] = std::move(virtual_cluster_data);
} else {
if (virtual_cluster_data.is_removed()) {
// The virtual cluster id of the input data.
const auto &input_virtual_cluster_id = virtual_cluster_data.id();

if (virtual_cluster_data.is_removed()) {
if (virtual_cluster_id_ == input_virtual_cluster_id) {
virtual_cluster_id_.clear();
// The virtual cluster is removed, we have to clean up
// the local tasks (it is a no-op in most cases).
if (virtual_cluster_id_ == virtual_cluster_id) {
virtual_cluster_id_.clear();
local_node_cleanup_fn_();
}
virtual_clusters_.erase(it);
return true;
local_node_cleanup_fn_();
}
virtual_clusters_.erase(input_virtual_cluster_id);
} else {
// Whether the local node in the input data.
bool local_node_in_data =
virtual_cluster_data.node_instances().contains(local_node_instance_id_);

if (it->second.node_instances().contains(local_node_instance_id_) &&
!virtual_cluster_data.node_instances().contains(local_node_instance_id_)) {
// The local node is removed from its current virtual cluster.
if (virtual_cluster_id_ == input_virtual_cluster_id && !local_node_in_data) {
virtual_cluster_id_.clear();
// If local node is removed from a virtual cluster, we have to clean up
// the local tasks (it is a no-op in most cases).
// Clean up the local tasks (it is a no-op in most cases).
local_node_cleanup_fn_();
} else if (!it->second.node_instances().contains(local_node_instance_id_) &&
virtual_cluster_data.node_instances().contains(local_node_instance_id_)) {
virtual_cluster_id_ = virtual_cluster_id;
// If the pub message (removing the local node from a virtual cluster) was lost
// (miss the chance to clean up the local node), we have to clean up when adding the
// local node to a virtual cluster (it is a no-op in most cases).
} else if (virtual_cluster_id_ != input_virtual_cluster_id &&
local_node_in_data) { // The local node is added to a new virtual cluster.
virtual_cluster_id_ = input_virtual_cluster_id;
// There are chances that the pub message (removing the local node from a virtual
// cluster) was lost in the past, so we also have to clean up when adding the local
// node to a new virtual cluster (it is a no-op in most cases).
local_node_cleanup_fn_();
}

it->second = std::move(virtual_cluster_data);
virtual_clusters_[input_virtual_cluster_id] = std::move(virtual_cluster_data);
}
return true;
}
Expand Down

0 comments on commit 7ca6fa7

Please sign in to comment.