From b2bda98e19522a47e47953b13c6f6c39b17d4f7a Mon Sep 17 00:00:00 2001 From: ZhuSenlin Date: Mon, 6 Jan 2025 19:31:16 +0800 Subject: [PATCH] refactor mixed/exclusive cluster to indivisible/divisible cluster (#443) --- .../tests/test_job_with_virtual_cluster.py | 9 +- .../tests/test_virtual_cluster.py | 78 ++++++----- .../virtual_cluster/virtual_cluster_head.py | 16 +-- src/mock/ray/gcs/gcs_client/accessor.h | 2 +- src/ray/gcs/gcs_client/accessor.ant.cc | 6 +- src/ray/gcs/gcs_client/accessor.h | 2 +- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 3 +- src/ray/gcs/gcs_server/gcs_job_manager.cc | 3 +- src/ray/gcs/gcs_server/gcs_server.cc | 2 +- src/ray/gcs/gcs_server/gcs_virtual_cluster.cc | 127 +++++++++--------- src/ray/gcs/gcs_server/gcs_virtual_cluster.h | 45 +++---- .../gcs_server/gcs_virtual_cluster_manager.cc | 37 ++--- .../test/gcs_virtual_cluster_manager_test.cc | 16 +-- src/ray/protobuf/gcs.proto | 12 +- src/ray/protobuf/gcs_service.proto | 8 +- src/ray/raylet/virtual_cluster_manager.cc | 8 +- .../raylet/virtual_cluster_manager_test.cc | 6 +- src/ray/raylet_client/raylet_client.cc | 5 +- 18 files changed, 183 insertions(+), 202 deletions(-) diff --git a/python/ray/dashboard/modules/job/tests/test_job_with_virtual_cluster.py b/python/ray/dashboard/modules/job/tests/test_job_with_virtual_cluster.py index b98d1b5673316..48cec7d846554 100644 --- a/python/ray/dashboard/modules/job/tests/test_job_with_virtual_cluster.py +++ b/python/ray/dashboard/modules/job/tests/test_job_with_virtual_cluster.py @@ -19,7 +19,6 @@ ) from ray.cluster_utils import Cluster, cluster_not_supported from ray.core.generated import gcs_service_pb2_grpc -from ray.core.generated.gcs_pb2 import AllocationMode from ray.core.generated.gcs_service_pb2 import CreateOrUpdateVirtualClusterRequest from ray.dashboard.modules.job.common import ( JOB_ACTOR_NAME_TEMPLATE, @@ -110,7 +109,7 @@ async def job_sdk_client(request, make_sure_dashboard_http_port_unused, external async def create_virtual_cluster( - gcs_address, virtual_cluster_id, replica_sets, allocation_mode=AllocationMode.MIXED + gcs_address, virtual_cluster_id, replica_sets, divisible=False ): channel = GcsChannel(gcs_address, aio=True) channel.connect() @@ -119,7 +118,7 @@ async def create_virtual_cluster( ) request = CreateOrUpdateVirtualClusterRequest( virtual_cluster_id=virtual_cluster_id, - mode=allocation_mode, + divisible=divisible, replica_sets=replica_sets, ) reply = await (gcs_virtual_cluster_info_stub.CreateOrUpdateVirtualCluster(request)) @@ -356,7 +355,7 @@ def _check_recover( indirect=True, ) @pytest.mark.asyncio -async def test_exclusive_virtual_cluster(job_sdk_client): +async def test_divisible_virtual_cluster(job_sdk_client): head_client, gcs_address, cluster = job_sdk_client virtual_cluster_id_prefix = "VIRTUAL_CLUSTER_" node_to_virtual_cluster = {} @@ -367,7 +366,7 @@ async def test_exclusive_virtual_cluster(job_sdk_client): gcs_address, virtual_cluster_id, {TEMPLATE_ID_PREFIX + str(i): 2}, - AllocationMode.EXCLUSIVE, + True, ) for node_id in nodes: assert node_id not in node_to_virtual_cluster diff --git a/python/ray/dashboard/modules/virtual_cluster/tests/test_virtual_cluster.py b/python/ray/dashboard/modules/virtual_cluster/tests/test_virtual_cluster.py index 36dc42e3fb018..8315af75fe6c6 100644 --- a/python/ray/dashboard/modules/virtual_cluster/tests/test_virtual_cluster.py +++ b/python/ray/dashboard/modules/virtual_cluster/tests/test_virtual_cluster.py @@ -19,14 +19,14 @@ def create_or_update_virtual_cluster( - webui_url, virtual_cluster_id, allocation_mode, replica_sets, revision + webui_url, virtual_cluster_id, divisible, replica_sets, revision ): try: resp = requests.post( webui_url + "/virtual_clusters", json={ "virtualClusterId": virtual_cluster_id, - "allocationMode": allocation_mode, + "divisible": divisible, "replicaSets": replica_sets, "revision": revision, }, @@ -81,14 +81,14 @@ def test_create_and_update_virtual_cluster( revision = 0 def _check_create_or_update_virtual_cluster( - virtual_cluster_id, allocation_mode, replica_sets + virtual_cluster_id, divisible, replica_sets ): nonlocal revision resp = requests.post( webui_url + "/virtual_clusters", json={ "virtualClusterId": virtual_cluster_id, - "allocationMode": allocation_mode, + "divisible": divisible, "replicaSets": replica_sets, "revision": revision, }, @@ -111,59 +111,59 @@ def _check_create_or_update_virtual_cluster( # The virtual cluster has the same node types and count as expected. assert replica_sets == virtual_cluster_replica_sets - # Create a new virtual cluster with exclusive allocation mode. + # Create a new divisible virtual cluster. _check_create_or_update_virtual_cluster( virtual_cluster_id="virtual_cluster_1", - allocation_mode="exclusive", + divisible=True, replica_sets={"4c8g": 1, "8c16g": 1}, ) # Update the virtual cluster with less nodes (scale down). _check_create_or_update_virtual_cluster( virtual_cluster_id="virtual_cluster_1", - allocation_mode="exclusive", + divisible=True, replica_sets={"4c8g": 1}, ) # Update the virtual cluster with more nodes (scale up). _check_create_or_update_virtual_cluster( virtual_cluster_id="virtual_cluster_1", - allocation_mode="exclusive", + divisible=True, replica_sets={"4c8g": 1, "8c16g": 1}, ) # Update the virtual cluster with zero node (make it empty). _check_create_or_update_virtual_cluster( virtual_cluster_id="virtual_cluster_1", - allocation_mode="exclusive", + divisible=True, replica_sets={}, ) # `virtual_cluster_1` has released all nodes, so we can now - # create a new (mixed) virtual cluster with two nodes. + # create a new indivisible virtual cluster with two nodes. _check_create_or_update_virtual_cluster( virtual_cluster_id="virtual_cluster_2", - allocation_mode="mixed", + divisible=False, replica_sets={"4c8g": 1, "8c16g": 1}, ) # Update the virtual cluster with less nodes. _check_create_or_update_virtual_cluster( virtual_cluster_id="virtual_cluster_2", - allocation_mode="mixed", + divisible=False, replica_sets={"4c8g": 1}, ) # Update the virtual cluster with more nodes. _check_create_or_update_virtual_cluster( virtual_cluster_id="virtual_cluster_2", - allocation_mode="mixed", + divisible=False, replica_sets={"4c8g": 1, "8c16g": 1}, ) # Update the virtual cluster with zero node (make it empty). _check_create_or_update_virtual_cluster( - virtual_cluster_id="virtual_cluster_2", allocation_mode="mixed", replica_sets={} + virtual_cluster_id="virtual_cluster_2", divisible=False, replica_sets={} ) @@ -176,9 +176,9 @@ def _check_create_or_update_virtual_cluster( ], indirect=True, ) -@pytest.mark.parametrize("allocation_mode", ["exclusive", "mixed"]) +@pytest.mark.parametrize("divisible", [True, False]) def test_create_and_update_virtual_cluster_with_exceptions( - disable_aiohttp_cache, ray_start_cluster_head, allocation_mode + disable_aiohttp_cache, ray_start_cluster_head, divisible ): cluster: Cluster = ray_start_cluster_head assert wait_until_server_available(cluster.webui_url) is True @@ -193,7 +193,7 @@ def test_create_and_update_virtual_cluster_with_exceptions( result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_1", - allocation_mode=allocation_mode, + divisible=divisible, replica_sets={"16c32g": 1}, revision=0, ) @@ -202,14 +202,14 @@ def test_create_and_update_virtual_cluster_with_exceptions( replica_sets = result["data"].get("replicaSetsToRecommend", {}) # The primary cluster can fulfill none `16c32g` node to meet the # virtual cluster's requirement. - assert replica_sets == {} + assert replica_sets == {"16c32g": 0} # Create a new virtual cluster with node count that the primary cluster # can not provide. result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_1", - allocation_mode=allocation_mode, + divisible=divisible, replica_sets={"4c8g": 2, "8c16g": 1}, revision=0, ) @@ -224,7 +224,7 @@ def test_create_and_update_virtual_cluster_with_exceptions( result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_1", - allocation_mode=allocation_mode, + divisible=divisible, replica_sets={"4c8g": 1}, revision=0, ) @@ -235,7 +235,7 @@ def test_create_and_update_virtual_cluster_with_exceptions( result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_1", - allocation_mode=allocation_mode, + divisible=divisible, replica_sets={"4c8g": 2, "8c16g": 2}, revision=0, ) @@ -247,7 +247,7 @@ def test_create_and_update_virtual_cluster_with_exceptions( result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_1", - allocation_mode=allocation_mode, + divisible=divisible, replica_sets={"4c8g": 2, "8c16g": 2}, revision=revision, ) @@ -256,9 +256,9 @@ def test_create_and_update_virtual_cluster_with_exceptions( replica_sets = result["data"].get("replicaSetsToRecommend", {}) # The primary cluster can only fulfill one `8c16g` # node to meet the virtual cluster's requirement. - assert replica_sets == {"8c16g": 1} + assert replica_sets == {"4c8g": 0, "8c16g": 1} - if allocation_mode == "mixed": + if not divisible: actor = SmallActor.options(resources={"4c8g": 1}).remote() ray.get(actor.pid.remote(), timeout=10) @@ -266,7 +266,7 @@ def test_create_and_update_virtual_cluster_with_exceptions( result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_1", - allocation_mode=allocation_mode, + divisible=divisible, replica_sets={}, revision=revision, ) @@ -274,14 +274,14 @@ def test_create_and_update_virtual_cluster_with_exceptions( assert "No enough nodes to remove from the virtual cluster" in result["msg"] replica_sets = result["data"].get("replicaSetsToRecommend", {}) # The virtual cluster has one `4c8g` node in use. So we can fulfill none node. - assert replica_sets == {} + assert replica_sets == {"4c8g": 0} # Create a new virtual cluster that the remaining nodes in the primary cluster # are not enough. result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_2", - allocation_mode=allocation_mode, + divisible=divisible, replica_sets={"4c8g": 1, "8c16g": 1}, revision=0, ) @@ -290,7 +290,7 @@ def test_create_and_update_virtual_cluster_with_exceptions( replica_sets = result["data"].get("replicaSetsToRecommend", {}) # The primary cluster lacks one `4c8g` node to meet the # virtual cluster's requirement. - assert replica_sets == {"8c16g": 1} + assert replica_sets == {"4c8g": 0, "8c16g": 1} @pytest.mark.parametrize( @@ -311,11 +311,11 @@ def test_remove_virtual_cluster(disable_aiohttp_cache, ray_start_cluster_head): cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "4c8g"}, resources={"4c8g": 1}) cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "8c16g"}, resources={"8c16g": 1}) - # Create a new virtual cluster with exclusive allocation mode. + # Create a new divisible virtual cluster. result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_1", - allocation_mode="exclusive", + divisible=True, replica_sets={"4c8g": 1, "8c16g": 1}, revision=0, ) @@ -335,11 +335,11 @@ def test_remove_virtual_cluster(disable_aiohttp_cache, ray_start_cluster_head): ) assert result["result"] is True - # Create a new virtual cluster with mixed mode. + # Create a new indivisible virtual cluster. result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_2", - allocation_mode="mixed", + divisible=False, replica_sets={"4c8g": 1, "8c16g": 1}, revision=0, ) @@ -392,23 +392,21 @@ def test_get_virtual_clusters(disable_aiohttp_cache, ray_start_cluster_head): cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "8c16g"}) cluster.add_node(env_vars={"RAY_NODE_TYPE_NAME": "8c16g"}) - # Create a new virtual cluster with mixed allocation mode and - # two `4c8g` nodes. + # Create a new indivisible virtual cluster with two `4c8g` nodes. result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_1", - allocation_mode="mixed", + divisible=False, replica_sets={"4c8g": 2}, revision=0, ) assert result["result"] is True - # Create a new virtual cluster with exclusive allocation mode - # and two `8c16g` nodes. + # Create a new divisible virtual cluster with two `8c16g` nodes. result = create_or_update_virtual_cluster( webui_url=webui_url, virtual_cluster_id="virtual_cluster_2", - allocation_mode="exclusive", + divisible=True, replica_sets={"8c16g": 2}, revision=0, ) @@ -423,7 +421,7 @@ def _get_virtual_clusters(): assert result["result"] is True, resp.text for virtual_cluster in result["data"]["virtualClusters"]: if virtual_cluster["virtualClusterId"] == "virtual_cluster_1": - assert virtual_cluster["allocationMode"] == "mixed" + assert virtual_cluster["divisible"] == "false" assert len(virtual_cluster["nodeInstances"]) == 2 for _, node_instance in virtual_cluster["nodeInstances"].items(): assert node_instance["hostname"] == hostname @@ -431,7 +429,7 @@ def _get_virtual_clusters(): revision_1 = virtual_cluster["revision"] assert revision_1 > 0 elif virtual_cluster["virtualClusterId"] == "virtual_cluster_2": - assert virtual_cluster["allocationMode"] == "exclusive" + assert virtual_cluster["divisible"] == "true" assert len(virtual_cluster["nodeInstances"]) == 2 for _, node_instance in virtual_cluster["nodeInstances"].items(): assert node_instance["hostname"] == hostname diff --git a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py index fae5f4ff78e35..061faa5477f2b 100644 --- a/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py +++ b/python/ray/dashboard/modules/virtual_cluster/virtual_cluster_head.py @@ -5,7 +5,6 @@ import ray.dashboard.optional_utils as dashboard_optional_utils import ray.dashboard.utils as dashboard_utils from ray.core.generated import gcs_service_pb2_grpc -from ray.core.generated.gcs_pb2 import AllocationMode from ray.core.generated.gcs_service_pb2 import ( CreateOrUpdateVirtualClusterRequest, GetVirtualClustersRequest, @@ -44,8 +43,8 @@ async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response: virtual_cluster_data["revision"] = int( virtual_cluster_data.get("revision", 0) ) - virtual_cluster_data["allocationMode"] = str( - virtual_cluster_data.pop("mode", "mixed") + virtual_cluster_data["divisible"] = str( + virtual_cluster_data.pop("divisible", False) ).lower() return dashboard_optional_utils.rest_response( @@ -69,16 +68,13 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response: virtual_cluster_info = dict(virtual_cluster_info_json) virtual_cluster_id = virtual_cluster_info["virtualClusterId"] - allocation_mode = AllocationMode.MIXED - if ( - str(virtual_cluster_info.get("allocationMode", "mixed")).lower() - == "exclusive" - ): - allocation_mode = AllocationMode.EXCLUSIVE + divisible = False + if str(virtual_cluster_info.get("divisible", False)).lower() == "true": + divisible = True request = CreateOrUpdateVirtualClusterRequest( virtual_cluster_id=virtual_cluster_id, - mode=allocation_mode, + divisible=divisible, replica_sets=virtual_cluster_info.get("replicaSets", {}), revision=int(virtual_cluster_info.get("revision", 0)), ) diff --git a/src/mock/ray/gcs/gcs_client/accessor.h b/src/mock/ray/gcs/gcs_client/accessor.h index 2888913c8d050..fb868e4e31d90 100644 --- a/src/mock/ray/gcs/gcs_client/accessor.h +++ b/src/mock/ray/gcs/gcs_client/accessor.h @@ -366,7 +366,7 @@ class MockVirtualClusterInfoAccessor : public VirtualClusterInfoAccessor { MOCK_METHOD(Status, AsyncGetAll, (bool include_job_clusters, - bool only_include_mixed_clusters, + bool only_include_indivisible_clusters, (const MultiItemCallback &callback)), (override)); MOCK_METHOD(Status, diff --git a/src/ray/gcs/gcs_client/accessor.ant.cc b/src/ray/gcs/gcs_client/accessor.ant.cc index 90cba4855ca54..4e0b957c6f825 100644 --- a/src/ray/gcs/gcs_client/accessor.ant.cc +++ b/src/ray/gcs/gcs_client/accessor.ant.cc @@ -48,12 +48,12 @@ Status VirtualClusterInfoAccessor::AsyncGet( Status VirtualClusterInfoAccessor::AsyncGetAll( bool include_job_clusters, - bool only_include_mixed_clusters, + bool only_include_indivisible_clusters, const MultiItemCallback &callback) { RAY_LOG(DEBUG) << "Getting all virtual cluster info."; rpc::GetVirtualClustersRequest request; request.set_include_job_clusters(true); - request.set_only_include_mixed_clusters(true); + request.set_only_include_indivisible_clusters(true); client_impl_->GetGcsRpcClient().GetVirtualClusters( request, [callback](const Status &status, rpc::GetVirtualClustersReply &&reply) { callback( @@ -84,7 +84,7 @@ Status VirtualClusterInfoAccessor::AsyncSubscribeAll( }; RAY_CHECK_OK(AsyncGetAll( /*include_job_clusters=*/true, - /*only_include_mixed_clusters=*/true, + /*only_include_indivisible_clusters=*/true, callback)); }; subscribe_operation_ = [this, subscribe](const StatusCallback &done) { diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index 673dd0aa76dd4..074f92c6a6ce3 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -1017,7 +1017,7 @@ class VirtualClusterInfoAccessor { /// \return Status virtual Status AsyncGetAll( bool include_job_clusters, - bool only_include_mixed_clusters, + bool only_include_indivisible_clusters, const MultiItemCallback &callback); /// Subscribe to virtual cluster updates. diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 984a0df0b83ee..e8dd4cf761b96 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -718,8 +718,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ if (!virtual_cluster_id.empty()) { auto virtual_cluster = gcs_virtual_cluster_manager_.GetVirtualCluster(virtual_cluster_id); - if ((virtual_cluster == nullptr) || - (virtual_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE)) { + if (virtual_cluster == nullptr || virtual_cluster->Divisible()) { std::stringstream stream; stream << "Invalid virtual cluster, virtual cluster id: " << virtual_cluster_id; return Status::InvalidArgument(stream.str()); diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index b2c3ed7a8d67e..319cf7bb0137f 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -125,8 +125,7 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request, if (!virtual_cluster_id.empty()) { auto virtual_cluster = gcs_virtual_cluster_manager_.GetVirtualCluster(virtual_cluster_id); - if ((virtual_cluster == nullptr) || - (virtual_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE)) { + if (virtual_cluster == nullptr || virtual_cluster->Divisible()) { std::stringstream stream; stream << "Invalid virtual cluster, virtual cluster id: " << virtual_cluster_id; auto status = Status::InvalidArgument(stream.str()); diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index dbc3d05038d7a..9e37135f85fc8 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -400,7 +400,7 @@ void GcsServer::InitClusterResourceScheduler() { if (virtual_cluster == nullptr) { return true; } - RAY_CHECK(virtual_cluster->GetMode() == rpc::AllocationMode::MIXED); + RAY_CHECK(!virtual_cluster->Divisible()); // Check if the node is contained within the specified virtual cluster. return virtual_cluster->ContainsNodeInstance(node_instance_id); }); diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc index 224dfa39f1a2f..77b83fe4735a6 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc @@ -309,7 +309,7 @@ std::shared_ptr VirtualCluster::ReplenishNodeInstance( std::shared_ptr VirtualCluster::ToProto() const { auto data = std::make_shared(); data->set_id(GetID()); - data->set_mode(GetMode()); + data->set_divisible(Divisible()); data->set_revision(GetRevision()); for (auto &[template_id, job_node_instances] : visible_node_instances_) { for (auto &[job_cluster_id, node_instances] : job_node_instances) { @@ -336,8 +336,8 @@ std::string VirtualCluster::DebugString() const { return stream.str(); } -///////////////////////// ExclusiveCluster ///////////////////////// -void ExclusiveCluster::LoadJobCluster(const std::string &job_cluster_id, +///////////////////////// DivisibleCluster ///////////////////////// +void DivisibleCluster::LoadJobCluster(const std::string &job_cluster_id, ReplicaInstances replica_instances) { RAY_CHECK(VirtualClusterID::FromBinary(job_cluster_id).IsJobClusterID()); RAY_CHECK(job_clusters_.find(job_cluster_id) == job_clusters_.end()); @@ -345,11 +345,11 @@ void ExclusiveCluster::LoadJobCluster(const std::string &job_cluster_id, DoCreateJobCluster(job_cluster_id, std::move(replica_instances)); } -Status ExclusiveCluster::CreateJobCluster(const std::string &job_cluster_id, +Status DivisibleCluster::CreateJobCluster(const std::string &job_cluster_id, ReplicaSets replica_sets, CreateOrUpdateVirtualClusterCallback callback, ReplicaSets *replica_sets_to_recommend) { - if (GetMode() != rpc::AllocationMode::EXCLUSIVE) { + if (!Divisible()) { std::ostringstream ostr; ostr << "The job cluster can only be created in exclusive mode, virtual_cluster_id: " << GetID(); @@ -384,7 +384,7 @@ Status ExclusiveCluster::CreateJobCluster(const std::string &job_cluster_id, return async_data_flusher_(job_cluster->ToProto(), std::move(callback)); } -std::shared_ptr ExclusiveCluster::DoCreateJobCluster( +std::shared_ptr DivisibleCluster::DoCreateJobCluster( const std::string &job_cluster_id, ReplicaInstances replica_instances_to_add) { auto replica_instances_to_remove_from_current_cluster = replica_instances_to_add; auto replica_instances_to_add_to_current_cluster = replica_instances_to_add; @@ -406,9 +406,9 @@ std::shared_ptr ExclusiveCluster::DoCreateJobCluster( return job_cluster; } -Status ExclusiveCluster::RemoveJobCluster(const std::string &job_cluster_id, +Status DivisibleCluster::RemoveJobCluster(const std::string &job_cluster_id, RemoveVirtualClusterCallback callback) { - if (GetMode() != rpc::AllocationMode::EXCLUSIVE) { + if (!Divisible()) { std::ostringstream ostr; ostr << "The job cluster can only be removed in exclusive mode, virtual_cluster_id: " << GetID(); @@ -446,15 +446,15 @@ Status ExclusiveCluster::RemoveJobCluster(const std::string &job_cluster_id, return async_data_flusher_(std::move(data), std::move(callback)); } -std::shared_ptr ExclusiveCluster::GetJobCluster( +std::shared_ptr DivisibleCluster::GetJobCluster( const std::string &job_cluster_id) const { auto iter = job_clusters_.find(job_cluster_id); return iter != job_clusters_.end() ? iter->second : nullptr; } -bool ExclusiveCluster::InUse() const { return !job_clusters_.empty(); } +bool DivisibleCluster::InUse() const { return !job_clusters_.empty(); } -bool ExclusiveCluster::IsIdleNodeInstance(const gcs::NodeInstance &node_instance) const { +bool DivisibleCluster::IsIdleNodeInstance(const gcs::NodeInstance &node_instance) const { auto template_iter = visible_node_instances_.find(node_instance.template_id()); if (template_iter == visible_node_instances_.end()) { return false; @@ -466,7 +466,7 @@ bool ExclusiveCluster::IsIdleNodeInstance(const gcs::NodeInstance &node_instance return job_iter->second.contains(node_instance.node_instance_id()); } -void ExclusiveCluster::ForeachJobCluster( +void DivisibleCluster::ForeachJobCluster( const std::function &)> &fn) const { if (fn == nullptr) { return; @@ -476,7 +476,7 @@ void ExclusiveCluster::ForeachJobCluster( } } -bool ExclusiveCluster::ReplenishNodeInstances( +bool DivisibleCluster::ReplenishNodeInstances( const NodeInstanceReplenishCallback &callback) { RAY_CHECK(callback != nullptr); bool any_node_instance_replenished = false; @@ -534,8 +534,9 @@ bool ExclusiveCluster::ReplenishNodeInstances( return any_node_instance_replenished; } -///////////////////////// MixedCluster ///////////////////////// -bool MixedCluster::IsIdleNodeInstance(const gcs::NodeInstance &node_instance) const { +///////////////////////// IndivisibleCluster ///////////////////////// +bool IndivisibleCluster::IsIdleNodeInstance( + const gcs::NodeInstance &node_instance) const { if (node_instance.is_dead()) { return true; } @@ -550,7 +551,7 @@ bool MixedCluster::IsIdleNodeInstance(const gcs::NodeInstance &node_instance) co return false; } -bool MixedCluster::InUse() const { +bool IndivisibleCluster::InUse() const { for (const auto &[template_id, job_cluster_instances] : visible_node_instances_) { for (const auto &[job_cluster_id, node_instances] : job_cluster_instances) { for (const auto &[node_instance_id, node_instance] : node_instances) { @@ -565,39 +566,39 @@ bool MixedCluster::InUse() const { ///////////////////////// PrimaryCluster ///////////////////////// void PrimaryCluster::Initialize(const GcsInitData &gcs_init_data) { - // Let mixed cluster be Vm, exclusive cluster be Ve, job cluster be J, empty job cluster - // be E, then + // Let indivisible cluster be Vi, divisible cluster be Vd, job cluster be J, empty job + // cluster be E, then // - // Nodes(Ve) = Σ(Nodes(J)) + Nodes(E). + // Nodes(Vd) = Σ(Nodes(J)) + Nodes(E). // - // When node belongs to Vm is dead (it's a simple case): - // 1. Find a new replica instance from primary cluster to replace the dead one in Vm, - // then flush and publish Vm. + // When node belongs to Vi is dead (it's a simple case): + // 1. Find a new replica instance from primary cluster to replace the dead one in Vi, + // then flush and publish Vi. // - // When node belongs to J is dead(It will also be dead in Ve as both Ve and J hold the + // When node belongs to J is dead(It will also be dead in Vd as both Vd and J hold the // shared node instance). // 1. Find a new replica instance from E to replace the dead one in J, then flush and // publish J. // 2. If there is no enough node instances in E, then find a new replica instance from // primary cluster to replace the dead one in J, then: // a. flush and publish J. - // b. flush Ve. + // b. flush Vd. // 3. If there is no enough node instances in primary cluster, then wait for the new // node to register. // // When node belongs to E is dead (it's simple case). // 1. Find a new replica instance from primary cluster to replace the dead one in E, - // then flush Ve. + // then flush Vd. // 2. If there is no enough node instances in primary cluster, then wait for the new // node to register. // // When failover happens, we need to load the logical clusters and job clusters from the - // GCS tables, and repair the Ve based on J. - // 1. Find the different node instances between Σ(Nodes(J)) and Nodes(Ve), let the + // GCS tables, and repair the Vd based on J. + // 1. Find the different node instances between Σ(Nodes(J)) and Nodes(Vd), let the // difference be D. - // D = Σ(Nodes(J)) - Ve - // 2. Remove the dead node instances from Ve based on the replica sets of D and then - // flush Ve. + // D = Σ(Nodes(J)) - Vd + // 2. Remove the dead node instances from Vd based on the replica sets of D and then + // flush Vd. const auto &nodes = gcs_init_data.Nodes(); for (const auto &[_, node] : nodes) { if (node.state() == rpc::GcsNodeInfo::ALIVE) { @@ -629,12 +630,12 @@ void PrimaryCluster::Initialize(const GcsInitData &gcs_init_data) { } else { // Load the logical cluster. LoadLogicalCluster(virtual_cluster_data.id(), - virtual_cluster_data.mode(), + virtual_cluster_data.divisible(), std::move(replica_instances)); } } - // Repair the exclusive cluster's node instances to ensure that the node instances in + // Repair the divisible cluster's node instances to ensure that the node instances in // the virtual clusters contains all the node instances in the job clusters. Calculate // the different node instances between Σ(Nodes(J)) and Nodes(Ve), // D = Σ(Nodes(J)) - Ve @@ -656,7 +657,7 @@ void PrimaryCluster::Initialize(const GcsInitData &gcs_init_data) { replica_sets_to_repair, replica_instances_to_remove, [](const auto &node_instance) { return node_instance.is_dead(); }); - RAY_LOG(INFO) << "Repair the exclusive cluster " << parent_cluster_id + RAY_LOG(INFO) << "Repair the divisible cluster " << parent_cluster_id << " based on the job cluster " << job_cluster_id << "\nreplica_sets_to_repair: " << ray::gcs::DebugString(replica_sets_to_repair) @@ -674,10 +675,9 @@ void PrimaryCluster::Initialize(const GcsInitData &gcs_init_data) { if (parent_cluster_id == kPrimaryClusterID) { LoadJobCluster(job_cluster_id.Binary(), std::move(replica_instances)); } else { - auto logical_cluster = std::dynamic_pointer_cast( + auto logical_cluster = std::dynamic_pointer_cast( GetLogicalCluster(parent_cluster_id)); - RAY_CHECK(logical_cluster != nullptr && - logical_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE); + RAY_CHECK(logical_cluster != nullptr && logical_cluster->Divisible()); logical_cluster->LoadJobCluster(job_cluster_id.Binary(), std::move(replica_instances)); } @@ -697,10 +697,10 @@ void PrimaryCluster::ForeachVirtualCluster( } for (const auto &[_, logical_cluster] : logical_clusters_) { fn(logical_cluster); - if (logical_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) { - auto exclusive_cluster = - std::dynamic_pointer_cast(logical_cluster); - exclusive_cluster->ForeachJobCluster(fn); + if (logical_cluster->Divisible()) { + auto divisible_cluster = + std::dynamic_pointer_cast(logical_cluster); + divisible_cluster->ForeachJobCluster(fn); } } ForeachJobCluster(fn); @@ -708,15 +708,15 @@ void PrimaryCluster::ForeachVirtualCluster( std::shared_ptr PrimaryCluster::LoadLogicalCluster( const std::string &virtual_cluster_id, - rpc::AllocationMode mode, + bool divisible, ReplicaInstances replica_instances) { std::shared_ptr logical_cluster; - if (mode == rpc::AllocationMode::EXCLUSIVE) { - logical_cluster = std::make_shared( + if (divisible) { + logical_cluster = std::make_shared( virtual_cluster_id, async_data_flusher_, cluster_resource_manager_); } else { - logical_cluster = - std::make_shared(virtual_cluster_id, cluster_resource_manager_); + logical_cluster = std::make_shared(virtual_cluster_id, + cluster_resource_manager_); } RAY_CHECK(logical_clusters_.emplace(virtual_cluster_id, logical_cluster).second); @@ -759,12 +759,12 @@ Status PrimaryCluster::CreateOrUpdateVirtualCluster( if (logical_cluster == nullptr) { // replica_instances_to_remove must be empty as the virtual cluster is a new one. RAY_CHECK(replica_instances_to_remove_from_logical_cluster.empty()); - if (request.mode() == rpc::AllocationMode::EXCLUSIVE) { - logical_cluster = std::make_shared( + if (request.divisible()) { + logical_cluster = std::make_shared( request.virtual_cluster_id(), async_data_flusher_, cluster_resource_manager_); } else { - logical_cluster = std::make_shared(request.virtual_cluster_id(), - cluster_resource_manager_); + logical_cluster = std::make_shared(request.virtual_cluster_id(), + cluster_resource_manager_); } logical_clusters_[request.virtual_cluster_id()] = logical_cluster; } @@ -804,7 +804,7 @@ Status PrimaryCluster::DetermineNodeInstanceAdditionsAndRemovals( replica_sets_to_remove, replica_instances_to_remove, [logical_cluster](const auto &node_instance) { - if (logical_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) { + if (logical_cluster->Divisible()) { return true; } if (node_instance.is_dead()) { @@ -886,7 +886,7 @@ Status PrimaryCluster::RemoveVirtualCluster(const std::string &virtual_cluster_i auto message = ostr.str(); return Status::NotFound(message); } - if (virtual_cluster->GetMode() != rpc::AllocationMode::EXCLUSIVE) { + if (!virtual_cluster->Divisible()) { std::ostringstream ostr; ostr << "Failed to remove virtual cluster, parent cluster is not exclusive, " "virtual cluster id: " @@ -894,9 +894,9 @@ Status PrimaryCluster::RemoveVirtualCluster(const std::string &virtual_cluster_i auto message = ostr.str(); return Status::InvalidArgument(message); } - ExclusiveCluster *exclusive_cluster = - dynamic_cast(virtual_cluster.get()); - return exclusive_cluster->RemoveJobCluster(virtual_cluster_id, callback); + DivisibleCluster *divisible_cluster = + dynamic_cast(virtual_cluster.get()); + return divisible_cluster->RemoveJobCluster(virtual_cluster_id, callback); } else { return RemoveLogicalCluster(virtual_cluster_id, callback); } @@ -956,10 +956,10 @@ std::shared_ptr PrimaryCluster::GetVirtualCluster( } // Check if it is a job cluster of any logical cluster for (auto &[cluster_id, logical_cluster] : logical_clusters_) { - if (logical_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) { - ExclusiveCluster *exclusive_cluster = - dynamic_cast(logical_cluster.get()); - auto job_cluster = exclusive_cluster->GetJobCluster(virtual_cluster_id); + if (logical_cluster->Divisible()) { + DivisibleCluster *divisible_cluster = + dynamic_cast(logical_cluster.get()); + auto job_cluster = divisible_cluster->GetJobCluster(virtual_cluster_id); if (job_cluster != nullptr) { return job_cluster; } @@ -973,16 +973,15 @@ void PrimaryCluster::ForeachVirtualClustersData( std::vector> virtual_cluster_data_list; auto virtual_cluster_id = request.virtual_cluster_id(); bool include_job_clusters = request.include_job_clusters(); - bool only_include_mixed_cluster = request.only_include_mixed_clusters(); + bool only_include_indivisible_cluster = request.only_include_indivisible_clusters(); auto visit_proto_data = [&](const VirtualCluster *cluster) { - if (include_job_clusters && cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) { - auto exclusive_cluster = dynamic_cast(cluster); - exclusive_cluster->ForeachJobCluster( + if (include_job_clusters && cluster->Divisible()) { + auto divisible_cluster = dynamic_cast(cluster); + divisible_cluster->ForeachJobCluster( [&](const auto &job_cluster) { callback(job_cluster->ToProto()); }); } - if (only_include_mixed_cluster && - cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) { + if (only_include_indivisible_cluster && cluster->Divisible()) { return; } if (cluster->GetID() != kPrimaryClusterID) { diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h index c429ca9f9e399..9efc4b207bf7a 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h @@ -139,14 +139,9 @@ class VirtualCluster { /// Get the id of the cluster. virtual const std::string &GetID() const = 0; - /// Get the allocation mode of the cluster. - /// There are two modes of the cluster: - /// - Exclusive mode means that a signle node in the cluster can execute one or - /// more tasks belongs to only one job. - /// - Mixed mode means that a single node in the cluster can execute tasks - /// belongs to multiple jobs. - /// \return The allocation mode of the cluster. - virtual rpc::AllocationMode GetMode() const = 0; + /// Whether the virtual cluster can splite into one or more child virtual clusters or + /// not. + virtual bool Divisible() const = 0; /// Get the revision number of the cluster. uint64_t GetRevision() const { return revision_; } @@ -255,18 +250,18 @@ class VirtualCluster { }; class JobCluster; -class ExclusiveCluster : public VirtualCluster { +class DivisibleCluster : public VirtualCluster { public: - ExclusiveCluster(const std::string &id, + DivisibleCluster(const std::string &id, const AsyncClusterDataFlusher &async_data_flusher, const ClusterResourceManager &cluster_resource_manager) : VirtualCluster(id, cluster_resource_manager), async_data_flusher_(async_data_flusher) {} const std::string &GetID() const override { return id_; } - rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::EXCLUSIVE; } + bool Divisible() const override { return true; } - /// Load a job cluster to the exclusive cluster. + /// Load a job cluster to the divisible cluster. /// /// \param job_cluster_id The id of the job cluster. /// \param replica_instances The replica instances of the job cluster. @@ -330,7 +325,7 @@ class ExclusiveCluster : public VirtualCluster { bool ReplenishNodeInstances(const NodeInstanceReplenishCallback &callback) override; protected: - /// Create a job cluster to the exclusive cluster. + /// Do create a job cluster from the divisible cluster. /// /// \param job_cluster_id The id of the job cluster. /// \param replica_instances_to_add The node instances to be added. @@ -344,15 +339,15 @@ class ExclusiveCluster : public VirtualCluster { AsyncClusterDataFlusher async_data_flusher_; }; -class MixedCluster : public VirtualCluster { +class IndivisibleCluster : public VirtualCluster { public: - MixedCluster(const std::string &id, - const ClusterResourceManager &cluster_resource_manager) + IndivisibleCluster(const std::string &id, + const ClusterResourceManager &cluster_resource_manager) : VirtualCluster(id, cluster_resource_manager) {} - MixedCluster &operator=(const MixedCluster &) = delete; + IndivisibleCluster &operator=(const IndivisibleCluster &) = delete; const std::string &GetID() const override { return id_; } - rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::MIXED; } + bool Divisible() const override { return false; } /// Check if the virtual cluster is in use. /// @@ -367,17 +362,17 @@ class MixedCluster : public VirtualCluster { bool IsIdleNodeInstance(const gcs::NodeInstance &node_instance) const override; }; -class JobCluster : public MixedCluster { +class JobCluster : public IndivisibleCluster { public: - using MixedCluster::MixedCluster; + using IndivisibleCluster::IndivisibleCluster; }; -class PrimaryCluster : public ExclusiveCluster, +class PrimaryCluster : public DivisibleCluster, public std::enable_shared_from_this { public: PrimaryCluster(const AsyncClusterDataFlusher &async_data_flusher, const ClusterResourceManager &cluster_resource_manager) - : ExclusiveCluster( + : DivisibleCluster( kPrimaryClusterID, async_data_flusher, cluster_resource_manager) {} PrimaryCluster &operator=(const PrimaryCluster &) = delete; @@ -390,16 +385,16 @@ class PrimaryCluster : public ExclusiveCluster, /// Load a logical cluster to the primary cluster. /// /// \param virtual_cluster_id The id of the logical cluster. - /// \param mode The allocation mode of the logical cluster. + /// \param divisible Whether the logical cluster is divisible or not. /// \param replica_instances The replica instances of the logical cluster. /// \return The loaded logical cluster. std::shared_ptr LoadLogicalCluster( const std::string &virtual_cluster_id, - rpc::AllocationMode mode, + bool divisible, ReplicaInstances replica_instances); const std::string &GetID() const override { return kPrimaryClusterID; } - rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::EXCLUSIVE; } + bool Divisible() const override { return true; } /// Create or update a new virtual cluster. /// diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc index 3f81460e949db..f07e0fa62ce83 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc @@ -40,7 +40,7 @@ void GcsVirtualClusterManager::OnNodeDead(const rpc::GcsNodeInfo &node) { } void GcsVirtualClusterManager::OnJobFinished(const rpc::JobTableData &job_data) { - // exit early when job has no virtual cluster id + // exit early when job without a virtual cluster id. const auto &virtual_cluster_id = job_data.virtual_cluster_id(); if (virtual_cluster_id.empty()) { return; @@ -49,28 +49,28 @@ void GcsVirtualClusterManager::OnJobFinished(const rpc::JobTableData &job_data) auto job_cluster_id = VirtualClusterID::FromBinary(virtual_cluster_id); if (!job_cluster_id.IsJobClusterID()) { - // exit early when this job is submitted in a mixed cluster + // exit early when this job does not belong to an job cluster. return; } - std::string exclusive_cluster_id = job_cluster_id.ParentID().Binary(); + std::string divisible_cluster_id = job_cluster_id.ParentID().Binary(); - auto virtual_cluster = GetVirtualCluster(exclusive_cluster_id); + auto virtual_cluster = GetVirtualCluster(divisible_cluster_id); if (virtual_cluster == nullptr) { RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary() << " when handling job finished event, parent cluster not exists."; return; } - if (virtual_cluster->GetMode() != rpc::AllocationMode::EXCLUSIVE) { + if (!virtual_cluster->Divisible()) { // this should not happen, virtual cluster should be exclusive return; } - ExclusiveCluster *exclusive_cluster = - dynamic_cast(virtual_cluster.get()); + DivisibleCluster *divisible_cluster = + dynamic_cast(virtual_cluster.get()); - auto status = exclusive_cluster->RemoveJobCluster( + auto status = divisible_cluster->RemoveJobCluster( virtual_cluster_id, [this, job_cluster_id](const Status &status, std::shared_ptr data, @@ -216,7 +216,7 @@ void GcsVirtualClusterManager::HandleCreateJobCluster( on_done(Status::NotFound(message), nullptr, nullptr); return; } - if (virtual_cluster->GetMode() != rpc::AllocationMode::EXCLUSIVE) { + if (!virtual_cluster->Divisible()) { std::ostringstream ostr; ostr << " virtual cluster is not exclusive: " << virtual_cluster_id; std::string message = ostr.str(); @@ -225,11 +225,11 @@ void GcsVirtualClusterManager::HandleCreateJobCluster( } ReplicaSets replica_sets(request.replica_sets().begin(), request.replica_sets().end()); - auto exclusive_cluster = dynamic_cast(virtual_cluster.get()); - std::string job_cluster_id = exclusive_cluster->BuildJobClusterID(request.job_id()); + auto divisible_cluster = dynamic_cast(virtual_cluster.get()); + std::string job_cluster_id = divisible_cluster->BuildJobClusterID(request.job_id()); ReplicaSets replica_sets_to_recommend; - auto status = exclusive_cluster->CreateJobCluster( + auto status = divisible_cluster->CreateJobCluster( job_cluster_id, std::move(replica_sets), on_done, &replica_sets_to_recommend); if (!status.ok()) { on_done(status, nullptr, &replica_sets_to_recommend); @@ -290,11 +290,12 @@ Status GcsVirtualClusterManager::VerifyRequest( } // check if the request attributes are compatible with the virtual cluster. - if (request.mode() != logical_cluster->GetMode()) { + if (request.divisible() != logical_cluster->Divisible()) { std::ostringstream ostr; ostr << "The requested attributes are incompatible with virtual cluster " - << request.virtual_cluster_id() << ". expect: (" << logical_cluster->GetMode() - << "), actual: (" << request.mode() << ")."; + << request.virtual_cluster_id() << ". expect: (" + << logical_cluster->Divisible() << "), actual: (" << request.divisible() + << ")."; std::string message = ostr.str(); RAY_LOG(ERROR) << message; return Status::InvalidArgument(message); @@ -331,9 +332,9 @@ Status GcsVirtualClusterManager::FlushAndPublish( auto on_done = [this, data, callback = std::move(callback)](const Status &status) { // The backend storage is supposed to be reliable, so the status must be ok. RAY_CHECK_OK(status); - if (data->mode() != rpc::AllocationMode::MIXED) { - // Tasks can only be scheduled on the nodes in the mixed cluster, so we just need to - // publish the mixed cluster data. + if (data->divisible()) { + // Tasks can only be scheduled on the nodes in the indivisible cluster, so we just + // need to publish the indivisible cluster data. if (callback) { callback(status, std::move(data), nullptr); } diff --git a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc index 290336fd23cbe..56a975947d0ed 100644 --- a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc @@ -200,10 +200,10 @@ class VirtualClusterTest : public ::testing::Test { std::shared_ptr primary_cluster, const std::string &virtual_cluster_id, const absl::flat_hash_map &replica_sets, - rpc::AllocationMode allocation_mode = rpc::AllocationMode::EXCLUSIVE) { + bool divisible = true) { rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id(virtual_cluster_id); - request.set_mode(rpc::AllocationMode::EXCLUSIVE); + request.set_divisible(true); request.set_revision(0); request.mutable_replica_sets()->insert(replica_sets.begin(), replica_sets.end()); auto status = primary_cluster->CreateOrUpdateVirtualCluster( @@ -755,7 +755,7 @@ TEST_F(PrimaryClusterTest, GetVirtualClusters) { }) .ok()); - auto virtual_cluster_0 = std::dynamic_pointer_cast( + auto virtual_cluster_0 = std::dynamic_pointer_cast( primary_cluster->GetLogicalCluster(virtual_cluster_id_0)); ASSERT_TRUE(virtual_cluster_0 != nullptr); @@ -798,7 +798,7 @@ TEST_F(PrimaryClusterTest, GetVirtualClusters) { virtual_clusters_data_map.clear(); request.set_include_job_clusters(true); - request.set_only_include_mixed_clusters(true); + request.set_only_include_indivisible_clusters(true); primary_cluster->ForeachVirtualClustersData( request, [this, &virtual_clusters_data_map](auto data) { virtual_clusters_data_map.emplace(data->id(), data); @@ -840,7 +840,7 @@ class FailoverTest : public PrimaryClusterTest { RAY_CHECK_OK(CreateVirtualCluster(primary_cluster, virtual_cluster_id_2_, {{template_id_0_, 5}, {template_id_1_, 5}}, - rpc::AllocationMode::MIXED)); + /*divisible=*/false)); job_cluster_id_0_ = primary_cluster->BuildJobClusterID("job_0"); RAY_CHECK_OK(primary_cluster->CreateJobCluster( @@ -850,7 +850,7 @@ class FailoverTest : public PrimaryClusterTest { ASSERT_TRUE(status.ok()); })); - auto virtual_cluster_1 = std::dynamic_pointer_cast( + auto virtual_cluster_1 = std::dynamic_pointer_cast( primary_cluster->GetLogicalCluster(virtual_cluster_id_1_)); RAY_CHECK(virtual_cluster_1 != nullptr); @@ -990,7 +990,7 @@ TEST_F(FailoverTest, FailoverWithDeadNodes) { } { - auto virtual_cluster_1 = std::dynamic_pointer_cast( + auto virtual_cluster_1 = std::dynamic_pointer_cast( primary_cluster->GetVirtualCluster(virtual_cluster_id_1_)); ASSERT_TRUE(virtual_cluster_1 != nullptr); @@ -1063,7 +1063,7 @@ TEST_F(FailoverTest, OnlyFlushJobClusters) { }; { - auto virtual_cluster_1 = std::dynamic_pointer_cast( + auto virtual_cluster_1 = std::dynamic_pointer_cast( primary_cluster->GetVirtualCluster(virtual_cluster_id_1_)); ASSERT_TRUE(virtual_cluster_1 != nullptr); diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index c2794c7360d0e..6b98e18e56921 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -714,14 +714,6 @@ message JobTableData { } /////////////////////////////////////////////////////////////////////////////// -enum AllocationMode { - UNKNOWN = 0; - // A single node can carray tasks for multiple jobs. - MIXED = 1; - // A single node can only carray tasks for one job. - EXCLUSIVE = 2; -} - message NodeInstance { // The Hostname address of the node instance. string hostname = 1; @@ -732,8 +724,8 @@ message NodeInstance { message VirtualClusterTableData { // The virtual cluster id. string id = 1; - // The allocation mode of the virtual cluster. - AllocationMode mode = 2; + // Whether the virtual cluster can splite into many child virtual clusters or not. + bool divisible = 2; // Mapping from node id to it's instance. map node_instances = 3; // Whether this virtual cluster is removed. diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 2ae32b69809af..179d5a7f7b162 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -866,8 +866,8 @@ service TaskInfoGcsService { message CreateOrUpdateVirtualClusterRequest { // The virtual cluster id. string virtual_cluster_id = 1; - // The allocation mode of the virtual cluster. - AllocationMode mode = 2; + // Whether the virtual cluster can splite into many child virtual clusters or not. + bool divisible = 2; // The replica set of the virtual cluster. map replica_sets = 3; // Version number of the last modification to the virtual cluster. @@ -901,8 +901,8 @@ message GetVirtualClustersRequest { string virtual_cluster_id = 1; // Wether include job clusters. bool include_job_clusters = 2; - // It will reply mixed clusters if only_include_mixed_clusters is true. - bool only_include_mixed_clusters = 3; + // It will reply indivisible clusters if only_include_indivisible_clusters is true. + bool only_include_indivisible_clusters = 3; } message GetVirtualClustersReply { diff --git a/src/ray/raylet/virtual_cluster_manager.cc b/src/ray/raylet/virtual_cluster_manager.cc index 2318ccadb42fe..ef10bc9ae4d34 100644 --- a/src/ray/raylet/virtual_cluster_manager.cc +++ b/src/ray/raylet/virtual_cluster_manager.cc @@ -22,8 +22,10 @@ namespace raylet { bool VirtualClusterManager::UpdateVirtualCluster( rpc::VirtualClusterTableData virtual_cluster_data) { RAY_LOG(INFO) << "Virtual cluster updated: " << virtual_cluster_data.id(); - if (virtual_cluster_data.mode() != rpc::AllocationMode::MIXED) { - RAY_LOG(WARNING) << "The virtual cluster mode is not MIXED, ignore it."; + if (virtual_cluster_data.divisible()) { + RAY_LOG(WARNING) << "Virtual cluster " << virtual_cluster_data.id() + << " is divisible, " + << "ignore it."; return false; } @@ -60,7 +62,7 @@ bool VirtualClusterManager::ContainsNodeInstance(const std::string &virtual_clus return false; } const auto &virtual_cluster_data = it->second; - RAY_CHECK(virtual_cluster_data.mode() == rpc::AllocationMode::MIXED); + RAY_CHECK(!virtual_cluster_data.divisible()); const auto &node_instances = virtual_cluster_data.node_instances(); return node_instances.find(node_id.Hex()) != node_instances.end(); diff --git a/src/ray/raylet/virtual_cluster_manager_test.cc b/src/ray/raylet/virtual_cluster_manager_test.cc index 526cc040ca09f..575ab79c6a894 100644 --- a/src/ray/raylet/virtual_cluster_manager_test.cc +++ b/src/ray/raylet/virtual_cluster_manager_test.cc @@ -31,7 +31,7 @@ TEST_F(VirtualClusterManagerTest, UpdateVirtualCluster) { rpc::VirtualClusterTableData virtual_cluster_data; virtual_cluster_data.set_id(virtual_cluster_id_0); - virtual_cluster_data.set_mode(rpc::AllocationMode::EXCLUSIVE); + virtual_cluster_data.set_divisible(true); virtual_cluster_data.set_revision(100); for (size_t i = 0; i < 100; ++i) { auto node_id = NodeID::FromRandom(); @@ -41,7 +41,7 @@ TEST_F(VirtualClusterManagerTest, UpdateVirtualCluster) { ASSERT_FALSE(virtual_cluster_manager.UpdateVirtualCluster(virtual_cluster_data)); ASSERT_FALSE(virtual_cluster_manager.ContainsVirtualCluster(virtual_cluster_id_0)); - virtual_cluster_data.set_mode(rpc::AllocationMode::MIXED); + virtual_cluster_data.set_divisible(false); ASSERT_TRUE(virtual_cluster_manager.UpdateVirtualCluster(virtual_cluster_data)); ASSERT_TRUE(virtual_cluster_manager.ContainsVirtualCluster(virtual_cluster_id_0)); @@ -62,7 +62,7 @@ TEST_F(VirtualClusterManagerTest, TestContainsNodeInstance) { rpc::VirtualClusterTableData virtual_cluster_data; virtual_cluster_data.set_id(virtual_cluster_id_0); - virtual_cluster_data.set_mode(rpc::AllocationMode::MIXED); + virtual_cluster_data.set_divisible(false); virtual_cluster_data.set_revision(100); absl::flat_hash_set node_ids; for (size_t i = 0; i < 100; ++i) { diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index adf86f3b9d58a..02972a482fbb9 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -111,8 +111,9 @@ Status RayletClient::AnnounceWorkerPortForWorker(int port) { return conn_->WriteMessage(MessageType::AnnounceWorkerPort, &fbb); } -Status RayletClient::AnnounceWorkerPortForDriver( - int port, const std::string &entrypoint, const std::string &virtual_cluster_id) { +Status RayletClient::AnnounceWorkerPortForDriver(int port, + const std::string &entrypoint, + const std::string &virtual_cluster_id) { flatbuffers::FlatBufferBuilder fbb; auto message = protocol::CreateAnnounceWorkerPort( fbb, port, fbb.CreateString(entrypoint), fbb.CreateString(virtual_cluster_id));