Skip to content

Commit

Permalink
[VirtualCluster] enrich vcluster list command
Browse files Browse the repository at this point in the history
  • Loading branch information
xsuler committed Jan 17, 2025
1 parent fde6dcc commit a61f7de
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 10 deletions.
37 changes: 33 additions & 4 deletions python/ray/dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,14 @@ def transform(reply) -> ListApiResponse:
for entry in result:
entry = {
"virtual_cluster_id": entry["id"],
"divided_clusters": [],
"divided_clusters": {},
"divisible": entry["divisible"],
"replica_sets": {},
"undivided_replica_sets": {},
"visible_node_instances": entry.get("node_instance_views", {}),
"undivided_nodes": {},
"resources_total": {},
"resources_available": {},
}
entries[entry["virtual_cluster_id"]] = entry

Expand All @@ -494,12 +496,15 @@ def transform(reply) -> ListApiResponse:
for id, entry in entries.items():
cluster_nodes[id] = set()
if id != "kPrimaryClusterID":
primary_cluster["divided_clusters"].append(id)
primary_cluster["divided_clusters"][id] = (
"divisble" if entry["divisible"] else "indivisible"
)
elif "##" in id:
parent_cluster_id = id.split("##")[0]
entries[parent_cluster_id]["divided_clusters"].append(id)
entries[parent_cluster_id]["divided_clusters"][id] = (
"divisble" if entry["divisible"] else "indivisible"
)
all_nodes.update(entry["visible_node_instances"])

# update cluster nodes to calculate template ids
for id, entry in entries.items():
for sub_cluster_id in entry["divided_clusters"]:
Expand Down Expand Up @@ -541,6 +546,30 @@ def collect_all_sub_nodes(virtual_cluster_id):
+ 1
)
entry["undivided_nodes"] = undivided_nodes
full_nodes = divided_nodes.union(undivided_nodes.keys())
for node_id in full_nodes:
node = all_nodes[node_id]
if not node["is_dead"] and node["template_id"] != "":
for resource, value in node["resources_total"].items():
entry["resources_total"][resource] = (
entry["resources_total"].get(resource, 0) + value
)
for resource, value in node["resources_available"].items():
entry["resources_available"][resource] = (
entry["resources_available"].get(resource, 0) + value
)

for entry in entries.values():
for node_id, node in entry["visible_node_instances"].items():
if "resources_total" in node:
del node["resources_total"]
if "resources_available" in node:
del node["resources_available"]
for node_id, node in entry["undivided_nodes"].items():
if "resources_total" in node:
del node["resources_total"]
if "resources_available" in node:
del node["resources_available"]

result = list(entries.values())

Expand Down
2 changes: 1 addition & 1 deletion python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2911,7 +2911,7 @@ async def run(address):
await state_api_data_source_client._client_session.close()
fmt = AvailableFormat(format)
if detail and fmt == AvailableFormat.DEFAULT:
fmt = AvailableFormat.YAML
fmt = AvailableFormat.TABLE

result = [VirtualClusterState(**vcluster) for vcluster in data.result]

Expand Down
10 changes: 7 additions & 3 deletions python/ray/util/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,11 +862,15 @@ class VirtualClusterState(StateSchema):
#: Whether the virtual cluster can split into many child virtual clusters or not.
divisible: bool = state_column(filterable=True)
#: Divided virtual clusters.
divided_clusters: list = state_column(filterable=False)
divided_clusters: dict = state_column(filterable=False)
#: Replica Sets of virtual cluster
replica_sets: Optional[dict] = state_column(filterable=False, detail=True)
replica_sets: dict = state_column(filterable=False)
#: Only display undivided replica sets
undivided_replica_sets: Optional[dict] = state_column(filterable=False, detail=True)
undivided_replica_sets: dict = state_column(filterable=False)
#: Total resources of the virtual cluster.
resources_total: dict = state_column(filterable=False, detail=False)
#: Available resources of the virtual cluster.
resources_available: dict = state_column(filterable=False, detail=False)
#: Mapping from node id to its instance.
visible_node_instances: Optional[dict] = state_column(filterable=False, detail=True)
#: Only display undivided nodes
Expand Down
6 changes: 5 additions & 1 deletion src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ std::shared_ptr<rpc::VirtualClusterView> VirtualCluster::ToView() const {
for (auto &[template_id, job_node_instances] : visible_node_instances_) {
for (auto &[job_cluster_id, node_instances] : job_node_instances) {
for (auto &[id, node_instance] : node_instances) {
(*data->mutable_node_instance_views())[id] = std::move(*node_instance->ToView());
auto node_id = scheduling::NodeID(
NodeID::FromHex(node_instance->node_instance_id()).Binary());
const auto &node_resources = cluster_resource_manager_.GetNodeResources(node_id);
(*data->mutable_node_instance_views())[id] =
std::move(*node_instance->ToView(node_resources));
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/ray/gcs/gcs_server/gcs_virtual_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,18 @@ struct NodeInstance {
return node_instance;
}

std::shared_ptr<rpc::NodeInstanceView> ToView() const {
std::shared_ptr<rpc::NodeInstanceView> ToView(
const NodeResources &node_resources) const {
auto node_view = std::make_shared<rpc::NodeInstanceView>();
node_view->set_template_id(template_id_);
node_view->set_hostname(hostname_);
node_view->set_is_dead(is_dead_);
auto resources_total = node_view->mutable_resources_total();
resources_total->insert(node_resources.total.GetResourceMap().begin(),
node_resources.total.GetResourceMap().end());
auto resources_available = node_view->mutable_resources_available();
resources_available->insert(node_resources.available.GetResourceMap().begin(),
node_resources.available.GetResourceMap().end());
return node_view;
}

Expand Down
4 changes: 4 additions & 0 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,10 @@ message NodeInstanceView {
string template_id = 2;
// Whether the node is dead
bool is_dead = 3;
// Total resources of the node.
map<string, double> resources_total = 4;
// Available resources of the node.
map<string, double> resources_available = 5;
}

message VirtualClusterView {
Expand Down

0 comments on commit a61f7de

Please sign in to comment.