Skip to content

Commit

Permalink
delete storage buckets of older versions
Browse files Browse the repository at this point in the history
  • Loading branch information
MaoZiming committed Jan 3, 2024
1 parent 282c942 commit cbae61d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
21 changes: 21 additions & 0 deletions sky/serve/replica_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from sky.serve import constants as serve_constants
from sky.serve import serve_state
from sky.serve import serve_utils
from sky.serve import service
from sky.skylet import constants
from sky.skylet import job_lib
from sky.usage import usage_lib
Expand Down Expand Up @@ -513,9 +514,13 @@ def __init__(self, service_name: str, spec: 'service_spec.SkyServiceSpec',
threading.Thread(target=self._replica_prober).start()

self.latest_version: int = initial_version
self.oldest_version: int = initial_version
self.version2spec: serve_utils.ThreadSafeDict[
int, 'service_spec.SkyServiceSpec'] = serve_utils.ThreadSafeDict()
self.version2yaml: serve_utils.ThreadSafeDict[
int, str] = serve_utils.ThreadSafeDict()
self.version2spec[initial_version] = spec
self.version2yaml[initial_version] = task_yaml_path
self.mixed_replica_versions: bool = False

################################
Expand Down Expand Up @@ -979,6 +984,21 @@ def _replica_prober(self) -> None:
]
serve_utils.set_service_status_from_replica_statuses(
self._service_name, replica_statuses)

# Clean old version
oldest_version = min([
info.version for info in serve_state.get_replica_infos(
self._service_name)
])
if self.oldest_version < oldest_version:
for version in range(self.oldest_version, oldest_version):
# Delete old version metadata
# Delete storage buckets of older versions.
del self.version2spec[version]
service.cleanup_storage(self.version2yaml[version])
del self.version2yaml[version]
self.oldest_version = oldest_version

except Exception as e: # pylint: disable=broad-except
# No matter what error happens, we should keep the
# replica prober running.
Expand All @@ -996,6 +1016,7 @@ def update_version(self, version: int, spec: 'service_spec.SkyServiceSpec',
mixed_replica_versions: bool,
task_yaml_path: str) -> None:
self.version2spec[version] = spec
self.version2yaml[version] = task_yaml_path
self.latest_version = version
self.mixed_replica_versions = mixed_replica_versions
self._task_yaml_path = task_yaml_path
Expand Down
8 changes: 4 additions & 4 deletions sky/serve/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _handle_signal(service_name: str) -> None:
raise error_type(f'User signal received: {user_signal.value}')


def _cleanup_storage(task_yaml: str) -> bool:
def cleanup_storage(task_yaml: str) -> bool:
"""Clean up the storage for the service.
Args:
Expand Down Expand Up @@ -114,7 +114,7 @@ def _cleanup(service_name: str, task_yaml: str) -> bool:
info)
failed = True
logger.error(f'Replica {info.replica_id} failed to terminate.')
success = _cleanup_storage(task_yaml)
success = cleanup_storage(task_yaml)
if not success:
failed = True
return failed
Expand All @@ -132,7 +132,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int):
assert task.service is not None, task
service_spec = task.service
if len(serve_state.get_services()) >= serve_utils.NUM_SERVICE_THRESHOLD:
_cleanup_storage(tmp_task_yaml)
cleanup_storage(tmp_task_yaml)
with ux_utils.print_exception_no_traceback():
raise RuntimeError('Max number of services reached.')
success = serve_state.add_service(
Expand All @@ -145,7 +145,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int):
# Directly throw an error here. See sky/serve/api.py::up
# for more details.
if not success:
_cleanup_storage(tmp_task_yaml)
cleanup_storage(tmp_task_yaml)
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Service {service_name} already exists.')

Expand Down

0 comments on commit cbae61d

Please sign in to comment.