Skip to content

Commit

Permalink
[Serve] Fix adding initial version and cleanup service versions (#3804)
Browse files Browse the repository at this point in the history
* fix

* add comments
  • Loading branch information
cblmemo authored Aug 7, 2024
1 parent f921f13 commit 51f1f78
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
2 changes: 0 additions & 2 deletions sky/serve/replica_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,6 @@ def __init__(self, service_name: str,
self.latest_version: int = serve_constants.INITIAL_VERSION
# Oldest version among the currently provisioned and launched replicas
self.least_recent_version: int = serve_constants.INITIAL_VERSION
serve_state.add_or_update_version(self._service_name,
self.latest_version, spec)

def scale_up(self,
resources_override: Optional[Dict[str, Any]] = None) -> None:
Expand Down
9 changes: 9 additions & 0 deletions sky/serve/serve_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,3 +525,12 @@ def delete_version(service_name: str, version: int) -> None:
DELETE FROM version_specs
WHERE service_name=(?)
AND version=(?)""", (service_name, version))


def delete_all_versions(service_name: str) -> None:
    """Removes every version spec row recorded for the given service.

    Args:
        service_name: Name of the service whose rows in the
            `version_specs` table are deleted.
    """
    # The service name is passed as a bound parameter for the `?`
    # placeholder instead of being formatted into the SQL text.
    query = ('DELETE FROM version_specs\n'
             'WHERE service_name=(?)')
    params = (service_name,)
    with db_utils.safe_cursor(_DB_PATH) as cursor:
        cursor.execute(query, params)
22 changes: 16 additions & 6 deletions sky/serve/serve_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def load_version_string(payload: str) -> str:

def _terminate_failed_services(
service_name: str,
service_status: serve_state.ServiceStatus) -> Optional[str]:
service_status: Optional[serve_state.ServiceStatus]) -> Optional[str]:
"""Terminate service in failed status.
Services included in ServiceStatus.failed_statuses() do not have an
Expand All @@ -397,6 +397,7 @@ def _terminate_failed_services(
generate_remote_service_dir_name(service_name))
shutil.rmtree(service_dir)
serve_state.remove_service(service_name)
serve_state.delete_all_versions(service_name)

if not remaining_replica_clusters:
return None
Expand All @@ -414,21 +415,30 @@ def terminate_services(service_names: Optional[List[str]], purge: bool) -> str:
for service_name in service_names:
service_status = _get_service_status(service_name,
with_replica_info=False)
assert service_status is not None, service_name
if service_status['status'] == serve_state.ServiceStatus.SHUTTING_DOWN:
if (service_status is not None and service_status['status']
== serve_state.ServiceStatus.SHUTTING_DOWN):
# Already scheduled to be terminated.
continue
if (service_status['status']
# If the `services` and `version_specs` tables are not aligned, the
# service status might be None. In this case, the controller process
# is not functioning either, so we should also use the
# `_terminate_failed_services` function to clean up the service.
# This is a safeguard for a rare case: an accidental abort between
# `serve_state.add_service` and `serve_state.add_or_update_version`
# in service.py.
if (service_status is None or service_status['status']
in serve_state.ServiceStatus.failed_statuses()):
failed_status = (service_status['status']
if service_status is not None else None)
if purge:
message = _terminate_failed_services(service_name,
service_status['status'])
failed_status)
if message is not None:
messages.append(message)
else:
messages.append(
f'{colorama.Fore.YELLOW}Service {service_name!r} is in '
f'failed status ({service_status["status"]}). Skipping '
f'failed status ({failed_status}). Skipping '
'its termination as it could lead to a resource leak. '
f'(Use `sky serve down {service_name} --purge` to '
'forcefully terminate the service.)'
Expand Down
5 changes: 5 additions & 0 deletions sky/serve/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int):
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Service {service_name} already exists.')

# Add initial version information to the service state.
serve_state.add_or_update_version(service_name, constants.INITIAL_VERSION,
service_spec)

# Create the service working directory.
service_dir = os.path.expanduser(
serve_utils.generate_remote_service_dir_name(service_name))
Expand Down Expand Up @@ -234,6 +238,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int):
else:
shutil.rmtree(service_dir)
serve_state.remove_service(service_name)
serve_state.delete_all_versions(service_name)
logger.info(f'Service {service_name} terminated successfully.')


Expand Down

0 comments on commit 51f1f78

Please sign in to comment.