Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spot] Support both managed on-demand and spot instances #2545

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/source/reference/yaml-spec.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ Available fields:
# If unspecified, defaults to False (on-demand instances).
use_spot: False

# Launch a managed on-demand job.
managed: 'ON_DEMAND'

# The recovery strategy for spot jobs (optional).
# `use_spot` must be True for this to have any effect. For now, only
# `FAILOVER` strategy is supported.
Expand Down
9 changes: 9 additions & 0 deletions examples/spot_demand_scheduling.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: spot-demand-scheduling

resources:
cloud: gcp
managed: ON_DEMAND

run: |
echo "Hello World!"
sleep 600
15 changes: 15 additions & 0 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,7 @@ def _make_task_or_dag_from_entrypoint_with_overrides(
instance_type: Optional[str] = None,
num_nodes: Optional[int] = None,
use_spot: Optional[bool] = None,
use_managed_demand: Optional[bool] = None,
image_id: Optional[str] = None,
disk_size: Optional[int] = None,
disk_tier: Optional[str] = None,
Expand Down Expand Up @@ -1101,6 +1102,9 @@ def _make_task_or_dag_from_entrypoint_with_overrides(
if spot_recovery is not None:
override_params['spot_recovery'] = spot_recovery

if use_managed_demand is not None:
override_params['use_managed_demand'] = use_managed_demand

assert len(task.resources) == 1
old_resources = list(task.resources)[0]
new_resources = old_resources.copy(**override_params)
Expand Down Expand Up @@ -3480,6 +3484,10 @@ def spot():
default=None,
type=str,
help='Spot recovery strategy to use for the managed spot task.')
@click.option('--use-managed-demand',
default=None,
type=str,
help='Use managed demand instances.')
@click.option('--disk-size',
default=None,
type=int,
Expand Down Expand Up @@ -3533,6 +3541,7 @@ def spot_launch(
instance_type: Optional[str],
num_nodes: Optional[int],
use_spot: Optional[bool],
use_managed_demand: Optional[bool],
image_id: Optional[str],
spot_recovery: Optional[str],
env_file: Optional[Dict[str, str]],
Expand Down Expand Up @@ -3571,6 +3580,7 @@ def spot_launch(
instance_type=instance_type,
num_nodes=num_nodes,
use_spot=use_spot,
use_managed_demand=use_managed_demand,
image_id=image_id,
env=env,
disk_size=disk_size,
Expand Down Expand Up @@ -3598,6 +3608,11 @@ def spot_launch(
else:
dag = task_or_dag

if spot_recovery and not use_spot and not use_managed_demand:
click.secho(
'Both use_spot and use_managed_demand are False or not specified')
sys.exit(1)

if name is not None:
dag.name = name

Expand Down
2 changes: 1 addition & 1 deletion sky/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ def _fill_in_launchable_resources(
List[resources_lib.Resources])
if blocked_resources is None:
blocked_resources = []
for resources in task.get_resources():
for resources in task.get_resources_both_spot_and_demand():
if resources.cloud is not None and not _cloud_in_list(
resources.cloud, enabled_clouds):
if try_fix_with_sky_check:
Expand Down
28 changes: 26 additions & 2 deletions sky/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(
accelerators: Union[None, str, Dict[str, int]] = None,
accelerator_args: Optional[Dict[str, str]] = None,
use_spot: Optional[bool] = None,
use_managed_demand: Optional[bool] = None,
spot_recovery: Optional[str] = None,
region: Optional[str] = None,
zone: Optional[str] = None,
Expand Down Expand Up @@ -139,6 +140,7 @@ def __init__(

self._use_spot_specified = use_spot is not None
self._use_spot = use_spot if use_spot is not None else False
self._use_managed_demand = use_managed_demand
self._spot_recovery = None
if spot_recovery is not None:
if spot_recovery.strip().lower() != 'none':
Expand Down Expand Up @@ -344,6 +346,10 @@ def accelerator_args(self) -> Optional[Dict[str, str]]:
def use_spot(self) -> bool:
return self._use_spot

@property
def use_managed_demand(self) -> bool:
return self._use_managed_demand

@property
def use_spot_specified(self) -> bool:
return self._use_spot_specified
Expand Down Expand Up @@ -659,7 +665,7 @@ def _try_validate_cpus_mem(self) -> None:
def _try_validate_spot(self) -> None:
if self._spot_recovery is None:
return
if not self._use_spot:
if not self._use_spot and not self._use_managed_demand:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Cannot specify spot_recovery without use_spot set to True.'
Expand Down Expand Up @@ -1029,6 +1035,8 @@ def copy(self, **override) -> 'Resources':
self.accelerator_args),
use_spot=override.pop('use_spot', use_spot),
spot_recovery=override.pop('spot_recovery', self.spot_recovery),
use_managed_demand=override.pop('use_managed_demand',
self.use_managed_demand),
disk_size=override.pop('disk_size', self.disk_size),
region=override.pop('region', self.region),
zone=override.pop('zone', self.zone),
Expand Down Expand Up @@ -1107,7 +1115,22 @@ def from_yaml_config(cls, config: Optional[Dict[str, str]]) -> 'Resources':
resources_fields['accelerator_args'])
if resources_fields['disk_size'] is not None:
resources_fields['disk_size'] = int(resources_fields['disk_size'])

if config.get('managed') is not None:
managed_str = config.pop('managed')
if managed_str == 'SPOT':
resources_fields['use_spot'] = True
resources_fields['use_managed_demand'] = False
elif managed_str == 'ON_DEMAND':
resources_fields['use_spot'] = False
resources_fields['use_managed_demand'] = True
elif managed_str == 'CONSIDER_BOTH':
resources_fields['use_spot'] = True
resources_fields['use_managed_demand'] = True
else:
raise ValueError(
f'Invalid value {managed_str} for managed. '
'Please use one of "SPOT", "ON_DEMAND", or "CONSIDER_BOTH".'
)
assert not config, f'Invalid resource args: {config.keys()}'
return Resources(**resources_fields)

Expand All @@ -1129,6 +1152,7 @@ def add_if_not_none(key, value):
if self._use_spot_specified:
add_if_not_none('use_spot', self.use_spot)
config['spot_recovery'] = self.spot_recovery
config['use_managed_demand'] = self.use_managed_demand
config['disk_size'] = self.disk_size
add_if_not_none('region', self.region)
add_if_not_none('zone', self.zone)
Expand Down
14 changes: 14 additions & 0 deletions sky/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,20 @@ def set_resources(
def get_resources(self):
return self.resources

def get_resources_both_spot_and_demand(self):

return_resoruces = set()
for r in self.resources:
if r.use_managed_demand:
return_resoruces.add(
r.copy(use_spot=False, use_managed_demand=True))
if r.use_spot:
return_resoruces.add(
r.copy(use_spot=True, use_managed_demand=False))
if not r.use_managed_demand and not r.use_spot:
return_resoruces.add(r.copy())
return return_resoruces

def set_time_estimator(self, func: Callable[['sky.Resources'],
int]) -> 'Task':
"""Sets a func mapping resources to estimated time (secs).
Expand Down
3 changes: 3 additions & 0 deletions sky/utils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def get_resources_schema():
'use_spot': {
'type': 'boolean',
},
'managed': {
'type': 'string',
},
'spot_recovery': {
'type': 'string',
},
Expand Down