From cbae61d8ff21210fed713c905cb54f54d0814300 Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Wed, 3 Jan 2024 11:26:14 -0500 Subject: [PATCH] delete storage buckets of older versions --- sky/serve/replica_managers.py | 21 +++++++++++++++++++++ sky/serve/service.py | 8 ++++---- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/sky/serve/replica_managers.py b/sky/serve/replica_managers.py index f56207d05cb..436ad15ff7f 100644 --- a/sky/serve/replica_managers.py +++ b/sky/serve/replica_managers.py @@ -25,6 +25,7 @@ from sky.serve import constants as serve_constants from sky.serve import serve_state from sky.serve import serve_utils +from sky.serve import service from sky.skylet import constants from sky.skylet import job_lib from sky.usage import usage_lib @@ -513,9 +514,13 @@ def __init__(self, service_name: str, spec: 'service_spec.SkyServiceSpec', threading.Thread(target=self._replica_prober).start() self.latest_version: int = initial_version + self.oldest_version: int = initial_version self.version2spec: serve_utils.ThreadSafeDict[ int, 'service_spec.SkyServiceSpec'] = serve_utils.ThreadSafeDict() + self.version2yaml: serve_utils.ThreadSafeDict[ + int, str] = serve_utils.ThreadSafeDict() self.version2spec[initial_version] = spec + self.version2yaml[initial_version] = task_yaml_path self.mixed_replica_versions: bool = False ################################ @@ -979,6 +984,21 @@ def _replica_prober(self) -> None: ] serve_utils.set_service_status_from_replica_statuses( self._service_name, replica_statuses) + + # Clean old version + oldest_version = min([ + info.version for info in serve_state.get_replica_infos( + self._service_name) + ]) + if self.oldest_version < oldest_version: + for version in range(self.oldest_version, oldest_version): + # Delete old version metadata + # Delete storage buckets of older versions. + del self.version2spec[version] + service.cleanup_storage(self.version2yaml[version]) + del self.version2yaml[version] + self.oldest_version = oldest_version + except Exception as e: # pylint: disable=broad-except # No matter what error happens, we should keep the # replica prober running. @@ -996,6 +1016,7 @@ def update_version(self, version: int, spec: 'service_spec.SkyServiceSpec', mixed_replica_versions: bool, task_yaml_path: str) -> None: self.version2spec[version] = spec + self.version2yaml[version] = task_yaml_path self.latest_version = version self.mixed_replica_versions = mixed_replica_versions self._task_yaml_path = task_yaml_path diff --git a/sky/serve/service.py b/sky/serve/service.py index 87f5428741d..9c0bc30d2a0 100644 --- a/sky/serve/service.py +++ b/sky/serve/service.py @@ -61,7 +61,7 @@ def _handle_signal(service_name: str) -> None: raise error_type(f'User signal received: {user_signal.value}') -def _cleanup_storage(task_yaml: str) -> bool: +def cleanup_storage(task_yaml: str) -> bool: """Clean up the storage for the service. Args: @@ -114,7 +114,7 @@ def _cleanup(service_name: str, task_yaml: str) -> bool: info) failed = True logger.error(f'Replica {info.replica_id} failed to terminate.') - success = _cleanup_storage(task_yaml) + success = cleanup_storage(task_yaml) if not success: failed = True return failed @@ -132,7 +132,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int): assert task.service is not None, task service_spec = task.service if len(serve_state.get_services()) >= serve_utils.NUM_SERVICE_THRESHOLD: - _cleanup_storage(tmp_task_yaml) + cleanup_storage(tmp_task_yaml) with ux_utils.print_exception_no_traceback(): raise RuntimeError('Max number of services reached.') success = serve_state.add_service( @@ -145,7 +145,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int): # Directly throw an error here. See sky/serve/api.py::up # for more details. if not success: - _cleanup_storage(tmp_task_yaml) + cleanup_storage(tmp_task_yaml) with ux_utils.print_exception_no_traceback(): raise ValueError(f'Service {service_name} already exists.')