Skip to content

Commit

Permalink
Refactor and support inherit base resources
Browse files Browse the repository at this point in the history
  • Loading branch information
Michaelvll committed Dec 1, 2023
1 parent a1b0bd3 commit 7cefe10
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 82 deletions.
2 changes: 1 addition & 1 deletion sky/benchmark/benchmark_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def _get_optimized_resources(
resources = config.get('resources', None)
resources = sky.Resources.from_yaml_config(resources)
task = sky.Task()
task.set_resources({resources})
task.set_resources(resources)

dag = sky.optimize(dag, quiet=True)
task = dag.tasks[0]
Expand Down
99 changes: 82 additions & 17 deletions sky/resources.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Resources: compute requirements of Tasks."""
import functools
import textwrap
from typing import Dict, List, Optional, Set, Tuple, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import colorama
from typing_extensions import Literal
Expand Down Expand Up @@ -75,16 +75,16 @@ def __init__(
.. code-block:: python
# Fully specified cloud and instance type (is_launchable() is True).
sky.Resources(clouds.AWS(), 'p3.2xlarge')
sky.Resources(clouds.GCP(), 'n1-standard-16')
sky.Resources(clouds.GCP(), 'n1-standard-8', 'V100')
Resources(clouds.AWS(), 'p3.2xlarge')
Resources(clouds.GCP(), 'n1-standard-16')
Resources(clouds.GCP(), 'n1-standard-8', 'V100')
# Specifying required resources; the system decides the
# cloud/instance type. The below are equivalent:
sky.Resources(accelerators='V100')
sky.Resources(accelerators='V100:1')
sky.Resources(accelerators={'V100': 1})
sky.Resources(cpus='2+', memory='16+', accelerators='V100')
Resources(accelerators='V100')
Resources(accelerators='V100:1')
Resources(accelerators={'V100': 1})
Resources(cpus='2+', memory='16+', accelerators='V100')
Args:
cloud: the cloud to use.
Expand Down Expand Up @@ -210,23 +210,23 @@ def __repr__(self) -> str:
Examples:
>>> sky.Resources(accelerators='V100')
>>> Resources(accelerators='V100')
<Cloud>({'V100': 1})
>>> sky.Resources(accelerators='V100', use_spot=True)
>>> Resources(accelerators='V100', use_spot=True)
<Cloud>([Spot], {'V100': 1})
>>> sky.Resources(accelerators='V100',
>>> Resources(accelerators='V100',
... use_spot=True, instance_type='p3.2xlarge')
AWS(p3.2xlarge[Spot], {'V100': 1})
>>> sky.Resources(accelerators='V100', instance_type='p3.2xlarge')
>>> Resources(accelerators='V100', instance_type='p3.2xlarge')
AWS(p3.2xlarge, {'V100': 1})
>>> sky.Resources(instance_type='p3.2xlarge')
>>> Resources(instance_type='p3.2xlarge')
AWS(p3.2xlarge, {'V100': 1})
>>> sky.Resources(disk_size=100)
>>> Resources(disk_size=100)
<Cloud>(disk_size=100)
"""
accelerators = ''
Expand Down Expand Up @@ -898,7 +898,7 @@ def get_spot_str(self) -> str:
def make_deploy_variables(
self, cluster_name_on_cloud: str, region: clouds.Region,
zones: Optional[List[clouds.Zone]]) -> Dict[str, Optional[str]]:
"""Converts planned sky.Resources to resource variables.
"""Converts planned Resources to resource variables.
These variables are divided into two categories: cloud-specific and
cloud-agnostic. The cloud-specific variables are generated by the
Expand Down Expand Up @@ -1117,10 +1117,75 @@ def get_required_cloud_features(
return features

@classmethod
# NOTE(review): this is a flattened diff view. The single-line signature
# directly below appears to be the pre-change definition (returning one
# 'Resources'); the multi-line definition after it is the replacement.
def from_yaml_config(cls, config: Optional[Dict[str, str]]) -> 'Resources':
# Builds the candidate resources described by a `resources` YAML mapping.
# The result is a set or a list of Resources: "any_of" yields a set,
# "ordered" yields a list, and a multi-valued "accelerators" field yields
# the same container type (list or set) as the normalized accelerators.
# Raises ValueError when "any_of" and "ordered" are both given, or when
# several accelerators are combined with either of them.
def from_yaml_config(
cls, config: Optional[Dict[str, Any]]
) -> Union[Set['Resources'], List['Resources']]:
if config is None:
# NOTE(review): the bare `Resources()` return looks like the pre-change
# line of the diff; the set-wrapped return after it matches the new
# return annotation.
return Resources()
return {Resources()}

# Helper: builds one Resources per override entry. Each entry is applied on
# top of a copy of the base config, so keys present in the override win and
# the base config itself is left unmodified.
def _override_resources(base_resource_config,
override_configs) -> List[Resources]:
resources_list = []
for override_config in override_configs:
new_resource_config = base_resource_config.copy()
new_resource_config.update(override_config)
resources_list.append(
Resources._from_yaml_config_single(new_resource_config))
return resources_list

# Work on a copy so that popping "any_of"/"ordered" does not alter the
# caller's mapping.
config = config.copy()
any_of_configs = config.pop('any_of', None)
ordered_configs = config.pop('ordered', None)
# "any_of" and "ordered" are mutually exclusive.
if any_of_configs and ordered_configs:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Cannot specify both "any_of" and "ordered" in resources.')

# Parse resources.accelerators field.
accelerators = config.get('accelerators')
if config and accelerators is not None:
# NOTE(review): this repeats the lookup made just above; the value is the
# same.
accelerators = config.get('accelerators')
if isinstance(accelerators, str):
# A single accelerator string becomes a one-element set.
accelerators = {accelerators}
elif isinstance(accelerators, dict):
# A {name: count} mapping becomes "name:count" strings, or just "name"
# when the count is None, and is then turned into a set.
accelerators = [
f'{k}:{v}' if v is not None else f'{k}'
for k, v in accelerators.items()
]
accelerators = set(accelerators)
# More than one accelerator candidate cannot be combined with
# "any_of"/"ordered".
if len(accelerators) > 1 and (any_of_configs or ordered_configs):
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Cannot specify multiple "accelerators" with "any_of" '
'or "ordered" in resources.')

# The remaining fields of `config` act as the base that every "any_of" /
# "ordered" entry overrides. "any_of" is returned as a set; "ordered" is
# returned as a list in the order the entries were given.
if any_of_configs:
resources_list = _override_resources(config, any_of_configs)
return set(resources_list)
if ordered_configs:
resources_list = _override_resources(config, ordered_configs)
return resources_list
# Translate accelerators field to potential multiple resources.
if accelerators:
# In the YAML file, the accelerators field may hold several candidates.
# In Task, we store a collection of resources, each with one accelerator.
# This loop converts between the two formats: every candidate gets its
# own Resources built from a copy of the config.
tmp_resources_list = []
for acc in accelerators:
tmp_resource = config.copy()
tmp_resource['accelerators'] = acc
tmp_resources_list.append(
Resources._from_yaml_config_single(tmp_resource))

# Return the candidates in the same container type as `accelerators`
# (list or set); anything else is rejected.
if isinstance(accelerators, (list, set)):
return type(accelerators)(tmp_resources_list)
else:
raise RuntimeError('Accelerators must be a list or a set.')

# Fallthrough: a single candidate built from the config, wrapped in a set.
return {Resources._from_yaml_config_single(config)}

@classmethod
def _from_yaml_config_single(cls, config: Dict[str, str]) -> 'Resources':
common_utils.validate_schema(config, schemas.get_resources_schema(),
'Invalid resources YAML: ')

Expand Down
6 changes: 4 additions & 2 deletions sky/serve/serve_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pickle
import sqlite3
import typing
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Set, Union

import colorama

Expand Down Expand Up @@ -178,7 +178,9 @@ def from_replica_statuses(


def add_service(name: str, controller_job_id: int, policy: str,
auto_restart: bool, requested_resources: 'sky.Resources',
auto_restart: bool,
requested_resources: Union[Set['sky.Resources'],
List['sky.Resources']],
status: ServiceStatus) -> bool:
"""Add a service in the database.
Expand Down
62 changes: 2 additions & 60 deletions sky/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,67 +432,9 @@ def from_yaml_config(
task.set_outputs(outputs=outputs,
estimated_size_gigabytes=estimated_size_gigabytes)

# Parse resources field.
resources_config = config.pop('resources', None)
if resources_config and resources_config.get(
'any_of') is not None and resources_config.get(
'ordered') is not None:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Cannot specify both "any_of" and "ordered" in resources.')
if resources_config and resources_config.get('any_of') is not None:
# TODO(Ziming) In the future we can consider allowing
# additional fields when any_of is specified,
# which means we override the fields in all the
# resources under any_of with the fields specified outside any_of.
if len(resources_config) > 1:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Cannot specify "any_of" with other resource fields.')
resources_set = set()
for resource in resources_config['any_of']:
resources_set.add(sky.Resources.from_yaml_config(resource))
task.set_resources(resources_set)
elif resources_config and resources_config.get('ordered') is not None:
if len(resources_config) > 1:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Cannot specify "ordered" with other resource fields.')
resources_list = []
for resource in resources_config['ordered']:
resources_list.append(sky.Resources.from_yaml_config(resource))
task.set_resources(resources_list)
# Translate accelerators field to potential multiple resources.
elif resources_config and resources_config.get(
'accelerators') is not None:
accelerators = resources_config.get('accelerators')
if isinstance(accelerators, str):
accelerators = {accelerators}
elif isinstance(accelerators, dict):
accelerators = [
f'{k}:{v}' if v is not None else f'{k}'
for k, v in accelerators.items()
]
accelerators = set(accelerators)

# In yaml file, we store accelerators as a list.
# In Task, we store a list of resources, each with 1 accelerator.
# This for loop is for format conversion.
tmp_resources_list = []
for acc in accelerators:
tmp_resource = resources_config.copy()
tmp_resource['accelerators'] = acc
tmp_resources_list.append(
sky.Resources.from_yaml_config(tmp_resource))

if isinstance(accelerators, list):
task.set_resources(tmp_resources_list)
elif isinstance(accelerators, set):
task.set_resources(set(tmp_resources_list))
else:
raise RuntimeError('Accelerators must be a list or a set.')
else:
task.set_resources(
{sky.Resources.from_yaml_config(resources_config)})
task.set_resources(sky.Resources.from_yaml_config(resources_config))

service = config.pop('service', None)
if service is not None:
Expand Down
11 changes: 9 additions & 2 deletions sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,15 @@ def get_controller_resources(
controller_type=controller_type,
err=common_utils.format_exception(e,
use_bracket=True))) from e

return controller_resources
if len(controller_resources) != 1:
with ux_utils.print_exception_no_traceback():
raise ValueError(
CONTROLLER_RESOURCES_NOT_VALID_MESSAGE.format(
controller_type=controller_type,
err=f'Expected exactly one resource, got '
f'{len(controller_resources)} resources: '
f'{controller_resources}'))
return list(controller_resources)[0]


def _setup_proxy_command_on_controller(
Expand Down

0 comments on commit 7cefe10

Please sign in to comment.