From 8ef99356cef770c1d6bde849e9740fc24047b219 Mon Sep 17 00:00:00 2001 From: sule Date: Fri, 17 Jan 2025 11:18:44 +0800 Subject: [PATCH] [VirtualCluster] enrich vcluster list command Signed-off-by: sule --- python/ray/dashboard/state_aggregator.py | 59 +++++++++++++++++-- python/ray/scripts/scripts.py | 2 +- python/ray/util/state/common.py | 8 ++- src/ray/gcs/gcs_server/gcs_virtual_cluster.cc | 6 +- src/ray/gcs/gcs_server/gcs_virtual_cluster.h | 9 ++- src/ray/protobuf/gcs_service.proto | 4 ++ 6 files changed, 78 insertions(+), 10 deletions(-) diff --git a/python/ray/dashboard/state_aggregator.py b/python/ray/dashboard/state_aggregator.py index 1ac3a110cdb3..d4bd4aa56ba0 100644 --- a/python/ray/dashboard/state_aggregator.py +++ b/python/ray/dashboard/state_aggregator.py @@ -478,12 +478,15 @@ def transform(reply) -> ListApiResponse: for entry in result: entry = { "virtual_cluster_id": entry["id"], - "divided_clusters": [], + "divided_clusters": {}, "divisible": entry["divisible"], "replica_sets": {}, "undivided_replica_sets": {}, "visible_node_instances": entry.get("node_instance_views", {}), "undivided_nodes": {}, + "resources_total": {}, + "resources_available": {}, + "resources": {}, } entries[entry["virtual_cluster_id"]] = entry @@ -494,12 +497,15 @@ def transform(reply) -> ListApiResponse: for id, entry in entries.items(): cluster_nodes[id] = set() if id != "kPrimaryClusterID": - primary_cluster["divided_clusters"].append(id) + primary_cluster["divided_clusters"][id] = ( + "divisble" if entry["divisible"] else "indivisible" + ) elif "##" in id: parent_cluster_id = id.split("##")[0] - entries[parent_cluster_id]["divided_clusters"].append(id) + entries[parent_cluster_id]["divided_clusters"][id] = ( + "divisble" if entry["divisible"] else "indivisible" + ) all_nodes.update(entry["visible_node_instances"]) - # update cluster nodes to calculate template ids for id, entry in entries.items(): for sub_cluster_id in entry["divided_clusters"]: @@ -541,6 +547,51 @@ def collect_all_sub_nodes(virtual_cluster_id): + 1 ) entry["undivided_nodes"] = undivided_nodes + full_nodes = divided_nodes.union(undivided_nodes.keys()) + for node_id in full_nodes: + node = all_nodes[node_id] + if not node["is_dead"] and node["template_id"] != "": + for resource, value in node["resources_total"].items(): + entry["resources_total"][resource] = ( + entry["resources_total"].get(resource, 0) + value + ) + for resource, value in node["resources_available"].items(): + entry["resources_available"][resource] = ( + entry["resources_available"].get(resource, 0) + value + ) + + def readable_memory(x: int): + if x >= 2**30: + return str(format(x / (2**30), ".3f")) + " GiB" + elif x >= 2**20: + return str(format(x / (2**20), ".3f")) + " MiB" + elif x >= 2**10: + return str(format(x / (2**10), ".3f")) + " KiB" + return str(format(x, ".3f")) + " B" + + for entry in entries.values(): + for node_id, node in entry["visible_node_instances"].items(): + if "resources_total" in node: + del node["resources_total"] + if "resources_available" in node: + del node["resources_available"] + for node_id, node in entry["undivided_nodes"].items(): + if "resources_total" in node: + del node["resources_total"] + if "resources_available" in node: + del node["resources_available"] + for resource, value in entry["resources_total"].items(): + if "memory" in resource: + entry["resources_total"][resource] = readable_memory(value) + for resource, value in entry["resources_available"].items(): + if "memory" in resource: + entry["resources_available"][resource] = readable_memory(value) + for resource, value in entry["resources_total"].items(): + entry["resources"][ + resource + ] = f"""{entry["resources_available"][resource]} / {value}""" + del entry["resources_available"] + del entry["resources_total"] result = list(entries.values()) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 533ad8150468..025fbfc1a614 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -2911,7 +2911,7 @@ async def run(address): await state_api_data_source_client._client_session.close() fmt = AvailableFormat(format) if detail and fmt == AvailableFormat.DEFAULT: - fmt = AvailableFormat.YAML + fmt = AvailableFormat.TABLE result = [VirtualClusterState(**vcluster) for vcluster in data.result] diff --git a/python/ray/util/state/common.py b/python/ray/util/state/common.py index a60c473ca2d5..8c012e4f8714 100644 --- a/python/ray/util/state/common.py +++ b/python/ray/util/state/common.py @@ -862,11 +862,13 @@ class VirtualClusterState(StateSchema): #: Whether the virtual cluster can split into many child virtual clusters or not. divisible: bool = state_column(filterable=True) #: Divided virtual clusters. - divided_clusters: list = state_column(filterable=False) + divided_clusters: dict = state_column(filterable=False) #: Replica Sets of virtual cluster - replica_sets: Optional[dict] = state_column(filterable=False, detail=True) + replica_sets: dict = state_column(filterable=False) #: Only display undivided replica sets - undivided_replica_sets: Optional[dict] = state_column(filterable=False, detail=True) + undivided_replica_sets: dict = state_column(filterable=False) + #: Total resources of the virtual cluster. + resources: dict = state_column(filterable=False, detail=False) #: Mapping from node id to its instance. visible_node_instances: Optional[dict] = state_column(filterable=False, detail=True) #: Only display undivided nodes diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc index d3c510282d73..4a96f17266e0 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc @@ -321,7 +321,11 @@ std::shared_ptr VirtualCluster::ToView() const { for (auto &[template_id, job_node_instances] : visible_node_instances_) { for (auto &[job_cluster_id, node_instances] : job_node_instances) { for (auto &[id, node_instance] : node_instances) { - (*data->mutable_node_instance_views())[id] = std::move(*node_instance->ToView()); + auto node_id = scheduling::NodeID( + NodeID::FromHex(node_instance->node_instance_id()).Binary()); + const auto &node_resources = cluster_resource_manager_.GetNodeResources(node_id); + (*data->mutable_node_instance_views())[id] = + std::move(*node_instance->ToView(node_resources)); } } } diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h index 551804e1a55e..6fac202f2b09 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h @@ -49,11 +49,18 @@ struct NodeInstance { return node_instance; } - std::shared_ptr ToView() const { + std::shared_ptr ToView( + const NodeResources &node_resources) const { auto node_view = std::make_shared(); node_view->set_template_id(template_id_); node_view->set_hostname(hostname_); node_view->set_is_dead(is_dead_); + auto resources_total = node_view->mutable_resources_total(); + resources_total->insert(node_resources.total.GetResourceMap().begin(), + node_resources.total.GetResourceMap().end()); + auto resources_available = node_view->mutable_resources_available(); + resources_available->insert(node_resources.available.GetResourceMap().begin(), + node_resources.available.GetResourceMap().end()); return node_view; } diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 6ccfc87f55e2..8ed22f405ed8 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -907,6 +907,10 @@ message NodeInstanceView { string template_id = 2; // Whether the node is dead bool is_dead = 3; + // Total resources of the node. + map resources_total = 4; + // Available resources of the node. + map resources_available = 5; } message VirtualClusterView {