From 83fb9b1059107b9a88f271ab071db262019932ee Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Sun, 10 Sep 2023 17:46:49 -0700 Subject: [PATCH 1/7] use_managed_demand init --- docs/source/reference/yaml-spec.rst | 3 ++ examples/spot_demand_scheduling.yaml | 10 +++++++ sky/cli.py | 42 ++++++++++++++++++++-------- sky/resources.py | 18 +++++++++++- sky/utils/schemas.py | 3 ++ 5 files changed, 64 insertions(+), 12 deletions(-) create mode 100644 examples/spot_demand_scheduling.yaml diff --git a/docs/source/reference/yaml-spec.rst b/docs/source/reference/yaml-spec.rst index 94e6a9e8f97..6378d2d4bef 100644 --- a/docs/source/reference/yaml-spec.rst +++ b/docs/source/reference/yaml-spec.rst @@ -64,6 +64,9 @@ Available fields: # If unspecified, defaults to False (on-demand instances). use_spot: False + # Launch a managed on-demand job. + use_managed_demand: False + # The recovery strategy for spot jobs (optional). # `use_spot` must be True for this to have any effect. For now, only # `FAILOVER` strategy is supported. diff --git a/examples/spot_demand_scheduling.yaml b/examples/spot_demand_scheduling.yaml new file mode 100644 index 00000000000..5af5ef95106 --- /dev/null +++ b/examples/spot_demand_scheduling.yaml @@ -0,0 +1,10 @@ +name: spot-demand-scheduling + +resources: + cloud: gcp + use_spot: False + use_managed_demand: True + +run: | + echo "Hello World!" + sleep 600 \ No newline at end of file diff --git a/sky/cli.py b/sky/cli.py index 924f44d294b..a8e4c07910a 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -216,6 +216,12 @@ def _interactive_node_cli_command(cli_func): is_flag=True, help='If true, use spot instances.') + use_managed_demand_option = click.option( + '--use-managed-demand', + default=None, + is_flag=True, + help='If true, Launched managed on-demand job.') + tpuvm_option = click.option('--tpu-vm', default=False, is_flag=True, @@ -305,6 +311,7 @@ def _interactive_node_cli_command(cli_func): *([gpus] if cli_func.__name__ == 'gpunode' else []), *([tpus] if cli_func.__name__ == 'tpunode' else []), spot_option, + use_managed_demand_option, *([tpuvm_option] if cli_func.__name__ == 'tpunode' else []), # Attach options @@ -391,6 +398,10 @@ def _merge_env_vars(env_dict: Optional[Dict[str, str]], default=None, help=('Whether to request spot instances. If specified, overrides the ' '"resources.use_spot" config.')), + click.option('--use-managed-demand', + default=None, + is_flag=True, + help='If true, Launched managed on-demand job.'), click.option('--image-id', required=False, default=None, @@ -621,6 +632,7 @@ def _parse_override_params(cloud: Optional[str] = None, memory: Optional[str] = None, instance_type: Optional[str] = None, use_spot: Optional[bool] = None, + use_managed_demand: Optional[bool] = None, image_id: Optional[str] = None, disk_size: Optional[int] = None, disk_tier: Optional[str] = None) -> Dict[str, Any]: @@ -663,6 +675,8 @@ def _parse_override_params(cloud: Optional[str] = None, override_params['instance_type'] = instance_type if use_spot is not None: override_params['use_spot'] = use_spot + if use_managed_demand is not None: + override_params['use_managed_demand'] = use_managed_demand if image_id is not None: if image_id.lower() == 'none': override_params['image_id'] = None @@ -1029,6 +1043,7 @@ def _make_task_or_dag_from_entrypoint_with_overrides( instance_type: Optional[str] = None, num_nodes: Optional[int] = None, use_spot: Optional[bool] = None, + use_managed_demand: Optional[bool] = None, image_id: Optional[str] = None, disk_size: Optional[int] = None, disk_tier: Optional[str] = None, @@ -1060,17 +1075,19 @@ def _make_task_or_dag_from_entrypoint_with_overrides( if onprem_utils.check_local_cloud_args(cloud, cluster, yaml_config): cloud = 'local' - override_params = _parse_override_params(cloud=cloud, - region=region, - zone=zone, - gpus=gpus, - cpus=cpus, - memory=memory, - instance_type=instance_type, - use_spot=use_spot, - image_id=image_id, - disk_size=disk_size, - disk_tier=disk_tier) + override_params = _parse_override_params( + cloud=cloud, + region=region, + zone=zone, + gpus=gpus, + cpus=cpus, + memory=memory, + instance_type=instance_type, + use_spot=use_spot, + use_managed_demand=use_managed_demand, + image_id=image_id, + disk_size=disk_size, + disk_tier=disk_tier) if is_yaml: assert entrypoint is not None @@ -1101,6 +1118,9 @@ def _make_task_or_dag_from_entrypoint_with_overrides( if spot_recovery is not None: override_params['spot_recovery'] = spot_recovery + if use_managed_demand is not None: + override_params['use_managed_demand'] = use_managed_demand + assert len(task.resources) == 1 old_resources = list(task.resources)[0] new_resources = old_resources.copy(**override_params) diff --git a/sky/resources.py b/sky/resources.py index 8c139f07c44..44408672585 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -52,6 +52,7 @@ def __init__( accelerators: Union[None, str, Dict[str, int]] = None, accelerator_args: Optional[Dict[str, str]] = None, use_spot: Optional[bool] = None, + use_managed_demand: Optional[bool] = None, spot_recovery: Optional[str] = None, region: Optional[str] = None, zone: Optional[str] = None, @@ -139,6 +140,7 @@ def __init__( self._use_spot_specified = use_spot is not None self._use_spot = use_spot if use_spot is not None else False + self._use_managed_demand = use_managed_demand self._spot_recovery = None if spot_recovery is not None: if spot_recovery.strip().lower() != 'none': @@ -344,6 +346,10 @@ def accelerator_args(self) -> Optional[Dict[str, str]]: def use_spot(self) -> bool: return self._use_spot + @property + def use_managed_demand(self) -> bool: + return self._use_managed_demand + @property def use_spot_specified(self) -> bool: return self._use_spot_specified @@ -659,11 +665,15 @@ def _try_validate_cpus_mem(self) -> None: def _try_validate_spot(self) -> None: if self._spot_recovery is None: return - if not self._use_spot: + if not self._use_spot and not self._use_managed_demand: with ux_utils.print_exception_no_traceback(): raise ValueError( 'Cannot specify spot_recovery without use_spot set to True.' ) + if self._use_spot and self._use_managed_demand: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'Cannot specify both use_spot and use_managed_demand') if self._spot_recovery not in spot.SPOT_STRATEGIES: with ux_utils.print_exception_no_traceback(): raise ValueError( @@ -1029,6 +1039,8 @@ def copy(self, **override) -> 'Resources': self.accelerator_args), use_spot=override.pop('use_spot', use_spot), spot_recovery=override.pop('spot_recovery', self.spot_recovery), + use_managed_demand=override.pop('use_managed_demand', + self.use_managed_demand), disk_size=override.pop('disk_size', self.disk_size), region=override.pop('region', self.region), zone=override.pop('zone', self.zone), @@ -1093,6 +1105,9 @@ def from_yaml_config(cls, config: Optional[Dict[str, str]]) -> 'Resources': config.pop('accelerator_args')) if config.get('use_spot') is not None: resources_fields['use_spot'] = config.pop('use_spot') + if config.get('use_managed_demand') is not None: + resources_fields['use_managed_demand'] = config.pop( + 'use_managed_demand') if config.get('spot_recovery') is not None: resources_fields['spot_recovery'] = config.pop('spot_recovery') if config.get('disk_size') is not None: @@ -1135,6 +1150,7 @@ def add_if_not_none(key, value): if self._use_spot_specified: add_if_not_none('use_spot', self.use_spot) config['spot_recovery'] = self.spot_recovery + config['use_managed_demand'] = self.use_managed_demand config['disk_size'] = self.disk_size add_if_not_none('region', self.region) add_if_not_none('zone', self.zone) diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index 597c51a63cd..c97aeba2909 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -58,6 +58,9 @@ def get_resources_schema(): 'use_spot': { 'type': 'boolean', }, + 'use_managed_demand': { + 'type': 'boolean', + }, 'spot_recovery': { 'type': 'string', }, From b9c70fe27577c9ab51b7f0d4626c435a0d83fa6c Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Sun, 10 Sep 2023 21:28:56 -0700 Subject: [PATCH 2/7] added switching helper function --- sky/resources.py | 8 ++++++++ sky/task.py | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/sky/resources.py b/sky/resources.py index 44408672585..1138bfba556 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -533,6 +533,14 @@ def _set_region_zone(self, region: Optional[str], self._region, self._zone = self._cloud.validate_region_zone( region, zone) + def switch_from_spot_to_on_demand(self): + self._use_managed_demand = True + self._use_spot = False + + def switch_from_on_demand_to_spot(self): + self._use_managed_demand = False + self._use_spot = True + def get_valid_regions_for_launchable(self) -> List[clouds.Region]: """Returns a set of `Region`s that can provision this Resources. diff --git a/sky/task.py b/sky/task.py index 9eb45ec1ea7..19ecaafcfbb 100644 --- a/sky/task.py +++ b/sky/task.py @@ -504,6 +504,14 @@ def need_spot_recovery(self) -> bool: def use_spot(self) -> bool: return any(r.use_spot for r in self.resources) + def switch_from_spot_to_on_demand(self): + for r in self.resources: + r.switch_from_spot_to_on_demand() + + def switch_from_on_demand_to_spot(self): + for r in self.resources: + r.switch_from_on_demand_to_spot() + def set_inputs(self, inputs: str, estimated_size_gigabytes: float) -> 'Task': # E.g., 's3://bucket', 'gs://bucket', or None. From 0cd5135ea075e6c30b7eabbc87d61a9ad0ebbc50 Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Mon, 11 Sep 2023 16:14:20 -0700 Subject: [PATCH 3/7] both use_spot and use_managed_demand --- examples/spot_demand_scheduling.yaml | 2 +- sky/cli.py | 1 + sky/optimizer.py | 2 +- sky/resources.py | 4 ---- sky/task.py | 12 ++++++++++++ 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/examples/spot_demand_scheduling.yaml b/examples/spot_demand_scheduling.yaml index 5af5ef95106..4225d0e19f2 100644 --- a/examples/spot_demand_scheduling.yaml +++ b/examples/spot_demand_scheduling.yaml @@ -2,7 +2,7 @@ name: spot-demand-scheduling resources: cloud: gcp - use_spot: False + # use_spot: True use_managed_demand: True run: | diff --git a/sky/cli.py b/sky/cli.py index a8e4c07910a..b888d6f4b1e 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3553,6 +3553,7 @@ def spot_launch( instance_type: Optional[str], num_nodes: Optional[int], use_spot: Optional[bool], + use_managed_demand: Optional[bool], image_id: Optional[str], spot_recovery: Optional[str], env_file: Optional[Dict[str, str]], diff --git a/sky/optimizer.py b/sky/optimizer.py index 32cb5a315fd..27d4a59038a 100644 --- a/sky/optimizer.py +++ b/sky/optimizer.py @@ -993,7 +993,7 @@ def _fill_in_launchable_resources( List[resources_lib.Resources]) if blocked_resources is None: blocked_resources = [] - for resources in task.get_resources(): + for resources in task.get_resources_both_spot_and_demand(): if resources.cloud is not None and not _cloud_in_list( resources.cloud, enabled_clouds): if try_fix_with_sky_check: diff --git a/sky/resources.py b/sky/resources.py index 1138bfba556..8e8513e30df 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -678,10 +678,6 @@ def _try_validate_spot(self) -> None: raise ValueError( 'Cannot specify spot_recovery without use_spot set to True.' ) - if self._use_spot and self._use_managed_demand: - with ux_utils.print_exception_no_traceback(): - raise ValueError( - 'Cannot specify both use_spot and use_managed_demand') if self._spot_recovery not in spot.SPOT_STRATEGIES: with ux_utils.print_exception_no_traceback(): raise ValueError( diff --git a/sky/task.py b/sky/task.py index 19ecaafcfbb..0d0daa1a6ab 100644 --- a/sky/task.py +++ b/sky/task.py @@ -576,6 +576,18 @@ def set_resources( def get_resources(self): return self.resources + def get_resources_both_spot_and_demand(self): + + return_resoruces = set() + for r in self.resources: + if r.use_managed_demand is True: + return_resoruces.add( + r.copy(use_spot=False, use_managed_demand=True)) + if r.use_spot is True: + return_resoruces.add( + r.copy(use_spot=True, use_managed_demand=False)) + return return_resoruces + def set_time_estimator(self, func: Callable[['sky.Resources'], int]) -> 'Task': """Sets a func mapping resources to estimated time (secs). From cdfe126dd3c842aaed953353add281ffb49461bc Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Mon, 11 Sep 2023 17:07:50 -0700 Subject: [PATCH 4/7] added checks in spot_launch --- examples/spot_demand_scheduling.yaml | 2 +- sky/cli.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/examples/spot_demand_scheduling.yaml b/examples/spot_demand_scheduling.yaml index 4225d0e19f2..51565ca51e9 100644 --- a/examples/spot_demand_scheduling.yaml +++ b/examples/spot_demand_scheduling.yaml @@ -2,7 +2,7 @@ name: spot-demand-scheduling resources: cloud: gcp - # use_spot: True + use_spot: True use_managed_demand: True run: | diff --git a/sky/cli.py b/sky/cli.py index b888d6f4b1e..7a340e1c409 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3619,6 +3619,11 @@ def spot_launch( else: dag = task_or_dag + if not use_spot and not use_managed_demand: + click.secho( + 'Both use_spot and use_managed_demand are False or not specified') + sys.exit(1) + if name is not None: dag.name = name From bde016e27d860a4900790294438783f9717692d0 Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Mon, 11 Sep 2023 17:17:18 -0700 Subject: [PATCH 5/7] fix bug --- sky/task.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sky/task.py b/sky/task.py index 0d0daa1a6ab..a64ef63f07c 100644 --- a/sky/task.py +++ b/sky/task.py @@ -580,12 +580,14 @@ def get_resources_both_spot_and_demand(self): return_resoruces = set() for r in self.resources: - if r.use_managed_demand is True: + if r.use_managed_demand: return_resoruces.add( r.copy(use_spot=False, use_managed_demand=True)) - if r.use_spot is True: + if r.use_spot: return_resoruces.add( r.copy(use_spot=True, use_managed_demand=False)) + if not r.use_managed_demand and not r.use_spot: + return_resoruces.add(r.copy()) return return_resoruces def set_time_estimator(self, func: Callable[['sky.Resources'], From fc22d58e8236dfaa49cc505a3fc857f1e855b20e Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Mon, 11 Sep 2023 17:48:41 -0700 Subject: [PATCH 6/7] fix bugs --- examples/spot_demand_scheduling.yaml | 2 +- sky/cli.py | 45 +++++++++++----------------- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/examples/spot_demand_scheduling.yaml b/examples/spot_demand_scheduling.yaml index 51565ca51e9..5af5ef95106 100644 --- a/examples/spot_demand_scheduling.yaml +++ b/examples/spot_demand_scheduling.yaml @@ -2,7 +2,7 @@ name: spot-demand-scheduling resources: cloud: gcp - use_spot: True + use_spot: False use_managed_demand: True run: | diff --git a/sky/cli.py b/sky/cli.py index 7a340e1c409..a1de16eca73 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -216,12 +216,6 @@ def _interactive_node_cli_command(cli_func): is_flag=True, help='If true, use spot instances.') - use_managed_demand_option = click.option( - '--use-managed-demand', - default=None, - is_flag=True, - help='If true, Launched managed on-demand job.') - tpuvm_option = click.option('--tpu-vm', default=False, is_flag=True, @@ -311,7 +305,6 @@ def _interactive_node_cli_command(cli_func): *([gpus] if cli_func.__name__ == 'gpunode' else []), *([tpus] if cli_func.__name__ == 'tpunode' else []), spot_option, - use_managed_demand_option, *([tpuvm_option] if cli_func.__name__ == 'tpunode' else []), # Attach options @@ -398,10 +391,6 @@ def _merge_env_vars(env_dict: Optional[Dict[str, str]], default=None, help=('Whether to request spot instances. If specified, overrides the ' '"resources.use_spot" config.')), - click.option('--use-managed-demand', - default=None, - is_flag=True, - help='If true, Launched managed on-demand job.'), click.option('--image-id', required=False, default=None, @@ -632,7 +621,6 @@ def _parse_override_params(cloud: Optional[str] = None, memory: Optional[str] = None, instance_type: Optional[str] = None, use_spot: Optional[bool] = None, - use_managed_demand: Optional[bool] = None, image_id: Optional[str] = None, disk_size: Optional[int] = None, disk_tier: Optional[str] = None) -> Dict[str, Any]: @@ -675,8 +663,6 @@ def _parse_override_params(cloud: Optional[str] = None, override_params['instance_type'] = instance_type if use_spot is not None: override_params['use_spot'] = use_spot - if use_managed_demand is not None: - override_params['use_managed_demand'] = use_managed_demand if image_id is not None: if image_id.lower() == 'none': override_params['image_id'] = None @@ -1075,19 +1061,17 @@ def _make_task_or_dag_from_entrypoint_with_overrides( if onprem_utils.check_local_cloud_args(cloud, cluster, yaml_config): cloud = 'local' - override_params = _parse_override_params( - cloud=cloud, - region=region, - zone=zone, - gpus=gpus, - cpus=cpus, - memory=memory, - instance_type=instance_type, - use_spot=use_spot, - use_managed_demand=use_managed_demand, - image_id=image_id, - disk_size=disk_size, - disk_tier=disk_tier) + override_params = _parse_override_params(cloud=cloud, + region=region, + zone=zone, + gpus=gpus, + cpus=cpus, + memory=memory, + instance_type=instance_type, + use_spot=use_spot, + image_id=image_id, + disk_size=disk_size, + disk_tier=disk_tier) if is_yaml: assert entrypoint is not None @@ -3500,6 +3484,10 @@ def spot(): default=None, type=str, help='Spot recovery strategy to use for the managed spot task.') +@click.option('--use-managed-demand', + default=None, + type=str, + help='Use managed demand instances.') @click.option('--disk-size', default=None, type=int, @@ -3592,6 +3580,7 @@ def spot_launch( instance_type=instance_type, num_nodes=num_nodes, use_spot=use_spot, + use_managed_demand=use_managed_demand, image_id=image_id, env=env, disk_size=disk_size, @@ -3619,7 +3608,7 @@ def spot_launch( else: dag = task_or_dag - if not use_spot and not use_managed_demand: + if spot_recovery and not use_spot and not use_managed_demand: click.secho( 'Both use_spot and use_managed_demand are False or not specified') sys.exit(1) From f123dd861383f266c319e6ac5b3f670bd5cf2c5e Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Wed, 13 Sep 2023 09:21:19 -0700 Subject: [PATCH 7/7] use managed --- docs/source/reference/yaml-spec.rst | 2 +- examples/spot_demand_scheduling.yaml | 3 +-- sky/resources.py | 27 ++++++++++++++++----------- sky/task.py | 8 -------- sky/utils/schemas.py | 4 ++-- 5 files changed, 20 insertions(+), 24 deletions(-) diff --git a/docs/source/reference/yaml-spec.rst b/docs/source/reference/yaml-spec.rst index 6378d2d4bef..e159ed40721 100644 --- a/docs/source/reference/yaml-spec.rst +++ b/docs/source/reference/yaml-spec.rst @@ -65,7 +65,7 @@ Available fields: use_spot: False # Launch a managed on-demand job. - use_managed_demand: False + managed: 'ON_DEMAND' # The recovery strategy for spot jobs (optional). # `use_spot` must be True for this to have any effect. For now, only diff --git a/examples/spot_demand_scheduling.yaml b/examples/spot_demand_scheduling.yaml index 5af5ef95106..ec32d7762c3 100644 --- a/examples/spot_demand_scheduling.yaml +++ b/examples/spot_demand_scheduling.yaml @@ -2,8 +2,7 @@ name: spot-demand-scheduling resources: cloud: gcp - use_spot: False - use_managed_demand: True + managed: ON_DEMAND run: | echo "Hello World!" diff --git a/sky/resources.py b/sky/resources.py index 8e8513e30df..6084f5de5af 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -533,14 +533,6 @@ def _set_region_zone(self, region: Optional[str], self._region, self._zone = self._cloud.validate_region_zone( region, zone) - def switch_from_spot_to_on_demand(self): - self._use_managed_demand = True - self._use_spot = False - - def switch_from_on_demand_to_spot(self): - self._use_managed_demand = False - self._use_spot = True - def get_valid_regions_for_launchable(self) -> List[clouds.Region]: """Returns a set of `Region`s that can provision this Resources. @@ -1109,9 +1101,22 @@ def from_yaml_config(cls, config: Optional[Dict[str, str]]) -> 'Resources': config.pop('accelerator_args')) if config.get('use_spot') is not None: resources_fields['use_spot'] = config.pop('use_spot') - if config.get('use_managed_demand') is not None: - resources_fields['use_managed_demand'] = config.pop( - 'use_managed_demand') + if config.get('managed') is not None: + managed_str = config.pop('managed') + if managed_str == 'SPOT': + resources_fields['use_spot'] = True + resources_fields['use_managed_demand'] = False + elif managed_str == 'ON_DEMAND': + resources_fields['use_spot'] = False + resources_fields['use_managed_demand'] = True + elif managed_str == 'CONSIDER_BOTH': + resources_fields['use_spot'] = True + resources_fields['use_managed_demand'] = True + else: + raise ValueError( + f'Invalid value {managed_str} for managed. ' + 'Please use one of "SPOT", "ON_DEMAND", or "CONSIDER_BOTH".' + ) if config.get('spot_recovery') is not None: resources_fields['spot_recovery'] = config.pop('spot_recovery') if config.get('disk_size') is not None: diff --git a/sky/task.py b/sky/task.py index a64ef63f07c..f5a24875925 100644 --- a/sky/task.py +++ b/sky/task.py @@ -504,14 +504,6 @@ def need_spot_recovery(self) -> bool: def use_spot(self) -> bool: return any(r.use_spot for r in self.resources) - def switch_from_spot_to_on_demand(self): - for r in self.resources: - r.switch_from_spot_to_on_demand() - - def switch_from_on_demand_to_spot(self): - for r in self.resources: - r.switch_from_on_demand_to_spot() - def set_inputs(self, inputs: str, estimated_size_gigabytes: float) -> 'Task': # E.g., 's3://bucket', 'gs://bucket', or None. diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index c97aeba2909..9b4b2ec3fc8 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -58,8 +58,8 @@ def get_resources_schema(): 'use_spot': { 'type': 'boolean', }, - 'use_managed_demand': { - 'type': 'boolean', + 'managed': { + 'type': 'string', }, 'spot_recovery': { 'type': 'string',