diff --git a/docs/source/reference/yaml-spec.rst b/docs/source/reference/yaml-spec.rst
index 94e6a9e8f97..e159ed40721 100644
--- a/docs/source/reference/yaml-spec.rst
+++ b/docs/source/reference/yaml-spec.rst
@@ -64,6 +64,10 @@ Available fields:
       # If unspecified, defaults to False (on-demand instances).
       use_spot: False
 
+      # Which instances a managed job may be launched on (optional).
+      # One of 'SPOT', 'ON_DEMAND', or 'CONSIDER_BOTH'.
+      managed: 'ON_DEMAND'
+
       # The recovery strategy for spot jobs (optional).
       # `use_spot` must be True for this to have any effect. For now, only
       # `FAILOVER` strategy is supported.
diff --git a/examples/spot_demand_scheduling.yaml b/examples/spot_demand_scheduling.yaml
new file mode 100644
index 00000000000..ec32d7762c3
--- /dev/null
+++ b/examples/spot_demand_scheduling.yaml
@@ -0,0 +1,9 @@
+name: spot-demand-scheduling
+
+resources:
+  cloud: gcp
+  managed: ON_DEMAND
+
+run: |
+  echo "Hello World!"
+  sleep 600
\ No newline at end of file
diff --git a/sky/cli.py b/sky/cli.py
index 924f44d294b..a1de16eca73 100644
--- a/sky/cli.py
+++ b/sky/cli.py
@@ -1029,6 +1029,7 @@ def _make_task_or_dag_from_entrypoint_with_overrides(
     instance_type: Optional[str] = None,
     num_nodes: Optional[int] = None,
     use_spot: Optional[bool] = None,
+    use_managed_demand: Optional[bool] = None,
     image_id: Optional[str] = None,
     disk_size: Optional[int] = None,
     disk_tier: Optional[str] = None,
@@ -1101,6 +1102,9 @@ def _make_task_or_dag_from_entrypoint_with_overrides(
     if spot_recovery is not None:
         override_params['spot_recovery'] = spot_recovery
 
+    if use_managed_demand is not None:
+        override_params['use_managed_demand'] = use_managed_demand
+
     assert len(task.resources) == 1
     old_resources = list(task.resources)[0]
     new_resources = old_resources.copy(**override_params)
@@ -3480,6 +3484,10 @@ def spot():
               default=None,
               type=str,
               help='Spot recovery strategy to use for the managed spot task.')
+@click.option('--use-managed-demand',
+              default=None,
+              type=bool,
+              help='Whether to use managed on-demand instances.')
 @click.option('--disk-size',
               default=None,
               type=int,
@@ 
-3533,6 +3541,7 @@ def spot_launch(
     instance_type: Optional[str],
     num_nodes: Optional[int],
     use_spot: Optional[bool],
+    use_managed_demand: Optional[bool],
     image_id: Optional[str],
     spot_recovery: Optional[str],
     env_file: Optional[Dict[str, str]],
@@ -3571,6 +3580,7 @@ def spot_launch(
         instance_type=instance_type,
         num_nodes=num_nodes,
         use_spot=use_spot,
+        use_managed_demand=use_managed_demand,
         image_id=image_id,
         env=env,
         disk_size=disk_size,
@@ -3598,6 +3608,14 @@ def spot_launch(
     else:
         dag = task_or_dag
 
+    # NOTE(review): this only inspects the command-line values, so a YAML
+    # that already enables spot/managed on-demand is rejected whenever
+    # spot_recovery is given on the command line without them -- confirm.
+    if spot_recovery and not use_spot and not use_managed_demand:
+        click.secho(
+            'spot_recovery requires use_spot or use_managed_demand to be set.')
+        sys.exit(1)
+
     if name is not None:
         dag.name = name
 
diff --git a/sky/optimizer.py b/sky/optimizer.py
index 32cb5a315fd..27d4a59038a 100644
--- a/sky/optimizer.py
+++ b/sky/optimizer.py
@@ -993,7 +993,7 @@ def _fill_in_launchable_resources(
                                          List[resources_lib.Resources])
     if blocked_resources is None:
         blocked_resources = []
-    for resources in task.get_resources():
+    for resources in task.get_resources_both_spot_and_demand():
         if resources.cloud is not None and not _cloud_in_list(
                 resources.cloud, enabled_clouds):
             if try_fix_with_sky_check:
diff --git a/sky/resources.py b/sky/resources.py
index 57bb16d576d..8207be6eacc 100644
--- a/sky/resources.py
+++ b/sky/resources.py
@@ -52,6 +52,7 @@ def __init__(
         accelerators: Union[None, str, Dict[str, int]] = None,
         accelerator_args: Optional[Dict[str, str]] = None,
         use_spot: Optional[bool] = None,
+        use_managed_demand: Optional[bool] = None,
         spot_recovery: Optional[str] = None,
         region: Optional[str] = None,
         zone: Optional[str] = None,
@@ -139,6 +140,7 @@ def __init__(
 
         self._use_spot_specified = use_spot is not None
         self._use_spot = use_spot if use_spot is not None else False
+        self._use_managed_demand = use_managed_demand
         self._spot_recovery = None
         if spot_recovery is not None:
             if spot_recovery.strip().lower() != 'none':
@@ -344,6 +346,10 @@ def accelerator_args(self) ->
Optional[Dict[str, str]]:
     def use_spot(self) -> bool:
         return self._use_spot
 
+    @property
+    def use_managed_demand(self) -> Optional[bool]:
+        return self._use_managed_demand
+
     @property
     def use_spot_specified(self) -> bool:
         return self._use_spot_specified
@@ -659,7 +665,7 @@ def _try_validate_cpus_mem(self) -> None:
     def _try_validate_spot(self) -> None:
         if self._spot_recovery is None:
             return
-        if not self._use_spot:
+        if not self._use_spot and not self._use_managed_demand:
             with ux_utils.print_exception_no_traceback():
                 raise ValueError(
-                    'Cannot specify spot_recovery without use_spot set to True.'
+                    'spot_recovery requires use_spot or use_managed_demand.'
@@ -1029,6 +1035,8 @@ def copy(self, **override) -> 'Resources':
                 self.accelerator_args),
             use_spot=override.pop('use_spot', use_spot),
             spot_recovery=override.pop('spot_recovery', self.spot_recovery),
+            use_managed_demand=override.pop('use_managed_demand',
+                                            self.use_managed_demand),
             disk_size=override.pop('disk_size', self.disk_size),
             region=override.pop('region', self.region),
             zone=override.pop('zone', self.zone),
@@ -1107,7 +1115,22 @@ def from_yaml_config(cls, config: Optional[Dict[str, str]]) -> 'Resources':
                 resources_fields['accelerator_args'])
         if resources_fields['disk_size'] is not None:
             resources_fields['disk_size'] = int(resources_fields['disk_size'])
-
+        managed_str = config.pop('managed', None)
+        if managed_str is not None:
+            if managed_str == 'SPOT':
+                resources_fields['use_spot'] = True
+                resources_fields['use_managed_demand'] = False
+            elif managed_str == 'ON_DEMAND':
+                resources_fields['use_spot'] = False
+                resources_fields['use_managed_demand'] = True
+            elif managed_str == 'CONSIDER_BOTH':
+                resources_fields['use_spot'] = True
+                resources_fields['use_managed_demand'] = True
+            else:
+                raise ValueError(
+                    f'Invalid value {managed_str} for managed. '
+                    'Please use one of "SPOT", "ON_DEMAND", or "CONSIDER_BOTH".'
+                )
         assert not config, f'Invalid resource args: {config.keys()}'
         return Resources(**resources_fields)
 
@@ -1129,6 +1152,11 @@ def add_if_not_none(key, value):
         if self._use_spot_specified:
             add_if_not_none('use_spot', self.use_spot)
         config['spot_recovery'] = self.spot_recovery
+        # Serialize through the `managed` key: it is the spelling that the
+        # resources schema and from_yaml_config() read back.
+        if self.use_managed_demand:
+            config['managed'] = ('CONSIDER_BOTH'
+                                 if self.use_spot else 'ON_DEMAND')
         config['disk_size'] = self.disk_size
         add_if_not_none('region', self.region)
         add_if_not_none('zone', self.zone)
diff --git a/sky/task.py b/sky/task.py
index 9eb45ec1ea7..f5a24875925 100644
--- a/sky/task.py
+++ b/sky/task.py
@@ -568,6 +568,27 @@ def set_resources(
     def get_resources(self):
         return self.resources
 
+    def get_resources_both_spot_and_demand(self):
+        """Returns the task's resources split into spot/on-demand candidates.
+
+        A resource with `use_managed_demand` set yields an on-demand copy and
+        one with `use_spot` set yields a spot copy, so a resource with both
+        set contributes two candidates. A resource with neither set is
+        copied unchanged.
+
+        Returns:
+            A set holding copies of the entries in self.resources.
+        """
+        candidates = set()
+        for r in self.resources:
+            if r.use_managed_demand:
+                candidates.add(r.copy(use_spot=False, use_managed_demand=True))
+            if r.use_spot:
+                candidates.add(r.copy(use_spot=True, use_managed_demand=False))
+            if not r.use_managed_demand and not r.use_spot:
+                candidates.add(r.copy())
+        return candidates
+
     def set_time_estimator(self, func: Callable[['sky.Resources'],
                                                 int]) -> 'Task':
         """Sets a func mapping resources to estimated time (secs).
diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py
index 597c51a63cd..9b4b2ec3fc8 100644
--- a/sky/utils/schemas.py
+++ b/sky/utils/schemas.py
@@ -58,6 +58,9 @@ def get_resources_schema():
             'use_spot': {
                 'type': 'boolean',
             },
+            'managed': {
+                'type': 'string',
+            },
             'spot_recovery': {
                 'type': 'string',
             },