Skip to content

Commit

Permalink
[Autoscaler][Schema] Cleaned up ClusterResourceConstraint definition (ray-project#49311)
Browse files Browse the repository at this point in the history

Signed-off-by: Alexey Kudinkin <[email protected]>
  • Loading branch information
alexeykudinkin authored Dec 19, 2024
1 parent c45d40d commit 5483b0e
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 24 deletions.
4 changes: 2 additions & 2 deletions python/ray/autoscaler/v2/event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ def log_cluster_scheduling_update(
for infeasible_constraint in infeasible_cluster_resource_constraints:
log_str = "No available node types can fulfill cluster constraint: "
for i, requests_by_count in enumerate(
infeasible_constraint.min_bundles
infeasible_constraint.resource_requests
):
resource_map = ResourceRequestUtil.to_resource_map(
requests_by_count.request
)
log_str += f"{resource_map}*{requests_by_count.count}"
if i < len(infeasible_constraint.min_bundles) - 1:
if i < len(infeasible_constraint.resource_requests) - 1:
log_str += ", "

log_str += (
Expand Down
3 changes: 1 addition & 2 deletions python/ray/autoscaler/v2/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1194,9 +1194,8 @@ def _enforce_resource_constraints(
return []

constraint = constraints[0]
min_bundles = constraint.min_bundles
# Flatten the requests for iterating through.
requests = ResourceRequestUtil.ungroup_by_count(min_bundles)
requests = ResourceRequestUtil.ungroup_by_count(constraint.resource_requests)

# Pass the empty nodes to schedule.
scheduled_nodes, infeasible = ResourceDemandScheduler._try_schedule(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/autoscaler/v2/tests/test_event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def test_log_scheduling_updates():
],
infeasible_cluster_resource_constraints=[
ClusterResourceConstraint(
min_bundles=ResourceRequestUtil.group_by_count(
resource_requests=ResourceRequestUtil.group_by_count(
cluster_resource_constraints
)
)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/autoscaler/v2/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def sched_request(
cluster_resource_constraints=(
[
ClusterResourceConstraint(
min_bundles=ResourceRequestUtil.group_by_count(
resource_requests=ResourceRequestUtil.group_by_count(
cluster_resource_constraints
)
)
Expand Down
10 changes: 5 additions & 5 deletions python/ray/autoscaler/v2/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ def assert_cluster_resource_constraints(
# We only have 1 constraint for now.
assert len(state.cluster_resource_constraints) == 1

min_bundles = state.cluster_resource_constraints[0].min_bundles
assert len(min_bundles) == len(expected_bundles) == len(expected_count)
resource_requests = state.cluster_resource_constraints[0].resource_requests
assert len(resource_requests) == len(expected_bundles) == len(expected_count)

# Sort all the bundles by bundle's resource names
min_bundles = sorted(
min_bundles,
resource_requests = sorted(
resource_requests,
key=lambda bundle_by_count: "".join(
bundle_by_count.request.resources_bundle.keys()
),
Expand All @@ -76,7 +76,7 @@ def assert_cluster_resource_constraints(
expected, key=lambda bundle_count: "".join(bundle_count[0].keys())
)

for actual_bundle_count, expected_bundle_count in zip(min_bundles, expected):
for actual_bundle_count, expected_bundle_count in zip(resource_requests, expected):
assert (
dict(actual_bundle_count.request.resources_bundle)
== expected_bundle_count[0]
Expand Down
2 changes: 1 addition & 1 deletion python/ray/autoscaler/v2/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def test_cluster_status_parser_cluster_resource_state():
],
"cluster_resource_constraints": [
{
"min_bundles": [
"resource_requests": [
{
"request": {
"resources_bundle": {"GPU": 2, "CPU": 100},
Expand Down
2 changes: 1 addition & 1 deletion python/ray/autoscaler/v2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ def _parse_resource_demands(
ResourceRequestByCount(
bundle=dict(r.request.resources_bundle.items()), count=r.count
)
for r in constraint_request.min_bundles
for r in constraint_request.resource_requests
]
)
constraint_demand.append(demand)
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ Status AutoscalerStateAccessor::RequestClusterResourceConstraint(
auto count = count_array[i];

auto new_resource_requests_by_count =
request.mutable_cluster_resource_constraint()->add_min_bundles();
request.mutable_cluster_resource_constraint()->add_resource_requests();

new_resource_requests_by_count->mutable_request()->mutable_resources_bundle()->insert(
bundle.begin(), bundle.end());
Expand Down
14 changes: 8 additions & 6 deletions src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -636,9 +636,10 @@ TEST_F(GcsAutoscalerStateManagerTest, TestClusterResourcesConstraint) {
Mocker::GenClusterResourcesConstraint({{{"CPU", 2}, {"GPU", 1}}}, {1}));
const auto &state = GetClusterResourceStateSync();
ASSERT_EQ(state.cluster_resource_constraints_size(), 1);
ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1);
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(),
{{"CPU", 2}, {"GPU", 1}});
ASSERT_EQ(state.cluster_resource_constraints(0).resource_requests_size(), 1);
CheckResourceRequest(
state.cluster_resource_constraints(0).resource_requests(0).request(),
{{"CPU", 2}, {"GPU", 1}});
}

// Override it
Expand All @@ -647,9 +648,10 @@ TEST_F(GcsAutoscalerStateManagerTest, TestClusterResourcesConstraint) {
{{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}}, {1}));
const auto &state = GetClusterResourceStateSync();
ASSERT_EQ(state.cluster_resource_constraints_size(), 1);
ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1);
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(),
{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}});
ASSERT_EQ(state.cluster_resource_constraints(0).resource_requests_size(), 1);
CheckResourceRequest(
state.cluster_resource_constraints(0).resource_requests(0).request(),
{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}});
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/test/gcs_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ struct Mocker {
for (size_t i = 0; i < request_resources.size(); i++) {
auto &resource = request_resources[i];
auto count = count_array[i];
auto bundle = constraint.add_min_bundles();
auto bundle = constraint.add_resource_requests();
bundle->set_count(count);
bundle->mutable_request()->mutable_resources_bundle()->insert(resource.begin(),
resource.end());
Expand Down
12 changes: 9 additions & 3 deletions src/ray/protobuf/autoscaler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,15 @@ message GangResourceRequest {
// Cluster resource constraint represents minimal cluster size requirement,
// this is issued through ray.autoscaler.sdk.request_resources.
message ClusterResourceConstraint {
// If not empty, the cluster should have the capacity (total resource) to fit
// the min_bundles.
repeated ResourceRequestByCount min_bundles = 1;
// Commands the autoscaler to accommodate the specified requests.
//
// For more context, please check out the py-doc for the
// `ray.autoscaler.sdk.request_resources` method.
//
// NOTE: This call is only a hint to the autoscaler. The actual resulting cluster
// size may be slightly larger or smaller than expected depending on the
// internal bin packing algorithm and max worker count restrictions.
repeated ResourceRequestByCount resource_requests = 1;
}

// Node status for a ray node.
Expand Down

0 comments on commit 5483b0e

Please sign in to comment.