Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cblmemo committed Dec 13, 2023
1 parent 576f5c9 commit 01efbcf
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 35 deletions.
25 changes: 19 additions & 6 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4278,13 +4278,26 @@ def serve_up(
with ux_utils.print_exception_no_traceback():
raise ValueError('Service section not found in the YAML file. '
'To fix, add a valid `service` field.')
assert len(task.resources) == 1
requested_resources = list(task.resources)[0]
if requested_resources.ports is None or len(requested_resources.ports) != 1:
with ux_utils.print_exception_no_traceback():
service_port: Optional[int] = None
for requested_resources in list(task.resources):
if requested_resources.ports is None or len(
requested_resources.ports) != 1:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Must only specify one port in resources. Each replica '
'will use the port specified as application ingress port.')
service_port_str = requested_resources.ports[0]
if not service_port_str.isdigit():
raise ValueError(f'Port {service_port_str!r} is not a valid port '
'number. Please specify a single port instead. '
f'Got: {service_port_str!r}')
resource_port = int(service_port_str)
if service_port is None:
service_port = resource_port
if service_port != resource_port:
raise ValueError(
'Must only specify one port in resources. Each replica '
'will use the port specified as application ingress port.')
f'Got multiple ports: {service_port} and {resource_port} '
'in different resources. Please specify single port instead.')

click.secho('Service Spec:', fg='cyan')
click.echo(task.service)
Expand Down
42 changes: 32 additions & 10 deletions sky/serve/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""SkyServe core APIs."""
import re
import tempfile
import typing
from typing import Any, Dict, List, Optional, Union

import colorama
Expand All @@ -24,6 +25,9 @@
from sky.utils import subprocess_utils
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
from sky import clouds


@usage_lib.entrypoint
def up(
Expand Down Expand Up @@ -56,13 +60,29 @@ def up(
with ux_utils.print_exception_no_traceback():
raise RuntimeError('Service section not found.')

assert len(task.resources) == 1, task
requested_resources = list(task.resources)[0]
if requested_resources.ports is None or len(requested_resources.ports) != 1:
with ux_utils.print_exception_no_traceback():
requested_cloud: Optional['clouds.Cloud'] = None
service_port: Optional[int] = None
for requested_resources in task.resources:
if requested_resources.ports is None or len(
requested_resources.ports) != 1:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Must only specify one port in resources. Each replica '
'will use the port specified as application ingress port.')
service_port_str = requested_resources.ports[0]
if not service_port_str.isdigit():
raise ValueError(f'Port {service_port_str!r} is not a valid port '
'number. Please specify a single port instead. '
f'Got: {service_port_str!r}')
resource_port = int(service_port_str)
if service_port is None:
service_port = resource_port
if service_port != resource_port:
raise ValueError(
'Must only specify one port in resources. Each replica '
'will use the port specified as application ingress port.')
f'Got multiple ports: {service_port} and {resource_port} '
'in different resources. Please specify single port instead.')
if requested_cloud is None:
requested_cloud = requested_resources.cloud

controller_utils.maybe_translate_local_file_mounts_and_sync_up(task,
path='serve')
Expand Down Expand Up @@ -102,9 +122,9 @@ def up(
controller_exist = (
global_user_state.get_cluster_from_name(controller_name)
is not None)
controller_cloud = (
requested_resources.cloud if not controller_exist and
controller_resources.cloud is None else controller_resources.cloud)
controller_cloud = (requested_cloud if not controller_exist and
controller_resources.cloud is None else
controller_resources.cloud)
# TODO(tian): Probably run another sky.launch after we get the load
# balancer port from the controller? So we don't need to open so many
# ports here. Or, we should have an nginx traffic control to refuse
Expand Down Expand Up @@ -329,7 +349,9 @@ def status(
'auto_restart': (bool) whether the service replica will be
auto-restarted,
'requested_resources': (sky.Resources) requested resources
for replica,
for replica (deprecated),
'requested_resources_str': (str) str representation of
requested resources,
'replica_info': (List[Dict[str, Any]]) replica information,
}
Expand Down
3 changes: 2 additions & 1 deletion sky/serve/replica_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ def terminate_cluster(cluster_name: str, max_retry: int = 3) -> None:
def _get_resources_ports(task_yaml: str) -> str:
"""Get the resources ports used by the task."""
task = sky.Task.from_yaml(task_yaml)
assert len(task.resources) == 1, task
# Already checked all ports are the same in sky.serve.core.up
assert len(task.resources) >= 1, task
task_resources = list(task.resources)[0]
# Already checked that the resources have one and only one port
# before uploading the task yaml.
Expand Down
14 changes: 9 additions & 5 deletions sky/serve/serve_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def create_table(cursor: 'sqlite3.Cursor', conn: 'sqlite3.Connection') -> None:


_DB = db_utils.SQLiteConn(_DB_PATH, create_table)
db_utils.add_column_to_table(_DB.cursor, _DB.conn, 'services',
'requested_resources_str', 'TEXT')

_UNIQUE_CONSTRAINT_FAILED_ERROR_MSG = 'UNIQUE constraint failed: services.name'

Expand Down Expand Up @@ -178,7 +180,7 @@ def from_replica_statuses(


def add_service(name: str, controller_job_id: int, policy: str,
auto_restart: bool, requested_resources: 'sky.Resources',
auto_restart: bool, requested_resources_str: str,
status: ServiceStatus) -> bool:
"""Add a service in the database.
Expand All @@ -190,11 +192,11 @@ def add_service(name: str, controller_job_id: int, policy: str,
_DB.cursor.execute(
"""\
INSERT INTO services
(name, controller_job_id, status, policy,
auto_restart, requested_resources)
(name, controller_job_id, status, policy, auto_restart,
requested_resources, requested_resources_str)
VALUES (?, ?, ?, ?, ?, ?)""",
(name, controller_job_id, status.value, policy, int(auto_restart),
pickle.dumps(requested_resources)))
None, requested_resources_str))
_DB.conn.commit()
except sqlite3.IntegrityError as e:
if str(e) != _UNIQUE_CONSTRAINT_FAILED_ERROR_MSG:
Expand Down Expand Up @@ -251,7 +253,8 @@ def set_service_load_balancer_port(service_name: str,

def _get_service_from_row(row) -> Dict[str, Any]:
(name, controller_job_id, controller_port, load_balancer_port, status,
uptime, policy, auto_restart, requested_resources) = row[:9]
uptime, policy, auto_restart, requested_resources,
requested_resources_str) = row[:10]
return {
'name': name,
'controller_job_id': controller_job_id,
Expand All @@ -263,6 +266,7 @@ def _get_service_from_row(row) -> Dict[str, Any]:
'auto_restart': bool(auto_restart),
'requested_resources': pickle.loads(requested_resources)
if requested_resources is not None else None,
'requested_resources_str': requested_resources_str,
}


Expand Down
9 changes: 7 additions & 2 deletions sky/serve/serve_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,12 @@ def format_service_table(service_records: List[Dict[str, Any]],
replicas = _get_replicas(record)
endpoint = get_endpoint(record)
policy = record['policy']
requested_resources = record['requested_resources']
# TODO(tian): Backward compatibility.
# Remove after 2 minor releases, 0.6.0.
if record.get('requested_resources_str') is None:
requested_resources_str = str(record['requested_resources'])
else:
requested_resources_str = record['requested_resources_str']

service_values = [
service_name,
Expand All @@ -690,7 +695,7 @@ def format_service_table(service_records: List[Dict[str, Any]],
endpoint,
]
if show_all:
service_values.extend([policy, requested_resources])
service_values.extend([policy, requested_resources_str])
service_table.add_row(service_values)

replica_table = _format_replica_table(replica_infos, show_all)
Expand Down
16 changes: 5 additions & 11 deletions sky/serve/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@
from typing import Dict, List

import filelock
import yaml

from sky import authentication
from sky import exceptions
from sky import resources
from sky import serve
from sky import sky_logging
from sky import task as task_lib
from sky.backends import backend_utils
from sky.backends import cloud_vm_ray_backend
from sky.serve import constants
from sky.serve import controller
Expand Down Expand Up @@ -129,13 +127,9 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int):
authentication.get_or_generate_keys()

# Initialize database record for the service.
service_spec = serve.SkyServiceSpec.from_yaml(tmp_task_yaml)
with open(tmp_task_yaml, 'r') as f:
config = yaml.safe_load(f)
resources_config = None
if isinstance(config, dict):
resources_config = config.get('resources')
requested_resources = resources.Resources.from_yaml_config(resources_config)
task = task_lib.Task.from_yaml(tmp_task_yaml)
assert task.service is not None
service_spec = task.service
if len(serve_state.get_services()) >= serve_utils.NUM_SERVICE_THRESHOLD:
_cleanup_storage(tmp_task_yaml)
with ux_utils.print_exception_no_traceback():
Expand All @@ -145,7 +139,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int):
controller_job_id=job_id,
policy=service_spec.policy_str(),
auto_restart=service_spec.auto_restart,
requested_resources=requested_resources,
requested_resources_str=backend_utils.get_task_resources_str(task),
status=serve_state.ServiceStatus.CONTROLLER_INIT)
# Directly throw an error here. See sky/serve/api.py::up
# for more details.
Expand Down

0 comments on commit 01efbcf

Please sign in to comment.