From c7009bb048fcaa37c015075a755d47528d9baa01 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Sat, 16 Dec 2023 15:47:25 -0800 Subject: [PATCH 1/6] finish --- sky/cli.py | 349 ++++++++++++++++++++++++++++++-------- sky/serve/core.py | 19 ++- sky/serve/service_spec.py | 17 +- sky/task.py | 10 +- sky/utils/dag_utils.py | 11 +- 5 files changed, 320 insertions(+), 86 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 0425b9999e1..7fa0100ead6 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -26,6 +26,7 @@ import copy import datetime import functools +import json import multiprocessing import os import shlex @@ -637,70 +638,118 @@ def _add_options(func): return _add_options -def _parse_override_params( - cloud: Optional[str] = None, - region: Optional[str] = None, - zone: Optional[str] = None, - gpus: Optional[str] = None, - cpus: Optional[str] = None, - memory: Optional[str] = None, - instance_type: Optional[str] = None, - use_spot: Optional[bool] = None, - image_id: Optional[str] = None, - disk_size: Optional[int] = None, - disk_tier: Optional[str] = None, - ports: Optional[Tuple[str]] = None) -> Dict[str, Any]: - """Parses the override parameters into a dictionary.""" - override_params: Dict[str, Any] = {} +def _parse_resources_override_params( + cloud: Optional[str] = None, + region: Optional[str] = None, + zone: Optional[str] = None, + gpus: Optional[str] = None, + cpus: Optional[str] = None, + memory: Optional[str] = None, + instance_type: Optional[str] = None, + use_spot: Optional[bool] = None, + image_id: Optional[str] = None, + disk_size: Optional[int] = None, + disk_tier: Optional[str] = None, + ports: Optional[Tuple[str]] = None, +) -> Dict[str, Any]: + """Parses the resources override parameters into a dictionary.""" + resources_override_params: Dict[str, Any] = {} if cloud is not None: if cloud.lower() == 'none': - override_params['cloud'] = None + resources_override_params['cloud'] = None else: - override_params['cloud'] = clouds.CLOUD_REGISTRY.from_str(cloud) + resources_override_params['cloud'] = ( + clouds.CLOUD_REGISTRY.from_str(cloud)) if region is not None: if region.lower() == 'none': - override_params['region'] = None + resources_override_params['region'] = None else: - override_params['region'] = region + resources_override_params['region'] = region if zone is not None: if zone.lower() == 'none': - override_params['zone'] = None + resources_override_params['zone'] = None else: - override_params['zone'] = zone + resources_override_params['zone'] = zone if gpus is not None: if gpus.lower() == 'none': - override_params['accelerators'] = None + resources_override_params['accelerators'] = None else: - override_params['accelerators'] = gpus + resources_override_params['accelerators'] = gpus if cpus is not None: if cpus.lower() == 'none': - override_params['cpus'] = None + resources_override_params['cpus'] = None else: - override_params['cpus'] = cpus + resources_override_params['cpus'] = cpus if memory is not None: if memory.lower() == 'none': - override_params['memory'] = None + resources_override_params['memory'] = None else: - override_params['memory'] = memory + resources_override_params['memory'] = memory if instance_type is not None: if instance_type.lower() == 'none': - override_params['instance_type'] = None + resources_override_params['instance_type'] = None else: - override_params['instance_type'] = instance_type + resources_override_params['instance_type'] = instance_type if use_spot is not None: - override_params['use_spot'] = use_spot + resources_override_params['use_spot'] = use_spot if image_id is not None: if image_id.lower() == 'none': - override_params['image_id'] = None + resources_override_params['image_id'] = None else: - override_params['image_id'] = image_id + resources_override_params['image_id'] = image_id if disk_size is not None: - override_params['disk_size'] = disk_size + resources_override_params['disk_size'] = disk_size if disk_tier is not None: - override_params['disk_tier'] = disk_tier + resources_override_params['disk_tier'] = disk_tier if ports: - override_params['ports'] = ports - return override_params + resources_override_params['ports'] = ports + return resources_override_params + + +def _parse_services_override_params( + readiness_path: Optional[str] = None, + initial_delay_seconds: Optional[int] = None, + post_data: Optional[str] = None, + auto_restart: Optional[bool] = None, + min_replicas: Optional[int] = None, + max_replicas: Optional[int] = None, + qps_upper_threshold: Optional[int] = None, + qps_lower_threshold: Optional[int] = None, +) -> Dict[str, Any]: + """Parses the services override parameters into a dictionary.""" + services_override_params: Dict[str, Any] = {} + if readiness_path is not None: + if readiness_path.lower() == 'none': + services_override_params['readiness_path'] = None + else: + services_override_params['readiness_path'] = readiness_path + if initial_delay_seconds is not None: + services_override_params[ + 'initial_delay_seconds'] = initial_delay_seconds + if post_data is not None: + if post_data.lower() == 'none': + services_override_params['post_data'] = None + else: + try: + post_data = json.loads(post_data) + except json.JSONDecodeError as e: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'Invalid JSON string for `post_data` in the ' + '`readiness_probe` section of your service YAML.' + ) from e + services_override_params['post_data'] = post_data + if auto_restart is not None: + services_override_params['auto_restart'] = auto_restart + if min_replicas is not None: + services_override_params['min_replicas'] = min_replicas + if max_replicas is not None: + services_override_params['max_replicas'] = max_replicas + if qps_upper_threshold is not None: + services_override_params['qps_upper_threshold'] = qps_upper_threshold + if qps_lower_threshold is not None: + services_override_params['qps_lower_threshold'] = qps_lower_threshold + return services_override_params def _default_interactive_node_name(node_type: str): @@ -1062,6 +1111,14 @@ def _make_task_or_dag_from_entrypoint_with_overrides( disk_size: Optional[int] = None, disk_tier: Optional[str] = None, ports: Optional[Tuple[str]] = None, + readiness_path: Optional[str] = None, + initial_delay_seconds: Optional[int] = None, + post_data: Optional[str] = None, + auto_restart: Optional[bool] = None, + min_replicas: Optional[int] = None, + max_replicas: Optional[int] = None, + qps_upper_threshold: Optional[int] = None, + qps_lower_threshold: Optional[int] = None, env: Optional[List[Tuple[str, str]]] = None, # spot launch specific spot_recovery: Optional[str] = None, @@ -1094,30 +1151,53 @@ def _make_task_or_dag_from_entrypoint_with_overrides( if onprem_utils.check_local_cloud_args(cloud, cluster, yaml_config): cloud = 'local' - override_params = _parse_override_params(cloud=cloud, - region=region, - zone=zone, - gpus=gpus, - cpus=cpus, - memory=memory, - instance_type=instance_type, - use_spot=use_spot, - image_id=image_id, - disk_size=disk_size, - disk_tier=disk_tier, - ports=ports) + resources_override_params = _parse_resources_override_params( + cloud=cloud, + region=region, + zone=zone, + gpus=gpus, + cpus=cpus, + memory=memory, + instance_type=instance_type, + use_spot=use_spot, + image_id=image_id, + disk_size=disk_size, + disk_tier=disk_tier, + ports=ports, + ) + # TODO(tian): Add replicas shortcut. + services_override_params = _parse_services_override_params( + readiness_path=readiness_path, + initial_delay_seconds=initial_delay_seconds, + post_data=post_data, + auto_restart=auto_restart, + min_replicas=min_replicas, + max_replicas=max_replicas, + qps_upper_threshold=qps_upper_threshold, + qps_lower_threshold=qps_lower_threshold, + ) if is_yaml: assert entrypoint is not None usage_lib.messages.usage.update_user_task_yaml(entrypoint) - dag = dag_utils.load_chain_dag_from_yaml(entrypoint, env_overrides=env) + dag = dag_utils.load_chain_dag_from_yaml( + entrypoint, + services_overrides=services_override_params, + env_overrides=env) if len(dag.tasks) > 1: # When the dag has more than 1 task. It is unclear how to # override the params for the dag. So we just ignore the # override params. - if override_params: + if resources_override_params: click.secho( - f'WARNING: override params {override_params} are ignored, ' + 'WARNING: Resources override params ' + f'{resources_override_params} are ignored, ' + 'since the yaml file contains multiple tasks.', + fg='yellow') + if services_override_params: + click.secho( + 'WARNING: Services override params ' + f'{services_override_params} are ignored, ' 'since the yaml file contains multiple tasks.', fg='yellow') return dag @@ -1127,6 +1207,7 @@ def _make_task_or_dag_from_entrypoint_with_overrides( else: task = sky.Task(name='sky-cmd', run=entrypoint) task.set_resources({sky.Resources()}) + task.set_service(serve_lib.SkyServiceSpec(**services_override_params)) # Override. if workdir is not None: @@ -1134,9 +1215,9 @@ def _make_task_or_dag_from_entrypoint_with_overrides( # Spot launch specific. if spot_recovery is not None: - override_params['spot_recovery'] = spot_recovery + resources_override_params['spot_recovery'] = spot_recovery - task.set_resources_override(override_params) + task.set_resources_override(resources_override_params) if num_nodes is not None: task.num_nodes = num_nodes @@ -4206,22 +4287,125 @@ def serve(): type=str, nargs=-1, **_get_shell_complete_args(_complete_file_name)) +# Task options. +@_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS) +@click.option('--cpus', + default=None, + type=str, + required=False, + help=('Number of vCPUs each instance must have (e.g., ' + '``--cpus=4`` (exactly 4) or ``--cpus=4+`` (at least 4)). ' + 'This is used to automatically select the instance type.')) +@click.option( + '--memory', + default=None, + type=str, + required=False, + help=('Amount of memory each instance must have in GB (e.g., ' + '``--memory=16`` (exactly 16GB), ``--memory=16+`` (at least 16GB))')) +@click.option('--disk-size', + default=None, + type=int, + required=False, + help=('OS disk size in GBs.')) +@click.option( + '--disk-tier', + default=None, + type=click.Choice(['low', 'medium', 'high'], case_sensitive=False), + required=False, + help=( + 'OS disk tier. Could be one of "low", "medium", "high". Default: medium' + )) +# Service options. @click.option('--service-name', '-n', default=None, type=str, help='A service name. Unique for each service. If not provided, ' 'a unique name is autogenerated.') +@click.option('--readiness-path', + default=None, + type=str, + required=False, + help='Path for the readiness probe.') +@click.option('--initial-delay-seconds', + default=None, + type=int, + required=False, + help=('Initial delay in seconds. Any readiness probe failures ' + 'during this period will be ignored.')) +@click.option('--post-data', + default=None, + type=str, + required=False, + help=('Post data for the readiness probe.')) +@click.option('--auto-restart', + default=None, + type=bool, + required=False, + help=('Whether to restart the service replica if it fails.')) +@click.option('--min-replicas', + default=None, + type=int, + required=False, + help='Minimum number of replicas.') +@click.option('--max-replicas', + default=None, + type=int, + required=False, + help=('Maximum number of replicas. If not specified, SkyServe ' + 'will use fixed number of replicas same as min_replicas ' + 'and ignore any specified QPS threshold.')) +@click.option('--qps-upper-threshold', + default=None, + type=int, + required=False, + help=('QPS threshold for scaling up. If the QPS of your service' + 'exceeds this threshold, SkyServe will scale up your ' + 'service.')) +@click.option('--qps-lower-threshold', + default=None, + type=int, + required=False, + help=('QPS threshold for scaling down. If the QPS of your ' + 'service is lower than this threshold, SkyServe will ' + 'scale down your service.')) @click.option('--yes', '-y', is_flag=True, default=False, required=False, help='Skip confirmation prompt.') -# TODO(tian): Support the task_option overrides for the service. +@timeline.event +@usage_lib.entrypoint def serve_up( service_yaml: List[str], service_name: Optional[str], + name: Optional[str], + workdir: Optional[str], + cloud: Optional[str], + region: Optional[str], + zone: Optional[str], + num_nodes: Optional[int], + use_spot: Optional[bool], + image_id: Optional[str], + env_file: Optional[Dict[str, str]], + env: List[Tuple[str, str]], + gpus: Optional[str], + instance_type: Optional[str], + ports: Tuple[str], + cpus: Optional[str], + memory: Optional[str], + disk_size: Optional[int], + disk_tier: Optional[str], + readiness_path: Optional[str], + initial_delay_seconds: Optional[int], + post_data: Optional[str], + auto_restart: Optional[bool], + min_replicas: Optional[int], + max_replicas: Optional[int], + qps_upper_threshold: Optional[int], + qps_lower_threshold: Optional[int], yes: bool, ): """Launch a SkyServe service. @@ -4255,12 +4439,35 @@ def serve_up( if service_name is None: service_name = serve_lib.generate_service_name() - is_yaml, _ = _check_yaml(''.join(service_yaml)) - if not is_yaml: - raise click.UsageError('SERVICE_YAML must be a valid YAML file.') - # We keep nargs=-1 in service_yaml argument to reuse this function. + env = _merge_env_vars(env_file, env) task = _make_task_or_dag_from_entrypoint_with_overrides( - service_yaml, entrypoint_name='Service') + service_yaml, + name=name, + workdir=workdir, + cloud=cloud, + region=region, + zone=zone, + gpus=gpus, + cpus=cpus, + memory=memory, + instance_type=instance_type, + num_nodes=num_nodes, + use_spot=use_spot, + image_id=image_id, + env=env, + disk_size=disk_size, + disk_tier=disk_tier, + ports=ports, + readiness_path=readiness_path, + initial_delay_seconds=initial_delay_seconds, + post_data=post_data, + auto_restart=auto_restart, + min_replicas=min_replicas, + max_replicas=max_replicas, + qps_upper_threshold=qps_upper_threshold, + qps_lower_threshold=qps_lower_threshold, + entrypoint_name='Service', + ) if isinstance(task, sky.Dag): raise click.UsageError( _DAG_NOT_SUPPORTED_MESSAGE.format(command='sky serve up')) @@ -4268,7 +4475,8 @@ def serve_up( if task.service is None: with ux_utils.print_exception_no_traceback(): raise ValueError('Service section not found in the YAML file. ' - 'To fix, add a valid `service` field.') + 'To fix, add a valid `service` field or pass ' + 'service-related arguments in the CLI.') service_port: Optional[int] = None for requested_resources in list(task.resources): if requested_resources.ports is None or len( @@ -4792,16 +5000,17 @@ def benchmark_launch( config['workdir'] = workdir if num_nodes is not None: config['num_nodes'] = num_nodes - override_params = _parse_override_params(cloud=cloud, - region=region, - zone=zone, - gpus=override_gpu, - use_spot=use_spot, - image_id=image_id, - disk_size=disk_size, - disk_tier=disk_tier, - ports=ports) - resources_config.update(override_params) + resources_override_params = _parse_resources_override_params( + cloud=cloud, + region=region, + zone=zone, + gpus=override_gpu, + use_spot=use_spot, + image_id=image_id, + disk_size=disk_size, + disk_tier=disk_tier, + ports=ports) + resources_config.update(resources_override_params) if 'cloud' in resources_config: cloud = resources_config.pop('cloud') if cloud is not None: diff --git a/sky/serve/core.py b/sky/serve/core.py index 19038072f41..1d94f59fbf9 100644 --- a/sky/serve/core.py +++ b/sky/serve/core.py @@ -159,15 +159,16 @@ def up( # whether the service is already running. If the id is the same # with the current job id, we know the service is up and running # for the first time; otherwise it is a name conflict. - controller_job_id, controller_handle = execution.execute( - entrypoint=controller_task, - stream_logs=False, - cluster_name=controller_name, - detach_run=True, - idle_minutes_to_autostop=constants. - CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP, - retry_until_up=True, - ) + with ux_utils.enable_traceback(): + controller_job_id, controller_handle = execution.execute( + entrypoint=controller_task, + stream_logs=False, + cluster_name=controller_name, + detach_run=True, + idle_minutes_to_autostop=constants. + CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP, + retry_until_up=True, + ) style = colorama.Style fore = colorama.Fore diff --git a/sky/serve/service_spec.py b/sky/serve/service_spec.py index 6acd5a983bc..fdbee29b8fe 100644 --- a/sky/serve/service_spec.py +++ b/sky/serve/service_spec.py @@ -17,15 +17,26 @@ class SkyServiceSpec: def __init__( self, - readiness_path: str, - initial_delay_seconds: int, - min_replicas: int, + readiness_path: Optional[str] = None, + initial_delay_seconds: Optional[int] = None, + min_replicas: Optional[int] = None, max_replicas: Optional[int] = None, qps_upper_threshold: Optional[float] = None, qps_lower_threshold: Optional[float] = None, post_data: Optional[Dict[str, Any]] = None, auto_restart: bool = True, ) -> None: + missing_fields = [] + if readiness_path is None: + missing_fields.append('readiness_path') + if initial_delay_seconds is None: + missing_fields.append('initial_delay_seconds') + if min_replicas is None: + missing_fields.append('min_replicas') + if missing_fields: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'Missing required fields: {", ".join(missing_fields)}') if min_replicas < 0: with ux_utils.print_exception_no_traceback(): raise ValueError( diff --git a/sky/task.py b/sky/task.py index d7e22323aa1..eb2b8474749 100644 --- a/sky/task.py +++ b/sky/task.py @@ -340,6 +340,7 @@ def _validate(self): @staticmethod def from_yaml_config( config: Dict[str, Any], + services_overrides: Optional[Dict[str, Any]] = None, env_overrides: Optional[List[Tuple[str, str]]] = None, ) -> 'Task': if env_overrides is not None: @@ -494,9 +495,12 @@ def from_yaml_config( task.set_resources( {sky.Resources.from_yaml_config(resources_config)}) - service = config.pop('service', None) - if service is not None: - service = service_spec.SkyServiceSpec.from_yaml_config(service) + service_config = config.pop('service', {}) + if services_overrides is not None: + service_config.update(services_overrides) + service = None + if service_config: + service = service_spec.SkyServiceSpec.from_yaml_config(service_config) task.set_service(service) assert not config, f'Invalid task args: {config.keys()}' diff --git a/sky/utils/dag_utils.py b/sky/utils/dag_utils.py index 03ab3a72713..c29d053c052 100644 --- a/sky/utils/dag_utils.py +++ b/sky/utils/dag_utils.py @@ -14,6 +14,7 @@ def load_chain_dag_from_yaml( path: str, + services_overrides: Optional[Dict[str, Any]] = None, env_overrides: Optional[List[Tuple[str, str]]] = None, ) -> dag_lib.Dag: """Loads a chain DAG from a YAML file. @@ -25,6 +26,10 @@ def load_chain_dag_from_yaml( list of (key, value) pairs that will be used to update the task's 'envs' section. + 'services_overrides' is in effect only when there's exactly one task. It is + a dict of service specification that will be used to update the task's + 'services' section. + Returns: A chain Dag with 1 or more tasks (an empty entrypoint would create a trivial task). @@ -46,13 +51,17 @@ def load_chain_dag_from_yaml( # decision to not apply overrides. Here we maintain this behavior. We # can listen to user feedback to change this. env_overrides = None + services_overrides = None + print(1, services_overrides) current_task = None with dag_lib.Dag() as dag: for task_config in configs: if task_config is None: continue - task = task_lib.Task.from_yaml_config(task_config, env_overrides) + task = task_lib.Task.from_yaml_config(task_config, + services_overrides, + env_overrides) if current_task is not None: current_task >> task # pylint: disable=pointless-statement current_task = task From 321f1c5c1b01732a64ca9eca95c31852296a0de4 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Sat, 16 Dec 2023 16:02:01 -0800 Subject: [PATCH 2/6] lint --- sky/serve/service_spec.py | 2 ++ sky/task.py | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sky/serve/service_spec.py b/sky/serve/service_spec.py index fdbee29b8fe..080397747da 100644 --- a/sky/serve/service_spec.py +++ b/sky/serve/service_spec.py @@ -37,6 +37,8 @@ def __init__( with ux_utils.print_exception_no_traceback(): raise ValueError( f'Missing required fields: {", ".join(missing_fields)}') + assert (readiness_path is not None and + initial_delay_seconds is not None and min_replicas is not None) if min_replicas < 0: with ux_utils.print_exception_no_traceback(): raise ValueError( diff --git a/sky/task.py b/sky/task.py index eb2b8474749..4c74a75b3ab 100644 --- a/sky/task.py +++ b/sky/task.py @@ -498,10 +498,10 @@ def from_yaml_config( service_config = config.pop('service', {}) if services_overrides is not None: service_config.update(services_overrides) - service = None if service_config: - service = service_spec.SkyServiceSpec.from_yaml_config(service_config) - task.set_service(service) + service = service_spec.SkyServiceSpec.from_yaml_config( + service_config) + task.set_service(service) assert not config, f'Invalid task args: {config.keys()}' return task From de2d8a9b674004921a7fd71a277e3952c2bafb13 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Sat, 16 Dec 2023 23:47:49 -0800 Subject: [PATCH 3/6] set name as service name --- sky/cli.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sky/cli.py b/sky/cli.py index 7fa0100ead6..8e2318b9d8b 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -4437,7 +4437,18 @@ def serve_up( sky serve up service.yaml """ if service_name is None: - service_name = serve_lib.generate_service_name() + if name is None: + name = service_name = serve_lib.generate_service_name() + else: + service_name = name + else: + if name is None: + name = service_name + else: + raise click.UsageError( + 'Cannot specify both --name and --service-name. ' + 'Both of them are used to specify the name of the service. ' + f'Got: {name!r} and {service_name!r}.') env = _merge_env_vars(env_file, env) task = _make_task_or_dag_from_entrypoint_with_overrides( From e7aa4220971d44283c91f0d58c4590f6201f692c Mon Sep 17 00:00:00 2001 From: cblmemo Date: Mon, 8 Jan 2024 22:03:49 -0800 Subject: [PATCH 4/6] remove debug --- sky/serve/core.py | 19 +++++++++---------- sky/utils/dag_utils.py | 1 - 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/sky/serve/core.py b/sky/serve/core.py index 3164f5568a6..6dc70b55a81 100644 --- a/sky/serve/core.py +++ b/sky/serve/core.py @@ -159,16 +159,15 @@ def up( # whether the service is already running. If the id is the same # with the current job id, we know the service is up and running # for the first time; otherwise it is a name conflict. - with ux_utils.enable_traceback(): - controller_job_id, controller_handle = execution.execute( - entrypoint=controller_task, - stream_logs=False, - cluster_name=controller_name, - detach_run=True, - idle_minutes_to_autostop=constants. - CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP, - retry_until_up=True, - ) + controller_job_id, controller_handle = execution.execute( + entrypoint=controller_task, + stream_logs=False, + cluster_name=controller_name, + detach_run=True, + idle_minutes_to_autostop=constants. + CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP, + retry_until_up=True, + ) style = colorama.Style fore = colorama.Fore diff --git a/sky/utils/dag_utils.py b/sky/utils/dag_utils.py index c29d053c052..9cbfa39d16e 100644 --- a/sky/utils/dag_utils.py +++ b/sky/utils/dag_utils.py @@ -52,7 +52,6 @@ def load_chain_dag_from_yaml( # can listen to user feedback to change this. env_overrides = None services_overrides = None - print(1, services_overrides) current_task = None with dag_lib.Dag() as dag: From ce3df17525fe909d2a1a06afb220923489dca39d Mon Sep 17 00:00:00 2001 From: cblmemo Date: Mon, 8 Jan 2024 22:18:21 -0800 Subject: [PATCH 5/6] upd to new version of service spec. --- sky/cli.py | 100 ++++++++++++++++++++++++++++------------------------- 1 file changed, 52 insertions(+), 48 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index c13ce1660b8..428db71de3a 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -713,12 +713,12 @@ def _parse_resources_override_params( def _parse_services_override_params( readiness_path: Optional[str] = None, initial_delay_seconds: Optional[int] = None, - post_data: Optional[str] = None, - auto_restart: Optional[bool] = None, min_replicas: Optional[int] = None, max_replicas: Optional[int] = None, - qps_upper_threshold: Optional[int] = None, - qps_lower_threshold: Optional[int] = None, + target_qps_per_replica: Optional[float] = None, + post_data: Optional[str] = None, + upscale_delay_seconds: Optional[int] = None, + downscale_delay_seconds: Optional[int] = None, ) -> Dict[str, Any]: """Parses the services override parameters into a dictionary.""" services_override_params: Dict[str, Any] = {} @@ -730,6 +730,13 @@ def _parse_services_override_params( if initial_delay_seconds is not None: services_override_params[ 'initial_delay_seconds'] = initial_delay_seconds + if min_replicas is not None: + services_override_params['min_replicas'] = min_replicas + if max_replicas is not None: + services_override_params['max_replicas'] = max_replicas + if target_qps_per_replica is not None: + services_override_params[ + 'target_qps_per_replica'] = target_qps_per_replica if post_data is not None: if post_data.lower() == 'none': services_override_params['post_data'] = None @@ -743,16 +750,12 @@ def _parse_services_override_params( '`readiness_probe` section of your service YAML.' ) from e services_override_params['post_data'] = post_data - if auto_restart is not None: - services_override_params['auto_restart'] = auto_restart - if min_replicas is not None: - services_override_params['min_replicas'] = min_replicas - if max_replicas is not None: - services_override_params['max_replicas'] = max_replicas - if qps_upper_threshold is not None: - services_override_params['qps_upper_threshold'] = qps_upper_threshold - if qps_lower_threshold is not None: - services_override_params['qps_lower_threshold'] = qps_lower_threshold + if upscale_delay_seconds is not None: + services_override_params[ + 'upscale_delay_seconds'] = upscale_delay_seconds + if downscale_delay_seconds is not None: + services_override_params[ + 'downscale_delay_seconds'] = downscale_delay_seconds return services_override_params @@ -1117,12 +1120,12 @@ def _make_task_or_dag_from_entrypoint_with_overrides( ports: Optional[Tuple[str]] = None, readiness_path: Optional[str] = None, initial_delay_seconds: Optional[int] = None, - post_data: Optional[str] = None, - auto_restart: Optional[bool] = None, min_replicas: Optional[int] = None, max_replicas: Optional[int] = None, - qps_upper_threshold: Optional[int] = None, - qps_lower_threshold: Optional[int] = None, + target_qps_per_replica: Optional[float] = None, + post_data: Optional[str] = None, + upscale_delay_seconds: Optional[int] = None, + downscale_delay_seconds: Optional[int] = None, env: Optional[List[Tuple[str, str]]] = None, # spot launch specific spot_recovery: Optional[str] = None, @@ -1173,12 +1176,12 @@ def _make_task_or_dag_from_entrypoint_with_overrides( services_override_params = _parse_services_override_params( readiness_path=readiness_path, initial_delay_seconds=initial_delay_seconds, - post_data=post_data, - auto_restart=auto_restart, min_replicas=min_replicas, max_replicas=max_replicas, - qps_upper_threshold=qps_upper_threshold, - qps_lower_threshold=qps_lower_threshold, + target_qps_per_replica=target_qps_per_replica, + post_data=post_data, + upscale_delay_seconds=upscale_delay_seconds, + downscale_delay_seconds=downscale_delay_seconds, ) if is_yaml: @@ -4353,16 +4356,6 @@ def serve(): required=False, help=('Initial delay in seconds. Any readiness probe failures ' 'during this period will be ignored.')) -@click.option('--post-data', - default=None, - type=str, - required=False, - help=('Post data for the readiness probe.')) -@click.option('--auto-restart', - default=None, - type=bool, - required=False, - help=('Whether to restart the service replica if it fails.')) @click.option('--min-replicas', default=None, type=int, @@ -4375,20 +4368,31 @@ def serve(): help=('Maximum number of replicas. If not specified, SkyServe ' 'will use fixed number of replicas same as min_replicas ' 'and ignore any specified QPS threshold.')) -@click.option('--qps-upper-threshold', +@click.option('--target-qps-per-replica', + default=None, + type=float, + required=False, + help=('Target number of queries per second per replica for ' + 'autoscaling. If not specified, SkyServe will use fixed ' + 'number of replicas same as min_replicas.')) +@click.option('--post-data', + default=None, + type=str, + required=False, + help=('Post data for the readiness probe. If not specified, ' + 'use GET request; otherwise, use POST request with ' + 'the argument as the post data. The argument should be ' + 'an JSON formatted string, like \'{"key": "value"}\'.')) +@click.option('--upscale-delay-seconds', default=None, type=int, required=False, - help=('QPS threshold for scaling up. If the QPS of your service' - 'exceeds this threshold, SkyServe will scale up your ' - 'service.')) -@click.option('--qps-lower-threshold', + help=('Upscale delay in seconds.')) +@click.option('--downscale-delay-seconds', default=None, type=int, required=False, - help=('QPS threshold for scaling down. If the QPS of your ' - 'service is lower than this threshold, SkyServe will ' - 'scale down your service.')) + help=('Downscale delay in seconds.')) @click.option('--yes', '-y', is_flag=True, @@ -4419,12 +4423,12 @@ def serve_up( disk_tier: Optional[str], readiness_path: Optional[str], initial_delay_seconds: Optional[int], - post_data: Optional[str], - auto_restart: Optional[bool], min_replicas: Optional[int], max_replicas: Optional[int], - qps_upper_threshold: Optional[int], - qps_lower_threshold: Optional[int], + target_qps_per_replica: Optional[float], + post_data: Optional[str], + upscale_delay_seconds: Optional[int], + downscale_delay_seconds: Optional[int], yes: bool, ): """Launch a SkyServe service. @@ -4490,12 +4494,12 @@ def serve_up( ports=ports, readiness_path=readiness_path, initial_delay_seconds=initial_delay_seconds, - post_data=post_data, - auto_restart=auto_restart, min_replicas=min_replicas, max_replicas=max_replicas, - qps_upper_threshold=qps_upper_threshold, - qps_lower_threshold=qps_lower_threshold, + target_qps_per_replica=target_qps_per_replica, + post_data=post_data, + upscale_delay_seconds=upscale_delay_seconds, + downscale_delay_seconds=downscale_delay_seconds, entrypoint_name='Service', ) if isinstance(task, sky.Dag): From b15f691a1346f2626b17fc4bff9a4eae3748d604 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Tue, 9 Jan 2024 16:49:05 -0800 Subject: [PATCH 6/6] wording & skip check service overridefor dag --- sky/cli.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 428db71de3a..6363e30ede5 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1201,12 +1201,9 @@ def _make_task_or_dag_from_entrypoint_with_overrides( f'{resources_override_params} are ignored, ' 'since the yaml file contains multiple tasks.', fg='yellow') - if services_override_params: - click.secho( - 'WARNING: Services override params ' - f'{services_override_params} are ignored, ' - 'since the yaml file contains multiple tasks.', - fg='yellow') + # We don't need to check service override params, since + # sky serve does not support serving a chain dag. It will + # raise an error in `serve_up`. return dag assert len(dag.tasks) == 1, ( f'If you see this, please file an issue; tasks: {dag.tasks}') @@ -4367,7 +4364,7 @@ def serve(): required=False, help=('Maximum number of replicas. If not specified, SkyServe ' 'will use fixed number of replicas same as min_replicas ' - 'and ignore any specified QPS threshold.')) + 'and ignore autoscaling parameters like target QPS.')) @click.option('--target-qps-per-replica', default=None, type=float, @@ -4387,12 +4384,12 @@ def serve(): default=None, type=int, required=False, - help=('Upscale delay in seconds.')) + help=('Autoscaler upscale delay in seconds.')) @click.option('--downscale-delay-seconds', default=None, type=int, required=False, - help=('Downscale delay in seconds.')) + help=('Autoscaler downscale delay in seconds.')) @click.option('--yes', '-y', is_flag=True,