diff --git a/sky/cli.py b/sky/cli.py index 216f880305b..b065d051176 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -4278,13 +4278,26 @@ def serve_up( with ux_utils.print_exception_no_traceback(): raise ValueError('Service section not found in the YAML file. ' 'To fix, add a valid `service` field.') - assert len(task.resources) == 1 - requested_resources = list(task.resources)[0] - if requested_resources.ports is None or len(requested_resources.ports) != 1: - with ux_utils.print_exception_no_traceback(): + service_port: Optional[int] = None + for requested_resources in list(task.resources): + if requested_resources.ports is None or len( + requested_resources.ports) != 1: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'Must only specify one port in resources. Each replica ' + 'will use the port specified as application ingress port.') + service_port_str = requested_resources.ports[0] + if not service_port_str.isdigit(): + raise ValueError(f'Port {service_port_str!r} is not a valid port ' + 'number. Please specify a single port instead. ' + f'Got: {service_port_str!r}') + resource_port = int(service_port_str) + if service_port is None: + service_port = resource_port + if service_port != resource_port: raise ValueError( - 'Must only specify one port in resources. Each replica ' - 'will use the port specified as application ingress port.') + f'Got multiple ports: {service_port} and {resource_port} ' + 'in different resources. Please specify single port instead.') click.secho('Service Spec:', fg='cyan') click.echo(task.service) diff --git a/sky/serve/core.py b/sky/serve/core.py index e285492987f..bfc27cd735e 100644 --- a/sky/serve/core.py +++ b/sky/serve/core.py @@ -1,6 +1,7 @@ """SkyServe core APIs.""" import re import tempfile +import typing from typing import Any, Dict, List, Optional, Union import colorama @@ -24,6 +25,9 @@ from sky.utils import subprocess_utils from sky.utils import ux_utils +if typing.TYPE_CHECKING: + from sky import clouds + @usage_lib.entrypoint def up( @@ -56,13 +60,29 @@ def up( with ux_utils.print_exception_no_traceback(): raise RuntimeError('Service section not found.') - assert len(task.resources) == 1, task - requested_resources = list(task.resources)[0] - if requested_resources.ports is None or len(requested_resources.ports) != 1: - with ux_utils.print_exception_no_traceback(): + requested_cloud: Optional['clouds.Cloud'] = None + service_port: Optional[int] = None + for requested_resources in task.resources: + if requested_resources.ports is None or len( + requested_resources.ports) != 1: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'Must only specify one port in resources. Each replica ' + 'will use the port specified as application ingress port.') + service_port_str = requested_resources.ports[0] + if not service_port_str.isdigit(): + raise ValueError(f'Port {service_port_str!r} is not a valid port ' + 'number. Please specify a single port instead. ' + f'Got: {service_port_str!r}') + resource_port = int(service_port_str) + if service_port is None: + service_port = resource_port + if service_port != resource_port: raise ValueError( - 'Must only specify one port in resources. Each replica ' - 'will use the port specified as application ingress port.') + f'Got multiple ports: {service_port} and {resource_port} ' + 'in different resources. Please specify single port instead.') + if requested_cloud is None: + requested_cloud = requested_resources.cloud controller_utils.maybe_translate_local_file_mounts_and_sync_up(task, path='serve') @@ -102,9 +122,9 @@ def up( controller_exist = ( global_user_state.get_cluster_from_name(controller_name) is not None) - controller_cloud = ( - requested_resources.cloud if not controller_exist and - controller_resources.cloud is None else controller_resources.cloud) + controller_cloud = (requested_cloud if not controller_exist and + controller_resources.cloud is None else + controller_resources.cloud) # TODO(tian): Probably run another sky.launch after we get the load # balancer port from the controller? So we don't need to open so many # ports here. Or, we should have a nginx traffic control to refuse @@ -329,7 +349,9 @@ def status( 'auto_restart': (bool) whether the service replica will be auto-restarted, 'requested_resources': (sky.Resources) requested resources - for replica, + for replica (deprecated), + 'requested_resources_str': (str) str representation of + requested resources, 'replica_info': (List[Dict[str, Any]]) replica information, } diff --git a/sky/serve/replica_managers.py b/sky/serve/replica_managers.py index fc11b0d3110..a20f9951c21 100644 --- a/sky/serve/replica_managers.py +++ b/sky/serve/replica_managers.py @@ -149,7 +149,8 @@ def terminate_cluster(cluster_name: str, max_retry: int = 3) -> None: def _get_resources_ports(task_yaml: str) -> str: """Get the resources ports used by the task.""" task = sky.Task.from_yaml(task_yaml) - assert len(task.resources) == 1, task + # Already checked all ports are the same in sky.serve.core.up + assert len(task.resources) >= 1, task task_resources = list(task.resources)[0] # Already checked the resources have and only have one port # before upload the task yaml. diff --git a/sky/serve/serve_state.py b/sky/serve/serve_state.py index f33351e02e0..d029147e7a4 100644 --- a/sky/serve/serve_state.py +++ b/sky/serve/serve_state.py @@ -47,6 +47,8 @@ def create_table(cursor: 'sqlite3.Cursor', conn: 'sqlite3.Connection') -> None: _DB = db_utils.SQLiteConn(_DB_PATH, create_table) +db_utils.add_column_to_table(_DB.cursor, _DB.conn, 'services', + 'requested_resources_str', 'TEXT') _UNIQUE_CONSTRAINT_FAILED_ERROR_MSG = 'UNIQUE constraint failed: services.name' @@ -178,7 +180,7 @@ def from_replica_statuses( def add_service(name: str, controller_job_id: int, policy: str, - auto_restart: bool, requested_resources: 'sky.Resources', + auto_restart: bool, requested_resources_str: str, status: ServiceStatus) -> bool: """Add a service in the database. @@ -190,11 +192,11 @@ def add_service(name: str, controller_job_id: int, policy: str, _DB.cursor.execute( """\ INSERT INTO services - (name, controller_job_id, status, policy, - auto_restart, requested_resources) + (name, controller_job_id, status, policy, auto_restart, + requested_resources, requested_resources_str) VALUES (?, ?, ?, ?, ?, ?)""", (name, controller_job_id, status.value, policy, int(auto_restart), - pickle.dumps(requested_resources))) + None, requested_resources_str)) _DB.conn.commit() except sqlite3.IntegrityError as e: if str(e) != _UNIQUE_CONSTRAINT_FAILED_ERROR_MSG: @@ -251,7 +253,8 @@ def set_service_load_balancer_port(service_name: str, def _get_service_from_row(row) -> Dict[str, Any]: (name, controller_job_id, controller_port, load_balancer_port, status, - uptime, policy, auto_restart, requested_resources) = row[:9] + uptime, policy, auto_restart, requested_resources, + requested_resources_str) = row[:10] return { 'name': name, 'controller_job_id': controller_job_id, @@ -263,6 +266,7 @@ def _get_service_from_row(row) -> Dict[str, Any]: 'auto_restart': bool(auto_restart), 'requested_resources': pickle.loads(requested_resources) if requested_resources is not None else None, + 'requested_resources_str': requested_resources_str, } diff --git a/sky/serve/serve_utils.py b/sky/serve/serve_utils.py index 3f9e8a0888c..331d38f3cae 100644 --- a/sky/serve/serve_utils.py +++ b/sky/serve/serve_utils.py @@ -680,7 +680,12 @@ def format_service_table(service_records: List[Dict[str, Any]], replicas = _get_replicas(record) endpoint = get_endpoint(record) policy = record['policy'] - requested_resources = record['requested_resources'] + # TODO(tian): Backward compatibility. + # Remove after 2 minor release, 0.6.0. + if record.get('requested_resources_str') is None: + requested_resources_str = str(record['requested_resources']) + else: + requested_resources_str = record['requested_resources_str'] service_values = [ service_name, @@ -690,7 +695,7 @@ def format_service_table(service_records: List[Dict[str, Any]], endpoint, ] if show_all: - service_values.extend([policy, requested_resources]) + service_values.extend([policy, requested_resources_str]) service_table.add_row(service_values) replica_table = _format_replica_table(replica_infos, show_all) diff --git a/sky/serve/service.py b/sky/serve/service.py index 1b2aaf253c0..9c59a66e454 100644 --- a/sky/serve/service.py +++ b/sky/serve/service.py @@ -12,14 +12,12 @@ from typing import Dict, List import filelock -import yaml from sky import authentication from sky import exceptions -from sky import resources -from sky import serve from sky import sky_logging from sky import task as task_lib +from sky.backends import backend_utils from sky.backends import cloud_vm_ray_backend from sky.serve import constants from sky.serve import controller @@ -129,13 +127,9 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int): authentication.get_or_generate_keys() # Initialize database record for the service. - service_spec = serve.SkyServiceSpec.from_yaml(tmp_task_yaml) - with open(tmp_task_yaml, 'r') as f: - config = yaml.safe_load(f) - resources_config = None - if isinstance(config, dict): - resources_config = config.get('resources') - requested_resources = resources.Resources.from_yaml_config(resources_config) + task = task_lib.Task.from_yaml(tmp_task_yaml) + assert task.service is not None + service_spec = task.service if len(serve_state.get_services()) >= serve_utils.NUM_SERVICE_THRESHOLD: _cleanup_storage(tmp_task_yaml) with ux_utils.print_exception_no_traceback(): @@ -145,7 +139,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int): controller_job_id=job_id, policy=service_spec.policy_str(), auto_restart=service_spec.auto_restart, - requested_resources=requested_resources, + requested_resources_str=backend_utils.get_task_resources_str(task), status=serve_state.ServiceStatus.CONTROLLER_INIT) # Directly throw an error here. See sky/serve/api.py::up # for more details.