Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21/N][VirtualCluster] refactor mixed/exclusive cluster to indivisible/divisible cluster #443

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
)
from ray.cluster_utils import Cluster, cluster_not_supported
from ray.core.generated import gcs_service_pb2_grpc
from ray.core.generated.gcs_pb2 import AllocationMode
from ray.core.generated.gcs_service_pb2 import CreateOrUpdateVirtualClusterRequest
from ray.dashboard.modules.job.common import (
JOB_ACTOR_NAME_TEMPLATE,
Expand Down Expand Up @@ -110,7 +109,7 @@ async def job_sdk_client(request, make_sure_dashboard_http_port_unused, external


async def create_virtual_cluster(
gcs_address, virtual_cluster_id, replica_sets, allocation_mode=AllocationMode.MIXED
gcs_address, virtual_cluster_id, replica_sets, divisible=False
):
channel = GcsChannel(gcs_address, aio=True)
channel.connect()
Expand All @@ -119,7 +118,7 @@ async def create_virtual_cluster(
)
request = CreateOrUpdateVirtualClusterRequest(
virtual_cluster_id=virtual_cluster_id,
mode=allocation_mode,
divisible=divisible,
replica_sets=replica_sets,
)
reply = await (gcs_virtual_cluster_info_stub.CreateOrUpdateVirtualCluster(request))
Expand Down Expand Up @@ -356,7 +355,7 @@ def _check_recover(
indirect=True,
)
@pytest.mark.asyncio
async def test_exclusive_virtual_cluster(job_sdk_client):
async def test_divisible_virtual_cluster(job_sdk_client):
head_client, gcs_address, cluster = job_sdk_client
virtual_cluster_id_prefix = "VIRTUAL_CLUSTER_"
node_to_virtual_cluster = {}
Expand All @@ -367,7 +366,7 @@ async def test_exclusive_virtual_cluster(job_sdk_client):
gcs_address,
virtual_cluster_id,
{TEMPLATE_ID_PREFIX + str(i): 2},
AllocationMode.EXCLUSIVE,
True,
)
for node_id in nodes:
assert node_id not in node_to_virtual_cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@


def create_or_update_virtual_cluster(
webui_url, virtual_cluster_id, allocation_mode, replica_sets, revision
webui_url, virtual_cluster_id, divisible, replica_sets, revision
):
try:
resp = requests.post(
webui_url + "/virtual_clusters",
json={
"virtualClusterId": virtual_cluster_id,
"allocationMode": allocation_mode,
"divisible": divisible,
"replicaSets": replica_sets,
"revision": revision,
},
Expand Down Expand Up @@ -81,14 +81,14 @@ def test_create_and_update_virtual_cluster(
revision = 0

def _check_create_or_update_virtual_cluster(
virtual_cluster_id, allocation_mode, replica_sets
virtual_cluster_id, divisible, replica_sets
):
nonlocal revision
resp = requests.post(
webui_url + "/virtual_clusters",
json={
"virtualClusterId": virtual_cluster_id,
"allocationMode": allocation_mode,
"divisible": divisible,
"replicaSets": replica_sets,
"revision": revision,
},
Expand All @@ -111,59 +111,59 @@ def _check_create_or_update_virtual_cluster(
# The virtual cluster has the same node types and count as expected.
assert replica_sets == virtual_cluster_replica_sets

# Create a new virtual cluster with exclusive allocation mode.
# Create a new divisible virtual cluster.
_check_create_or_update_virtual_cluster(
virtual_cluster_id="virtual_cluster_1",
allocation_mode="exclusive",
divisible=True,
replica_sets={"4c8g": 1, "8c16g": 1},
)

# Update the virtual cluster with less nodes (scale down).
_check_create_or_update_virtual_cluster(
virtual_cluster_id="virtual_cluster_1",
allocation_mode="exclusive",
divisible=True,
replica_sets={"4c8g": 1},
)

# Update the virtual cluster with more nodes (scale up).
_check_create_or_update_virtual_cluster(
virtual_cluster_id="virtual_cluster_1",
allocation_mode="exclusive",
divisible=True,
replica_sets={"4c8g": 1, "8c16g": 1},
)

# Update the virtual cluster with zero node (make it empty).
_check_create_or_update_virtual_cluster(
virtual_cluster_id="virtual_cluster_1",
allocation_mode="exclusive",
divisible=True,
replica_sets={},
)

# `virtual_cluster_1` has released all nodes, so we can now
# create a new (mixed) virtual cluster with two nodes.
# create a new indivisible virtual cluster with two nodes.
_check_create_or_update_virtual_cluster(
virtual_cluster_id="virtual_cluster_2",
allocation_mode="mixed",
divisible=False,
replica_sets={"4c8g": 1, "8c16g": 1},
)

# Update the virtual cluster with less nodes.
_check_create_or_update_virtual_cluster(
virtual_cluster_id="virtual_cluster_2",
allocation_mode="mixed",
divisible=False,
replica_sets={"4c8g": 1},
)

# Update the virtual cluster with more nodes.
_check_create_or_update_virtual_cluster(
virtual_cluster_id="virtual_cluster_2",
allocation_mode="mixed",
divisible=False,
replica_sets={"4c8g": 1, "8c16g": 1},
)

# Update the virtual cluster with zero node (make it empty).
_check_create_or_update_virtual_cluster(
virtual_cluster_id="virtual_cluster_2", allocation_mode="mixed", replica_sets={}
virtual_cluster_id="virtual_cluster_2", divisible=False, replica_sets={}
)


Expand All @@ -176,9 +176,9 @@ def _check_create_or_update_virtual_cluster(
],
indirect=True,
)
@pytest.mark.parametrize("allocation_mode", ["exclusive", "mixed"])
@pytest.mark.parametrize("divisible", [True, False])
def test_create_and_update_virtual_cluster_with_exceptions(
disable_aiohttp_cache, ray_start_cluster_head, allocation_mode
disable_aiohttp_cache, ray_start_cluster_head, divisible
):
cluster: Cluster = ray_start_cluster_head
assert wait_until_server_available(cluster.webui_url) is True
Expand All @@ -193,7 +193,7 @@ def test_create_and_update_virtual_cluster_with_exceptions(
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
allocation_mode=allocation_mode,
divisible=divisible,
replica_sets={"16c32g": 1},
revision=0,
)
Expand All @@ -202,14 +202,14 @@ def test_create_and_update_virtual_cluster_with_exceptions(
replica_sets = result["data"].get("replicaSetsToRecommend", {})
# The primary cluster can fulfill none `16c32g` node to meet the
# virtual cluster's requirement.
assert replica_sets == {}
assert replica_sets == {"16c32g": 0}

# Create a new virtual cluster with node count that the primary cluster
# can not provide.
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
allocation_mode=allocation_mode,
divisible=divisible,
replica_sets={"4c8g": 2, "8c16g": 1},
revision=0,
)
Expand All @@ -224,7 +224,7 @@ def test_create_and_update_virtual_cluster_with_exceptions(
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
allocation_mode=allocation_mode,
divisible=divisible,
replica_sets={"4c8g": 1},
revision=0,
)
Expand All @@ -235,7 +235,7 @@ def test_create_and_update_virtual_cluster_with_exceptions(
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
allocation_mode=allocation_mode,
divisible=divisible,
replica_sets={"4c8g": 2, "8c16g": 2},
revision=0,
)
Expand All @@ -247,7 +247,7 @@ def test_create_and_update_virtual_cluster_with_exceptions(
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
allocation_mode=allocation_mode,
divisible=divisible,
replica_sets={"4c8g": 2, "8c16g": 2},
revision=revision,
)
Expand All @@ -256,32 +256,32 @@ def test_create_and_update_virtual_cluster_with_exceptions(
replica_sets = result["data"].get("replicaSetsToRecommend", {})
# The primary cluster can only fulfill one `8c16g`
# node to meet the virtual cluster's requirement.
assert replica_sets == {"8c16g": 1}
assert replica_sets == {"4c8g": 0, "8c16g": 1}

if allocation_mode == "mixed":
if not divisible:
actor = SmallActor.options(resources={"4c8g": 1}).remote()
ray.get(actor.pid.remote(), timeout=10)

# Update (scale down) the virtual cluster with one node in use.
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
allocation_mode=allocation_mode,
divisible=divisible,
replica_sets={},
revision=revision,
)
assert result["result"] is False
assert "No enough nodes to remove from the virtual cluster" in result["msg"]
replica_sets = result["data"].get("replicaSetsToRecommend", {})
# The virtual cluster has one `4c8g` node in use. So we can fulfill none node.
assert replica_sets == {}
assert replica_sets == {"4c8g": 0}

# Create a new virtual cluster that the remaining nodes in the primary cluster
# are not enough.
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_2",
allocation_mode=allocation_mode,
divisible=divisible,
replica_sets={"4c8g": 1, "8c16g": 1},
revision=0,
)
Expand All @@ -290,7 +290,7 @@ def test_create_and_update_virtual_cluster_with_exceptions(
replica_sets = result["data"].get("replicaSetsToRecommend", {})
# The primary cluster lacks one `4c8g` node to meet the
# virtual cluster's requirement.
assert replica_sets == {"8c16g": 1}
assert replica_sets == {"4c8g": 0, "8c16g": 1}


@pytest.mark.parametrize(
Expand All @@ -311,11 +311,11 @@ def test_remove_virtual_cluster(disable_aiohttp_cache, ray_start_cluster_head):
cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "4c8g"}, resources={"4c8g": 1})
cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "8c16g"}, resources={"8c16g": 1})

# Create a new virtual cluster with exclusive allocation mode.
# Create a new divisible virtual cluster.
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
allocation_mode="exclusive",
divisible=True,
replica_sets={"4c8g": 1, "8c16g": 1},
revision=0,
)
Expand All @@ -335,11 +335,11 @@ def test_remove_virtual_cluster(disable_aiohttp_cache, ray_start_cluster_head):
)
assert result["result"] is True

# Create a new virtual cluster with mixed mode.
# Create a new indivisible virtual cluster.
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_2",
allocation_mode="mixed",
divisible=False,
replica_sets={"4c8g": 1, "8c16g": 1},
revision=0,
)
Expand Down Expand Up @@ -392,23 +392,21 @@ def test_get_virtual_clusters(disable_aiohttp_cache, ray_start_cluster_head):
cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "8c16g"})
cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "8c16g"})

# Create a new virtual cluster with mixed allocation mode and
# two `4c8g` nodes.
# Create a new indivisible virtual cluster with two `4c8g` nodes.
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_1",
allocation_mode="mixed",
divisible=False,
replica_sets={"4c8g": 2},
revision=0,
)
assert result["result"] is True

# Create a new virtual cluster with exclusive allocation mode
# and two `8c16g` nodes.
# Create a new divisible virtual cluster with two `8c16g` nodes.
result = create_or_update_virtual_cluster(
webui_url=webui_url,
virtual_cluster_id="virtual_cluster_2",
allocation_mode="exclusive",
divisible=True,
replica_sets={"8c16g": 2},
revision=0,
)
Expand All @@ -423,15 +421,15 @@ def _get_virtual_clusters():
assert result["result"] is True, resp.text
for virtual_cluster in result["data"]["virtualClusters"]:
if virtual_cluster["virtualClusterId"] == "virtual_cluster_1":
assert virtual_cluster["allocationMode"] == "mixed"
assert virtual_cluster["divisible"] == "false"
assert len(virtual_cluster["nodeInstances"]) == 2
for _, node_instance in virtual_cluster["nodeInstances"].items():
assert node_instance["hostname"] == hostname
assert node_instance["templateId"] == "4c8g"
revision_1 = virtual_cluster["revision"]
assert revision_1 > 0
elif virtual_cluster["virtualClusterId"] == "virtual_cluster_2":
assert virtual_cluster["allocationMode"] == "exclusive"
assert virtual_cluster["divisible"] == "true"
assert len(virtual_cluster["nodeInstances"]) == 2
for _, node_instance in virtual_cluster["nodeInstances"].items():
assert node_instance["hostname"] == hostname
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
from ray.core.generated import gcs_service_pb2_grpc
from ray.core.generated.gcs_pb2 import AllocationMode
from ray.core.generated.gcs_service_pb2 import (
CreateOrUpdateVirtualClusterRequest,
GetVirtualClustersRequest,
Expand Down Expand Up @@ -44,8 +43,8 @@ async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response:
virtual_cluster_data["revision"] = int(
virtual_cluster_data.get("revision", 0)
)
virtual_cluster_data["allocationMode"] = str(
virtual_cluster_data.pop("mode", "mixed")
virtual_cluster_data["divisible"] = str(
virtual_cluster_data.pop("divisible", False)
).lower()

return dashboard_optional_utils.rest_response(
Expand All @@ -69,16 +68,13 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response:

virtual_cluster_info = dict(virtual_cluster_info_json)
virtual_cluster_id = virtual_cluster_info["virtualClusterId"]
allocation_mode = AllocationMode.MIXED
if (
str(virtual_cluster_info.get("allocationMode", "mixed")).lower()
== "exclusive"
):
allocation_mode = AllocationMode.EXCLUSIVE
divisible = False
if str(virtual_cluster_info.get("divisible", False)).lower() == "true":
divisible = True

request = CreateOrUpdateVirtualClusterRequest(
virtual_cluster_id=virtual_cluster_id,
mode=allocation_mode,
divisible=divisible,
replica_sets=virtual_cluster_info.get("replicaSets", {}),
revision=int(virtual_cluster_info.get("revision", 0)),
)
Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ class MockVirtualClusterInfoAccessor : public VirtualClusterInfoAccessor {
MOCK_METHOD(Status,
AsyncGetAll,
(bool include_job_clusters,
bool only_include_mixed_clusters,
bool only_include_indivisible_clusters,
(const MultiItemCallback<rpc::VirtualClusterTableData> &callback)),
(override));
MOCK_METHOD(Status,
Expand Down
Loading