From a2c40a5ed94f847e05502800ad082459c5b2e3d0 Mon Sep 17 00:00:00 2001 From: Chong Li Date: Thu, 12 Dec 2024 17:43:26 +0800 Subject: [PATCH 1/9] Add API --- .../modules/virtual_cluster/__init__.py | 0 .../virtual_cluster/virtual_cluster_head.py | 140 ++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 python/ray/dashboard/modules/virtual_cluster/__init__.py create mode 100644 python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py diff --git a/python/ray/dashboard/modules/virtual_cluster/__init__.py b/python/ray/dashboard/modules/virtual_cluster/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py new file mode 100644 index 000000000000..b919f1a6bf8b --- /dev/null +++ b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py @@ -0,0 +1,140 @@ +import asyncio +import logging +from typing import Any, Dict + +import aiohttp.web + +import ray +import ray.dashboard.optional_utils as dashboard_optional_utils +import ray.dashboard.utils as dashboard_utils + +from ray.core.generated import gcs_service_pb2_grpc +from ray.core.generated.gcs_service_pb2 import ( + ReplicaSet, + CreateOrUpdateVirtualClusterRequest, + CreateOrUpdateVirtualClusterReply, + RemoveVirtualClusterRequest, + RemoveVirtualClusterReply, + GetAllVirtualClustersRequest, + GetAllVirtualClustersReply +) +from ray.core.generated.gcs_pb2 import ( + JobExecMode +) + +from ray._private.utils import get_or_create_event_loop +from ray.dashboard.consts import GCS_RPC_TIMEOUT_SECONDS + +logger = logging.getLogger(__name__) +routes = dashboard_optional_utils.DashboardHeadRouteTable + +class VirtualClusterHead(dashboard_utils.DashboardHeadModule): + def __init__(self, dashboard_head): + super().__init__(dashboard_head) + + self._gcs_virtual_cluster_info_stub = ( + gcs_service_pb2_grpc.VirtualClusterInfoGcsServiceStub( + dashboard_head.aiogrpc_gcs_channel + ) + ) + + + @routes.get("/virtual_clusters") + @dashboard_optional_utils.aiohttp_cache(10) + async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response: + reply = await self._gcs_virtual_cluster_info_stub.GetAllVirtualClusters( + GetAllVirtualClustersRequest()) + + if reply.status.code == 0: + data = dashboard_utils.message_to_dict(reply) + + return dashboard_optional_utils.rest_response( + success=True, + message="All virtual clusters fetched.", + virtual_clusters=data + ) + else: + logger.info("Failed to get all virtual clusters") + return dashboard_optional_utils.rest_response( + success=False, + message="Failed to get all virtual clusters: {}".format( + reply.status.message + ) + ) + + + @routes.post("/virtual_clusters") + async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: + virtual_cluster_info_json = await req.json() + logger.info("POST /virtual_clusters %s", virtual_cluster_info_json) + + virtual_cluster_info = dict(virtual_cluster_info_json) + virtual_cluster_id=virtual_cluster_info["virtualClusterId"] + job_exec_mode=JobExecMode.Mixed + if str(virtual_cluster_info.get("jobExecMode", "mixed")).lower() == "exclusive": + job_exec_mode=JobExecMode.Exclusive + + replica_set_list = [] + for data in virtual_cluster_info["nodeTypeAndCountList"]: + replica_set = ReplicaSet(template_id=data["nodeType"], replicas=data["nodeCount"]) + replica_set_list.append(replica_set) + + request = CreateOrUpdateVirtualClusterRequest( + virtual_cluster_id=virtual_cluster_id, + virtual_cluster_name=virtual_cluster_info.get("name", ""), + mode=job_exec_mode, + replica_set_list=replica_set_list, + revision=int(virtual_cluster_info.get("revision", 0)) + ) + reply = await self._gcs_virtual_cluster_info_stub.CreateOrUpdateVirtualCluster(request) + if reply.status.code == 0: + logger.info("Virtual cluster %s created or updated", virtual_cluster_id) + data = dashboard_utils.message_to_dict(reply) + + return dashboard_optional_utils.rest_response( + success=True, + message="Virtual cluster created or updated.", + virtual_cluster_id=virtual_cluster_id, + revision=data.get("revision", 0), + node_instances=data["nodeInstances"] + ) + else: + logger.info("Failed to create or update virtual cluster %s", virtual_cluster_id) + return dashboard_optional_utils.rest_response( + success=False, + message="Failed to create or update virtual cluster {virtual_cluster_id}: {}".format( + reply.status.message + ), + virtual_cluster_id=virtual_cluster_id + ) + + + @routes.delete("/virtual_clusters/{virtual_cluster_id}") + async def remove_virtual_cluster(self, req) -> aiohttp.web.Response: + virtual_cluster_id = req.match_info.get("virtual_cluster_id") + request = RemoveVirtualClusterRequest(virtual_cluster_id=virtual_cluster_id) + reply = await self._gcs_virtual_cluster_info_stub.RemoveVirtualCluster(request) + + if reply.status.code == 0: + logger.info("Virtual cluster %s removed", virtual_cluster_id) + return dashboard_optional_utils.rest_response( + success=True, + message=f"Virtual cluster {virtual_cluster_id} removed.", + virtual_cluster_id=virtual_cluster_id, + ) + else: + logger.info("Failed to remove virtual cluster %s", virtual_cluster_id) + return dashboard_optional_utils.rest_response( + success=False, + message="Failed to remove virtual cluster {virtual_cluster_id}: {}".format( + reply.status.message), + virtual_cluster_id=virtual_cluster_id, + ) + + + async def run(self, server): + pass + + @staticmethod + def is_minimal_module(): + return False \ No newline at end of file From c32fa0d241438642f1f3a37bf89afc90bb3346ab Mon Sep 17 00:00:00 2001 From: Chong Li Date: Thu, 12 Dec 2024 17:50:30 +0800 Subject: [PATCH 2/9] Format --- .../virtual_cluster/virtual_cluster_head.py | 65 +++++++++---------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py index b919f1a6bf8b..1d12facdafe2 100644 --- a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py +++ b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py @@ -1,33 +1,22 @@ -import asyncio import logging -from typing import Any, Dict import aiohttp.web -import ray import ray.dashboard.optional_utils as dashboard_optional_utils import ray.dashboard.utils as dashboard_utils - from ray.core.generated import gcs_service_pb2_grpc +from ray.core.generated.gcs_pb2 import JobExecMode from ray.core.generated.gcs_service_pb2 import ( - ReplicaSet, CreateOrUpdateVirtualClusterRequest, - CreateOrUpdateVirtualClusterReply, - RemoveVirtualClusterRequest, - RemoveVirtualClusterReply, GetAllVirtualClustersRequest, - GetAllVirtualClustersReply -) -from ray.core.generated.gcs_pb2 import ( - JobExecMode + RemoveVirtualClusterRequest, + ReplicaSet, ) -from ray._private.utils import get_or_create_event_loop -from ray.dashboard.consts import GCS_RPC_TIMEOUT_SECONDS - logger = logging.getLogger(__name__) routes = dashboard_optional_utils.DashboardHeadRouteTable + class VirtualClusterHead(dashboard_utils.DashboardHeadModule): def __init__(self, dashboard_head): super().__init__(dashboard_head) @@ -38,12 +27,12 @@ def __init__(self, dashboard_head): ) ) - @routes.get("/virtual_clusters") @dashboard_optional_utils.aiohttp_cache(10) async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response: reply = await self._gcs_virtual_cluster_info_stub.GetAllVirtualClusters( - GetAllVirtualClustersRequest()) + GetAllVirtualClustersRequest() + ) if reply.status.code == 0: data = dashboard_utils.message_to_dict(reply) @@ -51,7 +40,7 @@ async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response: return dashboard_optional_utils.rest_response( success=True, message="All virtual clusters fetched.", - virtual_clusters=data + virtual_clusters=data, ) else: logger.info("Failed to get all virtual clusters") @@ -59,9 +48,8 @@ async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response: success=False, message="Failed to get all virtual clusters: {}".format( reply.status.message - ) + ), ) - @routes.post("/virtual_clusters") async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: @@ -69,14 +57,16 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: logger.info("POST /virtual_clusters %s", virtual_cluster_info_json) virtual_cluster_info = dict(virtual_cluster_info_json) - virtual_cluster_id=virtual_cluster_info["virtualClusterId"] - job_exec_mode=JobExecMode.Mixed + virtual_cluster_id = virtual_cluster_info["virtualClusterId"] + job_exec_mode = JobExecMode.Mixed if str(virtual_cluster_info.get("jobExecMode", "mixed")).lower() == "exclusive": - job_exec_mode=JobExecMode.Exclusive + job_exec_mode = JobExecMode.Exclusive replica_set_list = [] for data in virtual_cluster_info["nodeTypeAndCountList"]: - replica_set = ReplicaSet(template_id=data["nodeType"], replicas=data["nodeCount"]) + replica_set = ReplicaSet( + template_id=data["nodeType"], replicas=data["nodeCount"] + ) replica_set_list.append(replica_set) request = CreateOrUpdateVirtualClusterRequest( @@ -84,9 +74,11 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: virtual_cluster_name=virtual_cluster_info.get("name", ""), mode=job_exec_mode, replica_set_list=replica_set_list, - revision=int(virtual_cluster_info.get("revision", 0)) + revision=int(virtual_cluster_info.get("revision", 0)), + ) + reply = await ( + self._gcs_virtual_cluster_info_stub.CreateOrUpdateVirtualCluster(request) ) - reply = await self._gcs_virtual_cluster_info_stub.CreateOrUpdateVirtualCluster(request) if reply.status.code == 0: logger.info("Virtual cluster %s created or updated", virtual_cluster_id) data = dashboard_utils.message_to_dict(reply) @@ -96,19 +88,20 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: message="Virtual cluster created or updated.", virtual_cluster_id=virtual_cluster_id, revision=data.get("revision", 0), - node_instances=data["nodeInstances"] + node_instances=data["nodeInstances"], ) else: - logger.info("Failed to create or update virtual cluster %s", virtual_cluster_id) + logger.info( + "Failed to create or update virtual cluster %s", virtual_cluster_id + ) return dashboard_optional_utils.rest_response( success=False, - message="Failed to create or update virtual cluster {virtual_cluster_id}: {}".format( - reply.status.message + message="Failed to create or update virtual cluster {}: {}".format( + virtual_cluster_id, reply.status.message ), - virtual_cluster_id=virtual_cluster_id + virtual_cluster_id=virtual_cluster_id, ) - @routes.delete("/virtual_clusters/{virtual_cluster_id}") async def remove_virtual_cluster(self, req) -> aiohttp.web.Response: virtual_cluster_id = req.match_info.get("virtual_cluster_id") @@ -126,15 +119,15 @@ async def remove_virtual_cluster(self, req) -> aiohttp.web.Response: logger.info("Failed to remove virtual cluster %s", virtual_cluster_id) return dashboard_optional_utils.rest_response( success=False, - message="Failed to remove virtual cluster {virtual_cluster_id}: {}".format( - reply.status.message), + message="Failed to remove virtual cluster {}: {}".format( + virtual_cluster_id, reply.status.message + ), virtual_cluster_id=virtual_cluster_id, ) - async def run(self, server): pass @staticmethod def is_minimal_module(): - return False \ No newline at end of file + return False From 37d92042e21dbdaf2f6e7ff3bebf814e959b23ab Mon Sep 17 00:00:00 2001 From: Chong Li Date: Sat, 14 Dec 2024 18:56:08 +0800 Subject: [PATCH 3/9] Fix --- .../modules/virtual_cluster/virtual_cluster_head.py | 13 +++++++------ src/ray/protobuf/gcs.proto | 7 ++++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py index 1d12facdafe2..5e27a0ae279e 100644 --- a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py +++ b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py @@ -40,7 +40,7 @@ async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response: return dashboard_optional_utils.rest_response( success=True, message="All virtual clusters fetched.", - virtual_clusters=data, + virtual_clusters=data["virtualClusterDataList"], ) else: logger.info("Failed to get all virtual clusters") @@ -58,20 +58,20 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: virtual_cluster_info = dict(virtual_cluster_info_json) virtual_cluster_id = virtual_cluster_info["virtualClusterId"] - job_exec_mode = JobExecMode.Mixed + job_exec_mode = JobExecMode.MIXED if str(virtual_cluster_info.get("jobExecMode", "mixed")).lower() == "exclusive": - job_exec_mode = JobExecMode.Exclusive + job_exec_mode = JobExecMode.EXCLUSIVE replica_set_list = [] - for data in virtual_cluster_info["nodeTypeAndCountList"]: + for data in virtual_cluster_info["replicaSetList"]: replica_set = ReplicaSet( - template_id=data["nodeType"], replicas=data["nodeCount"] + template_id=data["templateId"], replicas=data["replicas"] ) replica_set_list.append(replica_set) request = CreateOrUpdateVirtualClusterRequest( virtual_cluster_id=virtual_cluster_id, - virtual_cluster_name=virtual_cluster_info.get("name", ""), + virtual_cluster_name=virtual_cluster_info.get("virtualClusterName", ""), mode=job_exec_mode, replica_set_list=replica_set_list, revision=int(virtual_cluster_info.get("revision", 0)), @@ -79,6 +79,7 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: reply = await ( self._gcs_virtual_cluster_info_stub.CreateOrUpdateVirtualCluster(request) ) + if reply.status.code == 0: logger.info("Virtual cluster %s created or updated", virtual_cluster_id) data = dashboard_utils.message_to_dict(reply) diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 20a8df079da9..6c422d3dab45 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -711,10 +711,11 @@ message JobTableData { /////////////////////////////////////////////////////////////////////////////// enum AllocationMode { + UNKNOWN = 0; // A single node can carray tasks for multiple jobs. - Mixed = 0; + MIXED = 1; // A single node can only carray tasks for one job. - Exclusive = 1; + EXCLUSIVE = 2; } message NodeInstance { @@ -726,7 +727,7 @@ message NodeInstance { message VirtualClusterTableData { // The virtual cluster id. - string id = 1; + string virtual_cluster_id = 1; // The virtual cluster name. string name = 2; // The allocation mode of the virtual cluster. From 86769ae8e5f5ffe6ed1ea76f3d519a3bb9750462 Mon Sep 17 00:00:00 2001 From: Chong Li Date: Mon, 16 Dec 2024 17:36:08 +0800 Subject: [PATCH 4/9] Fix --- .../virtual_cluster/virtual_cluster_head.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py index 5e27a0ae279e..c9095494b2f5 100644 --- a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py +++ b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py @@ -35,12 +35,19 @@ async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response: ) if reply.status.code == 0: - data = dashboard_utils.message_to_dict(reply) + data = dashboard_utils.message_to_dict( + reply, always_print_fields_with_no_presence=True + ) + for virtual_cluster_data in data.get("virtualClusterDataList", []): + if "revision" in virtual_cluster_data: + virtual_cluster_data["revision"] = int( + virtual_cluster_data.get("revision") + ) return dashboard_optional_utils.rest_response( success=True, message="All virtual clusters fetched.", - virtual_clusters=data["virtualClusterDataList"], + virtual_clusters=data.get("virtualClusterDataList", []), ) else: logger.info("Failed to get all virtual clusters") @@ -82,14 +89,16 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: if reply.status.code == 0: logger.info("Virtual cluster %s created or updated", virtual_cluster_id) - data = dashboard_utils.message_to_dict(reply) + data = dashboard_utils.message_to_dict( + reply, always_print_fields_with_no_presence=True + ) return dashboard_optional_utils.rest_response( success=True, message="Virtual cluster created or updated.", virtual_cluster_id=virtual_cluster_id, - revision=data.get("revision", 0), - node_instances=data["nodeInstances"], + revision=int(data.get("revision", 0)), + node_instances=data.get("nodeInstances", {}), ) else: logger.info( From cddbad0d20f0c4efca94871087866e5ab153d555 Mon Sep 17 00:00:00 2001 From: Chong Li Date: Mon, 16 Dec 2024 17:38:41 +0800 Subject: [PATCH 5/9] Fix --- .../modules/virtual_cluster/virtual_cluster_head.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py index c9095494b2f5..836affc0d79d 100644 --- a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py +++ b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py @@ -39,10 +39,9 @@ async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response: reply, always_print_fields_with_no_presence=True ) for virtual_cluster_data in data.get("virtualClusterDataList", []): - if "revision" in virtual_cluster_data: - virtual_cluster_data["revision"] = int( - virtual_cluster_data.get("revision") - ) + virtual_cluster_data["revision"] = int( + virtual_cluster_data.get("revision", 0) + ) return dashboard_optional_utils.rest_response( success=True, From 32f38d2c00861b294d29d0082bd56be0f8e1b0f8 Mon Sep 17 00:00:00 2001 From: Chong Li Date: Tue, 17 Dec 2024 17:02:17 +0800 Subject: [PATCH 6/9] Rebase and fix --- .../virtual_cluster/virtual_cluster_head.py | 27 +++++++++---------- src/ray/gcs/gcs_server/gcs_virtual_cluster.cc | 12 ++++----- src/ray/gcs/gcs_server/gcs_virtual_cluster.h | 4 +-- .../gcs_server/gcs_virtual_cluster_manager.cc | 9 ++++--- .../test/gcs_virtual_cluster_manager_test.cc | 8 +++--- 5 files changed, 29 insertions(+), 31 deletions(-) diff --git a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py index 836affc0d79d..1867a71fdb9b 100644 --- a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py +++ b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py @@ -5,12 +5,11 @@ import ray.dashboard.optional_utils as dashboard_optional_utils import ray.dashboard.utils as dashboard_utils from ray.core.generated import gcs_service_pb2_grpc -from ray.core.generated.gcs_pb2 import JobExecMode +from ray.core.generated.gcs_pb2 import AllocationMode from ray.core.generated.gcs_service_pb2 import ( CreateOrUpdateVirtualClusterRequest, GetAllVirtualClustersRequest, RemoveVirtualClusterRequest, - ReplicaSet, ) logger = logging.getLogger(__name__) @@ -42,6 +41,9 @@ async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response: virtual_cluster_data["revision"] = int( virtual_cluster_data.get("revision", 0) ) + virtual_cluster_data["allocationMode"] = str( + virtual_cluster_data.pop("mode", "mixed") + ).lower() return dashboard_optional_utils.rest_response( success=True, @@ -64,22 +66,17 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: virtual_cluster_info = dict(virtual_cluster_info_json) virtual_cluster_id = virtual_cluster_info["virtualClusterId"] - job_exec_mode = JobExecMode.MIXED - if str(virtual_cluster_info.get("jobExecMode", "mixed")).lower() == "exclusive": - job_exec_mode = JobExecMode.EXCLUSIVE - - replica_set_list = [] - for data in virtual_cluster_info["replicaSetList"]: - replica_set = ReplicaSet( - template_id=data["templateId"], replicas=data["replicas"] - ) - replica_set_list.append(replica_set) + allocation_mode = AllocationMode.MIXED + if ( + str(virtual_cluster_info.get("allocationMode", "mixed")).lower() + == "exclusive" + ): + allocation_mode = AllocationMode.EXCLUSIVE request = CreateOrUpdateVirtualClusterRequest( virtual_cluster_id=virtual_cluster_id, - virtual_cluster_name=virtual_cluster_info.get("virtualClusterName", ""), - mode=job_exec_mode, - replica_set_list=replica_set_list, + mode=allocation_mode, + replica_sets=virtual_cluster_info.get("replicaSets", {}), revision=int(virtual_cluster_info.get("revision", 0)), ) reply = await ( diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc index cb5b220d6d45..1deca883ae21 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc @@ -162,7 +162,7 @@ bool VirtualCluster::MarkNodeInstanceAsDead(const std::string &template_id, std::shared_ptr VirtualCluster::ToProto() const { auto data = std::make_shared(); - data->set_id(GetID()); + data->set_virtual_cluster_id(GetID()); data->set_name(GetName()); data->set_mode(GetMode()); data->set_revision(GetRevision()); @@ -197,7 +197,7 @@ Status JobClusterManager::CreateJobCluster( const std::string &job_name, ReplicaSets replica_sets, CreateOrUpdateVirtualClusterCallback callback) { - if (GetMode() != rpc::AllocationMode::Exclusive) { + if (GetMode() != rpc::AllocationMode::EXCLUSIVE) { std::ostringstream ostr; ostr << "The job cluster can only be created in exclusive mode, virtual_cluster_id: " << GetID() << ", job_name: " << job_name; @@ -248,7 +248,7 @@ Status JobClusterManager::CreateJobCluster( Status JobClusterManager::RemoveJobCluster(const std::string &job_name, RemoveVirtualClusterCallback callback) { - if (GetMode() != rpc::AllocationMode::Exclusive) { + if (GetMode() != rpc::AllocationMode::EXCLUSIVE) { std::ostringstream ostr; ostr << "The job cluster can only be removed in exclusive mode, virtual_cluster_id: " << GetID() << ", job_name: " << job_name; @@ -374,7 +374,7 @@ Status PrimaryCluster::DetermineNodeInstanceAdditionsAndRemovals( bool PrimaryCluster::IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const { - RAY_CHECK(GetMode() == rpc::AllocationMode::Exclusive); + RAY_CHECK(GetMode() == rpc::AllocationMode::EXCLUSIVE); return job_cluster_id == kEmptyJobClusterId; } @@ -410,7 +410,7 @@ void PrimaryCluster::OnNodeDead(const rpc::GcsNodeInfo &node) { ///////////////////////// LogicalCluster ///////////////////////// bool LogicalCluster::IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const { - if (GetMode() == rpc::AllocationMode::Exclusive) { + if (GetMode() == rpc::AllocationMode::EXCLUSIVE) { return job_cluster_id == kEmptyJobClusterId; } @@ -423,7 +423,7 @@ bool LogicalCluster::IsIdleNodeInstance(const std::string &job_cluster_id, ///////////////////////// JobCluster ///////////////////////// bool JobCluster::IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const { - RAY_CHECK(GetMode() == rpc::AllocationMode::Mixed); + RAY_CHECK(GetMode() == rpc::AllocationMode::MIXED); // TODO(Shanly): The job_cluster_id will always be empty in mixed mode although the node // instance is assigned to one or two jobs, so we need to check the node resources // usage. diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h index 603097afd948..40ad3c66a91a 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h @@ -227,7 +227,7 @@ class PrimaryCluster : public JobClusterManager { : JobClusterManager(async_data_flusher) {} const std::string &GetID() const override { return kPrimaryClusterID; } - rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::Exclusive; } + rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::EXCLUSIVE; } const std::string &GetName() const override { return kPrimaryClusterID; } /// Create or update a new virtual cluster. @@ -309,7 +309,7 @@ class JobCluster : public VirtualCluster { JobCluster(const std::string &id, const std::string &name) : id_(id), name_(name) {} const std::string &GetID() const override { return id_; } - rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::Mixed; } + rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::MIXED; } const std::string &GetName() const override { return id_; } bool IsIdleNodeInstance(const std::string &job_cluster_id, diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc index eb917e395140..43aa405a6f18 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc @@ -47,7 +47,8 @@ void GcsVirtualClusterManager::HandleCreateOrUpdateVirtualCluster( data->node_instances().end()); // Fill the revision of the virtual cluster to the reply. reply->set_revision(data->revision()); - RAY_LOG(INFO) << "Succeed in creating or updating virtual cluster " << data->id(); + RAY_LOG(INFO) << "Succeed in creating or updating virtual cluster " + << data->virtual_cluster_id(); } else { RAY_CHECK(data == nullptr); RAY_LOG(WARNING) << "Failed to create or update virtual cluster " @@ -161,7 +162,7 @@ Status GcsVirtualClusterManager::FlushAndPublish( // The backend storage is supposed to be reliable, so the status must be ok. RAY_CHECK_OK(status); RAY_CHECK_OK(gcs_publisher_.PublishVirtualCluster( - VirtualClusterID::FromBinary(data->id()), *data, nullptr)); + VirtualClusterID::FromBinary(data->virtual_cluster_id()), *data, nullptr)); if (callback) { callback(status, std::move(data)); } @@ -169,12 +170,12 @@ Status GcsVirtualClusterManager::FlushAndPublish( if (data->is_removed()) { return gcs_table_storage_.VirtualClusterTable().Delete( - VirtualClusterID::FromBinary(data->id()), on_done); + VirtualClusterID::FromBinary(data->virtual_cluster_id()), on_done); } // Write the virtual cluster data to the storage. return gcs_table_storage_.VirtualClusterTable().Put( - VirtualClusterID::FromBinary(data->id()), *data, on_done); + VirtualClusterID::FromBinary(data->virtual_cluster_id()), *data, on_done); } } // namespace gcs } // namespace ray \ No newline at end of file diff --git a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc index 5ad02a48d638..898561cc8e9b 100644 --- a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc @@ -147,7 +147,7 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id("virtual_cluster_id_0"); request.set_virtual_cluster_name("virtual_cluster_id_0"); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, 5}); request.mutable_replica_sets()->insert({template_id_1, 10}); @@ -200,7 +200,7 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id("virtual_cluster_id_1"); request.set_virtual_cluster_name("virtual_cluster_id_1"); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, node_count_per_template - 5}); request.mutable_replica_sets()->insert({template_id_1, node_count_per_template - 10}); @@ -249,7 +249,7 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id("virtual_cluster_id_2"); request.set_virtual_cluster_name("virtual_cluster_id_2"); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, 0}); request.mutable_replica_sets()->insert({template_id_1, 0}); @@ -276,7 +276,7 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id("virtual_cluster_id_3"); request.set_virtual_cluster_name("virtual_cluster_id_3"); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, 1}); request.mutable_replica_sets()->insert({template_id_1, 0}); From c83abc430a6c21563abecc0c5d0e2f722ba9cd9b Mon Sep 17 00:00:00 2001 From: Chong Li Date: Tue, 17 Dec 2024 20:01:41 +0800 Subject: [PATCH 7/9] Fix id --- src/ray/gcs/gcs_server/gcs_virtual_cluster.cc | 2 +- src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc | 9 ++++----- src/ray/protobuf/gcs.proto | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc index 47c76d988843..7ebdb9dd569e 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc @@ -162,7 +162,7 @@ bool VirtualCluster::MarkNodeInstanceAsDead(const std::string &template_id, std::shared_ptr VirtualCluster::ToProto() const { auto data = std::make_shared(); - data->set_virtual_cluster_id(GetID()); + data->set_id(GetID()); data->set_mode(GetMode()); data->set_revision(GetRevision()); data->mutable_replica_sets()->insert(replica_sets_.begin(), replica_sets_.end()); diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc index 43aa405a6f18..eb917e395140 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc @@ -47,8 +47,7 @@ void GcsVirtualClusterManager::HandleCreateOrUpdateVirtualCluster( data->node_instances().end()); // Fill the revision of the virtual cluster to the reply. reply->set_revision(data->revision()); - RAY_LOG(INFO) << "Succeed in creating or updating virtual cluster " - << data->virtual_cluster_id(); + RAY_LOG(INFO) << "Succeed in creating or updating virtual cluster " << data->id(); } else { RAY_CHECK(data == nullptr); RAY_LOG(WARNING) << "Failed to create or update virtual cluster " @@ -162,7 +161,7 @@ Status GcsVirtualClusterManager::FlushAndPublish( // The backend storage is supposed to be reliable, so the status must be ok. RAY_CHECK_OK(status); RAY_CHECK_OK(gcs_publisher_.PublishVirtualCluster( - VirtualClusterID::FromBinary(data->virtual_cluster_id()), *data, nullptr)); + VirtualClusterID::FromBinary(data->id()), *data, nullptr)); if (callback) { callback(status, std::move(data)); } @@ -170,12 +169,12 @@ Status GcsVirtualClusterManager::FlushAndPublish( if (data->is_removed()) { return gcs_table_storage_.VirtualClusterTable().Delete( - VirtualClusterID::FromBinary(data->virtual_cluster_id()), on_done); + VirtualClusterID::FromBinary(data->id()), on_done); } // Write the virtual cluster data to the storage. return gcs_table_storage_.VirtualClusterTable().Put( - VirtualClusterID::FromBinary(data->virtual_cluster_id()), *data, on_done); + VirtualClusterID::FromBinary(data->id()), *data, on_done); } } // namespace gcs } // namespace ray \ No newline at end of file diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 038edddf8608..3c7bdf9cbffd 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -727,7 +727,7 @@ message NodeInstance { message VirtualClusterTableData { // The virtual cluster id. - string virtual_cluster_id = 1; + string id = 1; // The allocation mode of the virtual cluster. AllocationMode mode = 2; // The replica set list of the virtual cluster. From e6a0ba4a07b40c814b8623342071dbf457dc08f5 Mon Sep 17 00:00:00 2001 From: Chong Li Date: Wed, 18 Dec 2024 17:56:03 +0800 Subject: [PATCH 8/9] Fix --- src/ray/gcs/gcs_server/gcs_virtual_cluster.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h index 4ba66c3166c0..525ce7793750 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h @@ -197,7 +197,7 @@ class ExclusiveCluster : public VirtualCluster { : VirtualCluster(id), async_data_flusher_(async_data_flusher) {} const std::string &GetID() const override { return id_; } - rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::Exclusive; } + rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::EXCLUSIVE; } /// Create a job cluster. /// @@ -245,7 +245,7 @@ class MixedCluster : public VirtualCluster { MixedCluster &operator=(const MixedCluster &) = delete; const std::string &GetID() const override { return id_; } - rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::Mixed; } + rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::MIXED; } /// Check if the virtual cluster is in use. /// From f000e1bc72fe016130f418c36ed4e59f0272b0b2 Mon Sep 17 00:00:00 2001 From: Chong Li Date: Wed, 18 Dec 2024 19:54:47 +0800 Subject: [PATCH 9/9] Fix --- src/ray/gcs/gcs_server/gcs_virtual_cluster.cc | 4 ++-- .../gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc index 074eb8d6d82b..b740dbd768fb 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc @@ -298,7 +298,7 @@ bool ExclusiveCluster::InUse() const { return !job_clusters_.empty(); } bool ExclusiveCluster::IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const { - RAY_CHECK(GetMode() == rpc::AllocationMode::Exclusive); + RAY_CHECK(GetMode() == rpc::AllocationMode::EXCLUSIVE); return job_cluster_id == kEmptyJobClusterId; } @@ -341,7 +341,7 @@ Status PrimaryCluster::CreateOrUpdateVirtualCluster( if (logical_cluster == nullptr) { // replica_instances_to_remove must be empty as the virtual cluster is a new one. RAY_CHECK(replica_instances_to_remove_from_logical_cluster.empty()); - if (request.mode() == rpc::AllocationMode::Exclusive) { + if (request.mode() == rpc::AllocationMode::EXCLUSIVE) { logical_cluster = std::make_shared(request.virtual_cluster_id(), async_data_flusher_); } else { diff --git a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc index 34b07c09008e..a9d7a698e729 100644 --- a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc @@ -539,7 +539,7 @@ TEST_F(PrimaryClusterTest, RemoveLogicalCluster) { { rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id(virtual_cluster_id_0); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, 5}); request.mutable_replica_sets()->insert({template_id_1, 10});