From aa48d0030ac4c367ec3f9802d1784b0c18f51d46 Mon Sep 17 00:00:00 2001
From: Chong Li
Date: Tue, 7 Jan 2025 21:46:22 +0800
Subject: [PATCH] Fix task cleanup when a node is removed from a virtual
 cluster, and add UT

---
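Reviewer note (not part of the diff): gcs decides whether a node instance is
idle from the periodically synced resource_view. When that sync lags, gcs can
remove a node instance that still has pending or running tasks, so the raylet
now cancels the leftover lease requests itself once its node instance is
removed. The idle test is restated below in Python to show why a stale
snapshot passes it; the NodeResources dataclass is an illustrative stand-in
for the C++ struct, not a Ray API.

    # Illustrative Python rendering of IndivisibleCluster::IsIdleNodeInstance;
    # NodeResources here is a stand-in for the C++ struct, not a Ray Python API.
    from dataclasses import dataclass, field


    @dataclass
    class NodeResources:
        total: dict = field(default_factory=dict)      # e.g. {"CPU": 4}
        available: dict = field(default_factory=dict)  # lags behind reality
        normal_task_resources: dict = field(default_factory=dict)


    def is_idle_node_instance(node_resources: NodeResources) -> bool:
        # Idle only if no normal tasks hold resources and nothing is in use.
        return (
            not node_resources.normal_task_resources
            and node_resources.total == node_resources.available
        )


    # A stale resource_view snapshot: actors already occupy all 4 CPUs, but the
    # raylet has not reported yet, so the node still looks idle to gcs.
    stale = NodeResources(total={"CPU": 4}, available={"CPU": 4})
    assert is_idle_node_instance(stale)

With raylet_report_resources_period_milliseconds set to 30000, as in the test
below, the snapshot can keep showing total == available long after the actors
have taken all CPUs, which is exactly the window this patch closes.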
 .../tests/test_virtual_cluster.py             | 162 ++++++++++++++++++
 src/ray/gcs/gcs_server/gcs_virtual_cluster.cc |   1 -
 src/ray/raylet/node_manager.cc                |  12 +-
 3 files changed, 172 insertions(+), 3 deletions(-)

diff --git a/python/ray/dashboard/modules/virtual_cluster/tests/test_virtual_cluster.py b/python/ray/dashboard/modules/virtual_cluster/tests/test_virtual_cluster.py
index 8315af75fe6c..ad556e0b3185 100644
--- a/python/ray/dashboard/modules/virtual_cluster/tests/test_virtual_cluster.py
+++ b/python/ray/dashboard/modules/virtual_cluster/tests/test_virtual_cluster.py
@@ -1,6 +1,7 @@
 import logging
 import os
 import socket
+import tempfile
 import time
 
 import pytest
@@ -9,11 +10,13 @@ import ray
 from ray._private.test_utils import (
     format_web_url,
+    get_resource_usage,
     wait_for_condition,
     wait_until_server_available,
 )
 from ray.cluster_utils import Cluster
 from ray.dashboard.tests.conftest import *  # noqa
+from ray.job_submission import JobStatus, JobSubmissionClient
 
 logger = logging.getLogger(__name__)
@@ -447,3 +450,162 @@ def _get_virtual_clusters():
             return False
 
     wait_for_condition(_get_virtual_clusters, timeout=10)
+
+
+# Because raylet is responsible for task scheduling, gcs depends on the resource_view
+# sync to learn about the pending/running tasks at each node. If the resource_view
+# sync lags, gcs may mistakenly consider one node idle when removing node instances
+# from a virtual cluster. In this case, we have to clean up the pending/running tasks
+# at the node that was just removed. This test makes sure the cleanup is enforced.
+@pytest.mark.parametrize(
+    "ray_start_cluster_head",
+    [
+        {
+            "include_dashboard": True,
+            "_system_config": {
+                "gcs_actor_scheduling_enabled": False,
+                # Make the resource_view sync message lag.
+                "raylet_report_resources_period_milliseconds": 30000,
+            },
+        }
+    ],
+    indirect=True,
+)
+def test_cleanup_tasks_after_removing_node_instance(
+    disable_aiohttp_cache, ray_start_cluster_head
+):
+    cluster: Cluster = ray_start_cluster_head
+    assert wait_until_server_available(cluster.webui_url) is True
+    webui_url = cluster.webui_url
+    webui_url = format_web_url(webui_url)
+
+    # Add one 4c8g node to the primary cluster.
+    cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "4c8g"}, num_cpus=4)
+
+    # Create a virtual cluster with one 4c8g node.
+    result = create_or_update_virtual_cluster(
+        webui_url=webui_url,
+        virtual_cluster_id="virtual_cluster_1",
+        divisible=False,
+        replica_sets={"4c8g": 1},
+        revision=0,
+    )
+    assert result["result"] is True
+    revision = result["data"]["revision"]
+
+    client = JobSubmissionClient(webui_url)
+    temp_dir = None
+    file_path = None
+
+    try:
+        # Create a temporary directory.
+        temp_dir = tempfile.mkdtemp()
+
+        # Define a driver that creates two actors, each requiring 4 CPUs.
+        driver_content = """
+import ray
+import time
+
+@ray.remote
+class SmallActor():
+    def __init__(self):
+        pass
+
+actors = []
+for _ in range(2):
+    actors.append(SmallActor.options(num_cpus=4).remote())
+time.sleep(600)
+        """
+
+        # Create a temporary Python file.
+        file_path = os.path.join(temp_dir, "test_driver.py")
+
+        with open(file_path, "w") as file:
+            file.write(driver_content)
+
+        absolute_path = os.path.abspath(file_path)
+
+        # Submit the job to the virtual cluster.
+        job = client.submit_job(
+            entrypoint=f"python {absolute_path}",
+            virtual_cluster_id="virtual_cluster_1",
+        )
+
+        def check_job_running():
+            status = client.get_job_status(job)
+            return status == JobStatus.RUNNING
+
+        wait_for_condition(check_job_running)
+
+        def check_actors():
+            actors = ray._private.state.actors()
+            # There is only one 4c8g node in the virtual cluster, so we shall
+            # see one alive actor and one actor pending creation.
+            expected_states = {"ALIVE": 1, "PENDING_CREATION": 1}
+            actor_states = {}
+            for _, actor in actors.items():
+                if actor["ActorClassName"] == "SmallActor":
+                    actor_states[actor["State"]] = (
+                        actor_states.get(actor["State"], 0) + 1
+                    )
+            if actor_states == expected_states:
+                return True
+            return False
+
+        wait_for_condition(check_actors)
+
+        # Scale down the virtual cluster, removing one node instance.
+        result = create_or_update_virtual_cluster(
+            webui_url=webui_url,
+            virtual_cluster_id="virtual_cluster_1",
+            divisible=False,
+            replica_sets={"4c8g": 0},
+            revision=revision,
+        )
+        # Because the resource_view sync is lagging, gcs still sees the node
+        # as idle, so the node instance is successfully removed.
+        assert result["result"] is True
+
+        def check_actors_after_update():
+            actors = ray._private.state.actors()
+            # If the node (just removed from the virtual cluster) cleans up
+            # its pending and running tasks, we shall see two dead actors now.
+            expected_states = {"DEAD": 2}
+            actor_states = {}
+            for _, actor in actors.items():
+                if actor["ActorClassName"] == "SmallActor":
+                    actor_states[actor["State"]] = (
+                        actor_states.get(actor["State"], 0) + 1
+                    )
+            if actor_states == expected_states:
+                return True
+            return False
+
+        wait_for_condition(check_actors_after_update)
+
+        def check_running_and_pending_tasks():
+            resources_batch = get_resource_usage(
+                gcs_address=cluster.head_node.gcs_address
+            )
+            # Check each node's resource usage, making sure no running
+            # or pending tasks are left.
+            for node in resources_batch.batch:
+                if "CPU" not in node.resources_available:
+                    return False
+                if (
+                    len(node.resource_load_by_shape.resource_demands) > 0
+                    and node.resource_load_by_shape.resource_demands[
+                        0
+                    ].num_ready_requests_queued
+                    > 0
+                ):
+                    return False
+            return True
+
+        wait_for_condition(check_running_and_pending_tasks)
+
+    finally:
+        if file_path:
+            os.remove(file_path)
+        if temp_dir:
+            os.rmdir(temp_dir)
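Reviewer note: every check in the test above goes through wait_for_condition
from ray._private.test_utils. For readers unfamiliar with the helper, a rough
sketch of its behavior follows; this is an approximation for illustration, not
Ray's actual implementation, and the defaults shown are assumptions.

    import time


    def wait_for_condition(condition_predictor, timeout=10, retry_interval_ms=100, **kwargs):
        """Poll until condition_predictor(**kwargs) returns True, else raise.

        Rough sketch of ray._private.test_utils.wait_for_condition; the real
        helper's defaults and error message may differ.
        """
        start = time.time()
        last_error = None
        while time.time() - start <= timeout:
            try:
                if condition_predictor(**kwargs):
                    return
            except Exception as e:  # Keep polling; remember the last failure.
                last_error = e
            time.sleep(retry_interval_ms / 1000.0)
        raise RuntimeError(f"Condition not met within {timeout}s: {last_error}")

Polling like this is what lets the test tolerate the deliberately slow
resource_view sync: each predicate simply keeps returning False until the
cluster reaches the expected state or the timeout fails the test.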
diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
index 77b83fe4735a..08f7a818f0d8 100644
--- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
+++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
@@ -543,7 +543,6 @@ bool IndivisibleCluster::IsIdleNodeInstance(
   auto node_id =
       scheduling::NodeID(NodeID::FromHex(node_instance.node_instance_id()).Binary());
   const auto &node_resources = cluster_resource_manager_.GetNodeResources(node_id);
-  // TODO(Chong-Li): the resource view sync message may lag.
   if (node_resources.normal_task_resources.IsEmpty() &&
       node_resources.total == node_resources.available) {
     return true;
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index f0d18c9e3f5c..9c956d4b8a17 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -418,14 +418,22 @@ NodeManager::NodeManager(
   virtual_cluster_manager_ = std::make_shared<VirtualClusterManager>(
       self_node_id_, /*on_local_node_instance_removed=*/[this]() {
-        cluster_task_manager_->CancelTasks(
+        auto tasks_canceled = cluster_task_manager_->CancelTasks(
             [](const std::shared_ptr<internal::Work> &work) { return true; },
             rpc::RequestWorkerLeaseReply::SCHEDULING_FAILED,
             "The node is removed from a virtual cluster.");
-        local_task_manager_->CancelTasks(
+        if (tasks_canceled) {
+          RAY_LOG(DEBUG) << "Tasks are cleaned up from cluster_task_manager because the "
+                            "node is removed from virtual cluster.";
+        }
+        tasks_canceled = local_task_manager_->CancelTasks(
             [](const std::shared_ptr<internal::Work> &work) { return true; },
             rpc::RequestWorkerLeaseReply::SCHEDULING_FAILED,
             "The node is removed from a virtual cluster.");
+        if (tasks_canceled) {
+          RAY_LOG(DEBUG) << "Tasks are cleaned up from local_task_manager because the "
+                            "node is removed from virtual cluster.";
+        }
         if (!cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeIdle()) {
           for (auto &[_, worker] : leased_workers_) {
             RAY_LOG(DEBUG).WithField(worker->WorkerId())
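Reviewer note: the node_manager.cc hunk above is truncated at the
leased-worker loop. A Python-flavored sketch of the new callback's control
flow follows; the stub task manager is hypothetical and stands in for the C++
ClusterTaskManager/LocalTaskManager only to make the flow runnable.

    # Python-flavored sketch of the new on_local_node_instance_removed callback
    # in node_manager.cc. _StubTaskManager is hypothetical, for illustration.
    class _StubTaskManager:
        def __init__(self, queued_work):
            self._queued = list(queued_work)

        def cancel_tasks(self, predicate, reason):
            """Cancel queued lease requests matching predicate; True if any were."""
            remaining = [w for w in self._queued if not predicate(w)]
            canceled_any = len(remaining) < len(self._queued)
            self._queued = remaining
            return canceled_any


    def on_local_node_instance_removed(cluster_task_manager, local_task_manager):
        reason = "The node is removed from a virtual cluster."
        # Cancel work still waiting for cluster-level scheduling; the predicate
        # accepts everything, mirroring the `return true` lambda in the diff.
        if cluster_task_manager.cancel_tasks(lambda work: True, reason):
            print("Tasks are cleaned up from cluster_task_manager")
        # Then cancel work already queued for local dispatch.
        if local_task_manager.cancel_tasks(lambda work: True, reason):
            print("Tasks are cleaned up from local_task_manager")
        # If the node is still not idle, already-leased workers are handled
        # next (the hunk above is truncated at that loop).


    on_local_node_instance_removed(_StubTaskManager(["lease-1"]), _StubTaskManager([]))

Canceling in both managers matters because a lease request can sit in either
queue when the node instance disappears; covering both is what lets the test's
check_actors_after_update observe two DEAD actors.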