Fix and UT
Chong-Li committed Jan 7, 2025
1 parent 44e1062 commit aa48d00
Showing 3 changed files with 172 additions and 3 deletions.
@@ -1,6 +1,7 @@
import logging
import os
import socket
import tempfile
import time

import pytest
@@ -9,11 +10,13 @@
import ray
from ray._private.test_utils import (
    format_web_url,
    get_resource_usage,
    wait_for_condition,
    wait_until_server_available,
)
from ray.cluster_utils import Cluster
from ray.dashboard.tests.conftest import * # noqa
from ray.job_submission import JobStatus, JobSubmissionClient

logger = logging.getLogger(__name__)

@@ -447,3 +450,162 @@ def _get_virtual_clusters():
        return False

    wait_for_condition(_get_virtual_clusters, timeout=10)


# Because the raylet is responsible for task scheduling, the GCS depends on the
# resource_view sync to learn about the pending/running tasks at each node. If the
# resource_view sync lags, the GCS may mistakenly consider a node idle when removing
# node instances from a virtual cluster. In that case, we have to clean up the
# pending/running tasks at the node that was just removed. This test makes sure the
# cleanup is correctly enforced.
@pytest.mark.parametrize(
    "ray_start_cluster_head",
    [
        {
            "include_dashboard": True,
            "_system_config": {
                "gcs_actor_scheduling_enabled": False,
                # Make the resource_view sync message lag.
                "raylet_report_resources_period_milliseconds": 30000,
            },
        }
    ],
    indirect=True,
)
def test_cleanup_tasks_after_removing_node_instance(
    disable_aiohttp_cache, ray_start_cluster_head
):
    cluster: Cluster = ray_start_cluster_head
    assert wait_until_server_available(cluster.webui_url) is True
    webui_url = cluster.webui_url
    webui_url = format_web_url(webui_url)

    # Add one 4c8g node to the primary cluster.
    cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "4c8g"}, num_cpus=4)

    # Create a virtual cluster with one 4c8g node.
    result = create_or_update_virtual_cluster(
        webui_url=webui_url,
        virtual_cluster_id="virtual_cluster_1",
        divisible=False,
        replica_sets={"4c8g": 1},
        revision=0,
    )
    assert result["result"] is True
    revision = result["data"]["revision"]

    client = JobSubmissionClient(webui_url)
    temp_dir = None
    file_path = None

    try:
        # Create a temporary directory.
        temp_dir = tempfile.mkdtemp()

        # Define the driver: create two actors, requiring 4 CPUs each.
        driver_content = """
import ray
import time

@ray.remote
class SmallActor:
    def __init__(self):
        pass

actors = []
for _ in range(2):
    actors.append(SmallActor.options(num_cpus=4).remote())
time.sleep(600)
"""

        # Create a temporary Python file.
        file_path = os.path.join(temp_dir, "test_driver.py")

        with open(file_path, "w") as file:
            file.write(driver_content)

        absolute_path = os.path.abspath(file_path)

        # Submit the job to the virtual cluster.
        job = client.submit_job(
            entrypoint=f"python {absolute_path}",
            virtual_cluster_id="virtual_cluster_1",
        )

        def check_job_running():
            status = client.get_job_status(job)
            return status == JobStatus.RUNNING

        wait_for_condition(check_job_running)

        def check_actors():
            actors = ray._private.state.actors()
            # There is only one 4c8g node in the virtual cluster, so we shall
            # see one alive actor and one actor pending creation.
            expected_states = {"ALIVE": 1, "PENDING_CREATION": 1}
            actor_states = {}
            for _, actor in actors.items():
                if actor["ActorClassName"] == "SmallActor":
                    actor_states[actor["State"]] = (
                        actor_states.get(actor["State"], 0) + 1
                    )
            return actor_states == expected_states

        wait_for_condition(check_actors)

        # Scale down the virtual cluster, removing one node instance.
        result = create_or_update_virtual_cluster(
            webui_url=webui_url,
            virtual_cluster_id="virtual_cluster_1",
            divisible=False,
            replica_sets={"4c8g": 0},
            revision=revision,
        )
        # Because the resource_view sync is lagging, GCS still sees the node
        # as idle, so the node instance is successfully removed even though
        # actors are running on it.
        assert result["result"] is True

        def check_actors_after_update():
            actors = ray._private.state.actors()
            # If the node (just removed from the virtual cluster) cleans up
            # its pending and running tasks, we shall see two dead actors.
            expected_states = {"DEAD": 2}
            actor_states = {}
            for _, actor in actors.items():
                if actor["ActorClassName"] == "SmallActor":
                    actor_states[actor["State"]] = (
                        actor_states.get(actor["State"], 0) + 1
                    )
            return actor_states == expected_states

        wait_for_condition(check_actors_after_update)

        def check_running_and_pending_tasks():
            resources_batch = get_resource_usage(
                gcs_address=cluster.head_node.gcs_address
            )
            # Check each node's resource usage, making sure no running or
            # pending tasks are left.
            for node in resources_batch.batch:
                if "CPU" not in node.resources_available:
                    return False
                demands = node.resource_load_by_shape.resource_demands
                if len(demands) > 0 and demands[0].num_ready_requests_queued > 0:
                    return False
            return True

        wait_for_condition(check_running_and_pending_tasks)

    finally:
        if file_path:
            os.remove(file_path)
        if temp_dir:
            os.rmdir(temp_dir)
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
@@ -543,7 +543,6 @@ bool IndivisibleCluster::IsIdleNodeInstance(
   auto node_id =
       scheduling::NodeID(NodeID::FromHex(node_instance.node_instance_id()).Binary());
   const auto &node_resources = cluster_resource_manager_.GetNodeResources(node_id);
-  // TODO(Chong-Li): the resource view sync message may lag.
   if (node_resources.normal_task_resources.IsEmpty() &&
       node_resources.total == node_resources.available) {
     return true;
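
The TODO deleted above is what this commit resolves: the idle check keeps relying on the possibly stale resource view, and the raylet-side cleanup (in node_manager.cc below) compensates when the check is wrong. A minimal, self-contained sketch of the race, using simplified stand-in types rather than Ray's real classes (only the final comparison mirrors the IsIdleNodeInstance check shown above):

#include <cassert>

// Hypothetical stand-ins for Ray's resource bookkeeping.
struct ResourceSet {
  double cpu_used = 0;
  bool IsEmpty() const { return cpu_used == 0; }
};

struct NodeResources {
  double total = 4;
  double available = 4;  // As of the last resource_view sync message.
  ResourceSet normal_task_resources;
};

// The idle test: no normal-task usage recorded and nothing allocated.
bool LooksIdle(const NodeResources &r) {
  return r.normal_task_resources.IsEmpty() && r.total == r.available;
}

int main() {
  // GCS's lagging copy of the node's resources: an actor already holds all
  // 4 CPUs on the raylet, but with a 30s report period the sync message
  // announcing that has not arrived yet.
  NodeResources stale_view;
  assert(LooksIdle(stale_view));  // GCS wrongly treats the busy node as idle.
  return 0;
}
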
12 changes: 10 additions & 2 deletions src/ray/raylet/node_manager.cc
@@ -418,14 +418,22 @@ NodeManager::NodeManager(

   virtual_cluster_manager_ = std::make_shared<VirtualClusterManager>(
       self_node_id_, /*on_local_node_instance_removed=*/[this]() {
-        cluster_task_manager_->CancelTasks(
+        auto tasks_canceled = cluster_task_manager_->CancelTasks(
             [](const std::shared_ptr<internal::Work> &work) { return true; },
             rpc::RequestWorkerLeaseReply::SCHEDULING_FAILED,
             "The node is removed from a virtual cluster.");
-        local_task_manager_->CancelTasks(
+        if (tasks_canceled) {
+          RAY_LOG(DEBUG) << "Tasks are cleaned up from cluster_task_manager because the "
+                            "node is removed from virtual cluster.";
+        }
+        tasks_canceled = local_task_manager_->CancelTasks(
             [](const std::shared_ptr<internal::Work> &work) { return true; },
             rpc::RequestWorkerLeaseReply::SCHEDULING_FAILED,
             "The node is removed from a virtual cluster.");
+        if (tasks_canceled) {
+          RAY_LOG(DEBUG) << "Tasks are cleaned up from local_task_manager because the "
+                            "node is removed from virtual cluster.";
+        }
         if (!cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeIdle()) {
           for (auto &[_, worker] : leased_workers_) {
             RAY_LOG(DEBUG).WithField(worker->WorkerId())
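
The CancelTasks calls above take a predicate over queued work and return whether anything was actually canceled, which the new DEBUG logs consume. A hedged, self-contained sketch of that pattern, again with simplified stand-in types rather than Ray's real task-manager interfaces:

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for ray::raylet::internal::Work.
struct Work {
  int task_id;
};

class TaskQueue {
 public:
  void Add(int task_id) { queue_.push_back(std::make_shared<Work>(Work{task_id})); }

  // Cancels every queued item matching the predicate and reports whether
  // anything was canceled, mirroring the bool the commit starts consuming.
  bool CancelTasks(const std::function<bool(const std::shared_ptr<Work> &)> &pred,
                   const std::string &reason) {
    bool canceled = false;
    for (auto it = queue_.begin(); it != queue_.end();) {
      if (pred(*it)) {
        std::cout << "Canceling task " << (*it)->task_id << ": " << reason << "\n";
        it = queue_.erase(it);
        canceled = true;
      } else {
        ++it;
      }
    }
    return canceled;
  }

 private:
  std::vector<std::shared_ptr<Work>> queue_;
};

int main() {
  TaskQueue queue;
  queue.Add(1);
  queue.Add(2);
  // An always-true predicate drains the whole queue, exactly like the
  // node-removal callback above.
  bool any = queue.CancelTasks(
      [](const std::shared_ptr<Work> &) { return true; },
      "The node is removed from a virtual cluster.");
  std::cout << (any ? "tasks were cleaned up\n" : "queue was empty\n");
  return 0;
}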
