Skip to content

Commit

Permalink
Add check reserved resources
Browse files Browse the repository at this point in the history
  • Loading branch information
Michaelvll committed Aug 20, 2024
1 parent a068474 commit ab66a8e
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions sky/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sky.utils import log_utils
from sky.utils import subprocess_utils
from sky.utils import ux_utils
from sky.utils import rich_utils

if typing.TYPE_CHECKING:
import networkx as nx
Expand Down Expand Up @@ -253,6 +254,26 @@ def _estimate_nodes_cost_or_time(
# node -> cloud -> list of resources that satisfy user's requirements.
node_to_candidate_map: _TaskToPerCloudCandidates = {}

def get_available_reservations(
launchable_resources: Dict[resources_lib.Resources,
List[resources_lib.Resources]]
) -> Dict[resources_lib.Resources, int]:
num_available_reserved_nodes_per_resource = {}

def get_reservations_available_resources(
resources: resources_lib.Resources):
num_available_reserved_nodes_per_resource[resources] = sum(
resources.get_reservations_available_resources().values())

launchable_resources_list: List[resources_lib.Resources] = sum(
launchable_resources.values(), [])
with rich_utils.safe_status(
'[cyan]Checking reserved resources...[/]'):
subprocess_utils.run_in_parallel(
get_reservations_available_resources,
launchable_resources_list)
return num_available_reserved_nodes_per_resource

# Compute the estimated cost/time for each node.
for node_i, node in enumerate(topo_order):
if node_i == 0:
Expand Down Expand Up @@ -283,17 +304,8 @@ def _estimate_nodes_cost_or_time(
# Fetch reservations in advance and in parallel to speed up the
# reservation info fetching.
num_resources = len(list(node.resources))
num_available_reserved_nodes_per_resource = {}

def get_reservations_available_resources(
resources: resources_lib.Resources):
num_available_reserved_nodes_per_resource[resources] = sum(
resources.get_reservations_available_resources().values())

launchable_resource_list: List[resources_lib.Resources] = sum(
launchable_resources.values(), [])
subprocess_utils.run_in_parallel(
get_reservations_available_resources, launchable_resource_list)
num_available_reserved_nodes_per_resource = (
get_available_reservations(launchable_resources))

for orig_resources, launchable_list in launchable_resources.items():
if num_resources == 1 and node.time_estimator_func is None:
Expand Down

0 comments on commit ab66a8e

Please sign in to comment.