From 51f1f78d8b45beaad2f89a9fb0fca2ca03350621 Mon Sep 17 00:00:00 2001 From: Tian Xia Date: Wed, 7 Aug 2024 12:49:01 +0800 Subject: [PATCH] [Serve] Fix adding initial version and cleanup service versions (#3804) * fix * add comments --- sky/serve/replica_managers.py | 2 -- sky/serve/serve_state.py | 9 +++++++++ sky/serve/serve_utils.py | 22 ++++++++++++++++------ sky/serve/service.py | 5 +++++ 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/sky/serve/replica_managers.py b/sky/serve/replica_managers.py index b25921f5610..81cc13c8abd 100644 --- a/sky/serve/replica_managers.py +++ b/sky/serve/replica_managers.py @@ -579,8 +579,6 @@ def __init__(self, service_name: str, self.latest_version: int = serve_constants.INITIAL_VERSION # Oldest version among the currently provisioned and launched replicas self.least_recent_version: int = serve_constants.INITIAL_VERSION - serve_state.add_or_update_version(self._service_name, - self.latest_version, spec) def scale_up(self, resources_override: Optional[Dict[str, Any]] = None) -> None: diff --git a/sky/serve/serve_state.py b/sky/serve/serve_state.py index df2b3f58345..7ddf22ccb81 100644 --- a/sky/serve/serve_state.py +++ b/sky/serve/serve_state.py @@ -525,3 +525,12 @@ def delete_version(service_name: str, version: int) -> None: DELETE FROM version_specs WHERE service_name=(?) AND version=(?)""", (service_name, version)) + + +def delete_all_versions(service_name: str) -> None: + """Deletes all versions from the database.""" + with db_utils.safe_cursor(_DB_PATH) as cursor: + cursor.execute( + """\ + DELETE FROM version_specs + WHERE service_name=(?)""", (service_name,)) diff --git a/sky/serve/serve_utils.py b/sky/serve/serve_utils.py index dc362aa7153..1340ef4acd8 100644 --- a/sky/serve/serve_utils.py +++ b/sky/serve/serve_utils.py @@ -371,7 +371,7 @@ def load_version_string(payload: str) -> str: def _terminate_failed_services( service_name: str, - service_status: serve_state.ServiceStatus) -> Optional[str]: + service_status: Optional[serve_state.ServiceStatus]) -> Optional[str]: """Terminate service in failed status. Services included in ServiceStatus.failed_statuses() do not have an @@ -397,6 +397,7 @@ def _terminate_failed_services( generate_remote_service_dir_name(service_name)) shutil.rmtree(service_dir) serve_state.remove_service(service_name) + serve_state.delete_all_versions(service_name) if not remaining_replica_clusters: return None @@ -414,21 +415,30 @@ def terminate_services(service_names: Optional[List[str]], purge: bool) -> str: for service_name in service_names: service_status = _get_service_status(service_name, with_replica_info=False) - assert service_status is not None, service_name - if service_status['status'] == serve_state.ServiceStatus.SHUTTING_DOWN: + if (service_status is not None and service_status['status'] + == serve_state.ServiceStatus.SHUTTING_DOWN): # Already scheduled to be terminated. continue - if (service_status['status'] + # If the `services` and `version_specs` table are not aligned, it might + # result in a None service status. In this case, the controller process + # is not functioning as well and we should also use the + # `_terminate_failed_services` function to clean up the service. + # This is a safeguard for a rare case, that is accidentally abort + # between `serve_state.add_service` and + # `serve_state.add_or_update_version` in service.py. + if (service_status is None or service_status['status'] in serve_state.ServiceStatus.failed_statuses()): + failed_status = (service_status['status'] + if service_status is not None else None) if purge: message = _terminate_failed_services(service_name, - service_status['status']) + failed_status) if message is not None: messages.append(message) else: messages.append( f'{colorama.Fore.YELLOW}Service {service_name!r} is in ' - f'failed status ({service_status["status"]}). Skipping ' + f'failed status ({failed_status}). Skipping ' 'its termination as it could lead to a resource leak. ' f'(Use `sky serve down {service_name} --purge` to ' 'forcefully terminate the service.)' diff --git a/sky/serve/service.py b/sky/serve/service.py index b2d43b7f3fb..4f4d9eccd91 100644 --- a/sky/serve/service.py +++ b/sky/serve/service.py @@ -156,6 +156,10 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int): with ux_utils.print_exception_no_traceback(): raise ValueError(f'Service {service_name} already exists.') + # Add initial version information to the service state. + serve_state.add_or_update_version(service_name, constants.INITIAL_VERSION, + service_spec) + # Create the service working directory. service_dir = os.path.expanduser( serve_utils.generate_remote_service_dir_name(service_name)) @@ -234,6 +238,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int): else: shutil.rmtree(service_dir) serve_state.remove_service(service_name) + serve_state.delete_all_versions(service_name) logger.info(f'Service {service_name} terminated successfully.')