[VirtualCluster] Clean up after removing node instances from a virtual cluster #441

Merged: 9 commits, Jan 9, 2025
Changes from 6 commits
@@ -1,6 +1,7 @@
import logging
import os
import socket
import tempfile
import time

import pytest
@@ -9,11 +10,13 @@
import ray
from ray._private.test_utils import (
format_web_url,
get_resource_usage,
wait_for_condition,
wait_until_server_available,
)
from ray.cluster_utils import Cluster
from ray.dashboard.tests.conftest import * # noqa
from ray.job_submission import JobStatus, JobSubmissionClient

logger = logging.getLogger(__name__)

@@ -447,3 +450,162 @@ def _get_virtual_clusters():
return False

wait_for_condition(_get_virtual_clusters, timeout=10)


# Because the raylet is responsible for task scheduling, GCS depends on the
# resource_view sync to learn about the pending/running tasks at each node. If the
# resource_view sync lags, GCS may mistakenly consider a node idle when removing
# node instances from a virtual cluster. In that case, we have to clean up the
# pending/running tasks at the node that was just removed. This test makes sure
# the cleanup is correctly enforced.
@pytest.mark.parametrize(
"ray_start_cluster_head",
[
{
"include_dashboard": True,
"_system_config": {
"gcs_actor_scheduling_enabled": False,
# Make the resource_view sync message lag.
"raylet_report_resources_period_milliseconds": 30000,
},
}
],
indirect=True,
)
def test_cleanup_tasks_after_removing_node_instance(
disable_aiohttp_cache, ray_start_cluster_head
):
cluster: Cluster = ray_start_cluster_head
assert wait_until_server_available(cluster.webui_url) is True
webui_url = cluster.webui_url
webui_url = format_web_url(webui_url)

# Add one 4c8g node to the primary cluster.
cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "4c8g"}, num_cpus=4)

# Create a virtual cluster with one 4c8g node.
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
divisible=False,
replica_sets={"4c8g": 1},
revision=0,
)
assert result["result"] is True
revision = result["data"]["revision"]

client = JobSubmissionClient(webui_url)
temp_dir = None
file_path = None

try:
# Create a temporary directory
temp_dir = tempfile.mkdtemp()

        # Define the driver: create two actors, each requiring 4 CPUs.
driver_content = """
import ray
import time

@ray.remote
class SmallActor():
def __init__(self):
pass

actors = []
for _ in range(2):
actors.append(SmallActor.options(num_cpus=4).remote())
time.sleep(600)
"""

# Create a temporary Python file.
file_path = os.path.join(temp_dir, "test_driver.py")

with open(file_path, "w") as file:
file.write(driver_content)

absolute_path = os.path.abspath(file_path)

# Submit the job to the virtual cluster.
job = client.submit_job(
entrypoint=f"python {absolute_path}",
virtual_cluster_id="virtual_cluster_1",
)

def check_job_running():
status = client.get_job_status(job)
return status == JobStatus.RUNNING

wait_for_condition(check_job_running)

def check_actors():
actors = ray._private.state.actors()
            # There is only one 4c8g node in the virtual cluster, so we should
            # see one alive actor and one actor pending creation.
expected_states = {"ALIVE": 1, "PENDING_CREATION": 1}
actor_states = {}
for _, actor in actors.items():
if actor["ActorClassName"] == "SmallActor":
actor_states[actor["State"]] = (
actor_states.get(actor["State"], 0) + 1
)
if actor_states == expected_states:
return True
return False

wait_for_condition(check_actors)

# Scale down the virtual cluster, removing one node instance.
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
divisible=False,
replica_sets={"4c8g": 0},
revision=revision,
)
        # Because the resource_view sync is lagging, GCS still considers the
        # node idle, so the node instance is successfully removed.
assert result["result"] is True

def check_actors_after_update():
actors = ray._private.state.actors()
            # If the node (just removed from the virtual cluster) cleaned up
            # its pending and running tasks, we should see two dead actors now.
expected_states = {"DEAD": 2}
actor_states = {}
for _, actor in actors.items():
if actor["ActorClassName"] == "SmallActor":
actor_states[actor["State"]] = (
actor_states.get(actor["State"], 0) + 1
)
if actor_states == expected_states:
return True
return False

wait_for_condition(check_actors_after_update)

def check_running_and_pending_tasks():
resources_batch = get_resource_usage(
gcs_address=cluster.head_node.gcs_address
)
            # Check each node's resource usage, making sure no running
            # or pending tasks are left.
for node in resources_batch.batch:
if "CPU" not in node.resources_available:
return False
if (
len(node.resource_load_by_shape.resource_demands) > 0
and node.resource_load_by_shape.resource_demands[
0
].num_ready_requests_queued
> 0
):
return False
return True

wait_for_condition(check_running_and_pending_tasks)

finally:
if file_path:
os.remove(file_path)
if temp_dir:
os.rmdir(temp_dir)
49 changes: 43 additions & 6 deletions src/ray/gcs/gcs_client/accessor.ant.cc
@@ -69,14 +69,50 @@ Status VirtualClusterInfoAccessor::AsyncSubscribeAll(
const SubscribeCallback<VirtualClusterID, rpc::VirtualClusterTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
fetch_all_data_operation_ = [this, subscribe](const StatusCallback &done) {
const auto updated_subscribe =
[this, subscribe](const VirtualClusterID &virtual_cluster_id,
rpc::VirtualClusterTableData &&virtual_cluster_data) {
auto iter = virtual_clusters_.find(virtual_cluster_id);
if (iter != virtual_clusters_.end()) {
if (virtual_cluster_data.revision() < iter->second.revision()) {
RAY_LOG(WARNING) << "The revision of the received virtual cluster ("
<< virtual_cluster_id << ") is outdated. Ignore it.";
return;
}
if (virtual_cluster_data.is_removed()) {
virtual_clusters_.erase(iter);
} else {
iter->second = virtual_cluster_data;
}
} else {
virtual_clusters_[virtual_cluster_id] = virtual_cluster_data;
}

subscribe(virtual_cluster_id, std::move(virtual_cluster_data));
};
fetch_all_data_operation_ = [this, updated_subscribe](const StatusCallback &done) {
auto callback =
[subscribe, done](
[this, updated_subscribe, done](
const Status &status,
std::vector<rpc::VirtualClusterTableData> &&virtual_cluster_info_list) {
absl::flat_hash_set<VirtualClusterID> virtual_cluster_id_set;
for (auto &virtual_cluster_info : virtual_cluster_info_list) {
subscribe(VirtualClusterID::FromBinary(virtual_cluster_info.id()),
std::move(virtual_cluster_info));
auto virtual_cluster_id =
VirtualClusterID::FromBinary(virtual_cluster_info.id());
updated_subscribe(virtual_cluster_id, std::move(virtual_cluster_info));
virtual_cluster_id_set.emplace(virtual_cluster_id);
}
for (auto iter = virtual_clusters_.begin(); iter != virtual_clusters_.end();) {
auto curr_iter = iter++;
            // If there is any virtual cluster not in `virtual_cluster_id_set`, it means
            // the local node may have missed the pub messages (when GCS removed virtual
            // clusters) in the past. So we have to explicitly notify the subscriber to
// clean its local cache.
if (!virtual_cluster_id_set.contains(curr_iter->first)) {
auto virtual_cluster_data = curr_iter->second;
virtual_cluster_data.set_is_removed(true);
updated_subscribe(curr_iter->first, std::move(virtual_cluster_data));
[Review comment, Collaborator] invoke subscribe directly.
}
}
if (done) {
done(status);
@@ -87,8 +123,9 @@
/*only_include_indivisible_clusters=*/true,
callback));
};
subscribe_operation_ = [this, subscribe](const StatusCallback &done) {
return client_impl_->GetGcsSubscriber().SubscribeAllVirtualClusters(subscribe, done);
subscribe_operation_ = [this, updated_subscribe](const StatusCallback &done) {
return client_impl_->GetGcsSubscriber().SubscribeAllVirtualClusters(updated_subscribe,
done);
};
return subscribe_operation_(
[this, done](const Status &status) { fetch_all_data_operation_(done); });
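Taken together, the accessor now keeps a local cache keyed by virtual cluster ID, drops any update whose revision is older than the cached one, and, after each full fetch, synthesizes removal notifications for cached clusters the fetch no longer mentions. A minimal, self-contained sketch of that pattern (ClusterData, Apply, and Reconcile are illustrative names, not Ray's API):

// Sketch of the revision-gated cache reconciliation implemented above.
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

struct ClusterData {
  uint64_t revision = 0;
  bool is_removed = false;
};

std::unordered_map<std::string, ClusterData> cache;

// Apply one update, dropping anything older than what we already hold.
void Apply(const std::string &id, ClusterData data) {
  auto it = cache.find(id);
  if (it != cache.end() && data.revision < it->second.revision) {
    std::cerr << "Ignoring outdated update for " << id << "\n";
    return;
  }
  if (data.is_removed) {
    cache.erase(id);
  } else {
    cache[id] = std::move(data);
  }
}

// After a full fetch, synthesize removal events for cached entries the fetch
// no longer mentions: their removal pub message may have been lost.
void Reconcile(const std::unordered_map<std::string, ClusterData> &fetched) {
  for (auto it = cache.begin(); it != cache.end();) {
    auto curr = it++;  // advance first; Apply below may erase curr
    if (fetched.count(curr->first) == 0) {
      ClusterData removed = curr->second;
      removed.is_removed = true;
      Apply(curr->first, std::move(removed));
    }
  }
}

int main() {
  Apply("vc1", ClusterData{1, false});
  Reconcile({});                      // full fetch no longer lists vc1
  std::cout << cache.size() << "\n";  // prints 0: the stale entry was purged
}

Routing the synthesized removal through the same Apply path keeps the subscriber's view consistent whether the removal arrived by pub message or was inferred from a full fetch.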
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_client/accessor.h
@@ -1039,6 +1039,9 @@ class VirtualClusterInfoAccessor {
SubscribeOperation subscribe_operation_;

GcsClient *client_impl_;

// Local cache of the virtual cluster data. It can be used for revision control.
absl::flat_hash_map<VirtualClusterID, rpc::VirtualClusterTableData> virtual_clusters_;
};

} // namespace gcs
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
@@ -543,7 +543,6 @@ bool IndivisibleCluster::IsIdleNodeInstance(
auto node_id =
scheduling::NodeID(NodeID::FromHex(node_instance.node_instance_id()).Binary());
const auto &node_resources = cluster_resource_manager_.GetNodeResources(node_id);
// TODO(Chong-Li): the resource view sync message may lag.
if (node_resources.normal_task_resources.IsEmpty() &&
node_resources.total == node_resources.available) {
return true;
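The removed TODO flagged that the resource view sync message may lag; rather than strengthening this check, the PR handles the consequence on the raylet side (see node_manager.cc below). A toy illustration of the underlying race, assuming a hypothetical 4-CPU node (not Ray code):

// GCS decides idleness from a periodically synced snapshot, so a node that
// just started work can still look idle until the next report lands.
#include <iostream>

struct ResourceView {
  double total = 4.0;
  double available = 4.0;
};

// GCS-side check, in the spirit of IsIdleNodeInstance above: a node looks
// idle when its synced view shows every resource still available.
bool LooksIdle(const ResourceView &view) { return view.available == view.total; }

int main() {
  ResourceView live;    // the raylet's ground truth
  ResourceView synced;  // GCS's cached copy, refreshed only periodically

  live.available = 0.0;  // a 4-CPU actor just landed on the node, but the
                         // periodic report has not fired yet, so the synced
                         // copy is stale.
  std::cout << std::boolalpha << "live idle: " << LooksIdle(live)
            << ", synced idle: " << LooksIdle(synced) << "\n";
  // Prints "live idle: false, synced idle: true": GCS would wrongly
  // treat the node as removable.
}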
30 changes: 29 additions & 1 deletion src/ray/raylet/node_manager.cc
@@ -416,7 +416,35 @@ NodeManager::NodeManager(
mutable_object_provider_ = std::make_unique<core::experimental::MutableObjectProvider>(
*store_client_, absl::bind_front(&NodeManager::CreateRayletClient, this));

virtual_cluster_manager_ = std::make_shared<VirtualClusterManager>();
virtual_cluster_manager_ = std::make_shared<VirtualClusterManager>(
self_node_id_, /*local_node_cleanup_fn=*/[this]() {
auto tasks_canceled = cluster_task_manager_->CancelTasks(
[](const std::shared_ptr<internal::Work> &work) { return true; },
rpc::RequestWorkerLeaseReply::SCHEDULING_FAILED,
"The node is removed from a virtual cluster.");
if (tasks_canceled) {
RAY_LOG(DEBUG) << "Tasks are cleaned up from cluster_task_manager because the "
"node is removed from virtual cluster.";
}
tasks_canceled = local_task_manager_->CancelTasks(
[](const std::shared_ptr<internal::Work> &work) { return true; },
rpc::RequestWorkerLeaseReply::SCHEDULING_FAILED,
"The node is removed from a virtual cluster.");
if (tasks_canceled) {
RAY_LOG(DEBUG) << "Tasks are cleaned up from local_task_manager because the "
"node is removed from virtual cluster.";
}
if (!cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeIdle()) {
for (auto &[_, worker] : leased_workers_) {
RAY_LOG(DEBUG).WithField(worker->WorkerId())
<< "Worker is cleaned because the node is removed from virtual cluster.";
DestroyWorker(
worker,
rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
"Worker is cleaned because the node is removed from virtual cluster.");
}
}
});
}

std::shared_ptr<raylet::RayletClient> NodeManager::CreateRayletClient(
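The new constructor argument installs a cleanup hook: when the node is removed from a virtual cluster, it cancels everything queued in both task managers and, if the node is not idle, tears down leased workers. A compact sketch of that shape, with TaskQueue and Work as invented stand-ins for Ray's cluster/local task managers:

// Sketch of a cleanup hook that cancels all queued work, mirroring the
// CancelTasks(...) calls in the lambda above.
#include <algorithm>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct Work {
  std::string task_id;
};

class TaskQueue {
 public:
  void Add(Work w) { queue_.push_back(std::move(w)); }

  // Cancel every queued item matching the predicate; returns true if any
  // work was removed.
  bool CancelIf(const std::function<bool(const Work &)> &pred,
                const std::string &reason) {
    auto new_end = std::remove_if(queue_.begin(), queue_.end(), pred);
    const bool canceled = new_end != queue_.end();
    if (canceled) {
      std::cout << "Canceled tasks: " << reason << "\n";
      queue_.erase(new_end, queue_.end());
    }
    return canceled;
  }

 private:
  std::vector<Work> queue_;
};

int main() {
  TaskQueue cluster_queue, local_queue;
  cluster_queue.Add({"pending-lease-1"});
  local_queue.Add({"dispatching-lease-2"});

  // The callback a node manager would hand to the virtual cluster manager.
  std::function<void()> local_node_cleanup_fn = [&]() {
    const std::string reason = "The node is removed from a virtual cluster.";
    cluster_queue.CancelIf([](const Work &) { return true; }, reason);
    local_queue.CancelIf([](const Work &) { return true; }, reason);
    // A real raylet would additionally tear down leased workers here, as the
    // DestroyWorker(...) loop above does.
  };
  local_node_cleanup_fn();
}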
2 changes: 1 addition & 1 deletion src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -89,7 +89,7 @@ void ClusterResourceScheduler::Init(
*cluster_resource_manager_,
/*is_node_available_fn*/
[this](auto node_id) { return this->NodeAvailable(node_id); },
/*is_node_schedulable_fn*/
/*is_node_schedulable*/
is_node_schedulable_fn_);
bundle_scheduling_policy_ =
std::make_unique<raylet_scheduling_policy::CompositeBundleSchedulingPolicy>(
40 changes: 27 additions & 13 deletions src/ray/raylet/virtual_cluster_manager.cc
@@ -29,23 +29,37 @@ bool VirtualClusterManager::UpdateVirtualCluster(
return false;
}

const auto &virtual_cluster_id = virtual_cluster_data.id();
auto it = virtual_clusters_.find(virtual_cluster_id);
if (it == virtual_clusters_.end()) {
virtual_clusters_[virtual_cluster_id] = std::move(virtual_cluster_data);
} else {
if (it->second.revision() > virtual_cluster_data.revision()) {
RAY_LOG(WARNING)
<< "The revision of the received virtual cluster is outdated, ignore it.";
return false;
// The virtual cluster id of the input data.
const auto &input_virtual_cluster_id = virtual_cluster_data.id();

if (virtual_cluster_data.is_removed()) {
if (virtual_cluster_id_ == input_virtual_cluster_id) {
virtual_cluster_id_.clear();
      // The virtual cluster is removed, so we have to clean up
      // the local tasks (a no-op in most cases).
local_node_cleanup_fn_();
}
virtual_clusters_.erase(input_virtual_cluster_id);
} else {
    // Whether the local node is in the input data.
bool local_node_in_data =
        virtual_cluster_data.node_instances().contains(local_node_instance_id_);

[Review comment, Collaborator] Suggested change: rename local_node_in_data to input_virtual_cluster_contains_local_node.

if (virtual_cluster_data.is_removed()) {
virtual_clusters_.erase(it);
return true;
// The local node is removed from its current virtual cluster.
if (virtual_cluster_id_ == input_virtual_cluster_id && !local_node_in_data) {
virtual_cluster_id_.clear();
      // Clean up the local tasks (a no-op in most cases).
local_node_cleanup_fn_();
} else if (virtual_cluster_id_ != input_virtual_cluster_id &&
local_node_in_data) { // The local node is added to a new virtual cluster.
virtual_cluster_id_ = input_virtual_cluster_id;
// There are chances that the pub message (removing the local node from a virtual
// cluster) was lost in the past, so we also have to clean up when adding the local
// node to a new virtual cluster (it is a no-op in most cases).
local_node_cleanup_fn_();
}

it->second = std::move(virtual_cluster_data);
virtual_clusters_[input_virtual_cluster_id] = std::move(virtual_cluster_data);
}
return true;
}
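The rewritten UpdateVirtualCluster runs the cleanup hook on three transitions: the node's whole cluster is removed, the node is dropped from its current cluster, or the node joins a new cluster (covering a possibly lost removal message). A runnable sketch under invented names (MembershipTracker, OnClusterUpdate; not Ray's API):

// Sketch of the membership bookkeeping above: track which virtual cluster
// the local node belongs to and clean up on every transition out of it.
#include <functional>
#include <iostream>
#include <set>
#include <string>

class MembershipTracker {
 public:
  MembershipTracker(std::string node_id, std::function<void()> cleanup)
      : node_id_(std::move(node_id)), cleanup_(std::move(cleanup)) {}

  // `members` plays the role of virtual_cluster_data.node_instances().
  void OnClusterUpdate(const std::string &cluster_id,
                       const std::set<std::string> &members,
                       bool is_removed) {
    const bool contains_local = members.count(node_id_) > 0;
    if (is_removed) {
      // The whole cluster is gone; clean up if we belonged to it.
      if (current_cluster_ == cluster_id) {
        current_cluster_.clear();
        cleanup_();
      }
    } else if (current_cluster_ == cluster_id && !contains_local) {
      // The local node was dropped from its current cluster.
      current_cluster_.clear();
      cleanup_();
    } else if (current_cluster_ != cluster_id && contains_local) {
      // Joining a new cluster: the removal pub message for the old one may
      // have been lost, so clean up here too (a no-op in most cases).
      current_cluster_ = cluster_id;
      cleanup_();
    }
  }

 private:
  std::string node_id_;
  std::function<void()> cleanup_;
  std::string current_cluster_;
};

int main() {
  MembershipTracker tracker("node-A", [] { std::cout << "cleanup\n"; });
  tracker.OnClusterUpdate("vc1", {"node-A"}, /*is_removed=*/false);  // join
  tracker.OnClusterUpdate("vc1", {}, /*is_removed=*/false);          // dropped
}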
10 changes: 9 additions & 1 deletion src/ray/raylet/virtual_cluster_manager.h
@@ -23,7 +23,10 @@ namespace raylet {

class VirtualClusterManager {
public:
VirtualClusterManager() = default;
VirtualClusterManager(const NodeID &node_id,
std::function<void()> local_node_cleanup_fn)
: local_node_instance_id_(node_id.Hex()),
local_node_cleanup_fn_(local_node_cleanup_fn) {}

/// Update the virtual cluster.
///
@@ -47,6 +50,11 @@
private:
/// The virtual clusters.
absl::flat_hash_map<std::string, rpc::VirtualClusterTableData> virtual_clusters_;
/// The local node instance id.
std::string local_node_instance_id_;
std::function<void()> local_node_cleanup_fn_;
/// The (indivisible) virtual cluster to which the local node belongs.
std::string virtual_cluster_id_;
[Review comment, @wumuzi520, Jan 8, 2025] Suggested rename: local_virtual_cluster_id_.
};

} // namespace raylet