diff --git a/sky/serve/autoscalers.py b/sky/serve/autoscalers.py index d8890ee8ce6..ae1219e0886 100644 --- a/sky/serve/autoscalers.py +++ b/sky/serve/autoscalers.py @@ -5,7 +5,7 @@ import math import time import typing -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Type, Union from sky import sky_logging from sky.serve import constants @@ -58,6 +58,9 @@ def __repr__(self) -> str: class Autoscaler: """Abstract class for autoscalers.""" + NAME: Optional[str] = None + REGISTRY: Dict[str, Type['Autoscaler']] = dict() + def __init__(self, spec: 'service_spec.SkyServiceSpec') -> None: """Initialize the autoscaler. @@ -83,6 +86,21 @@ def evaluate_scaling( """Evaluate autoscale options based on replica information.""" raise NotImplementedError + def __init_subclass__(cls) -> None: + if cls.NAME is None: + # This is an abstract class, don't put it in the registry. + return + assert cls.NAME not in cls.REGISTRY, f'Name {cls.NAME} already exists' + cls.REGISTRY[cls.NAME] = cls + + @classmethod + def get_autoscaler_names(cls) -> List[str]: + return list(cls.REGISTRY.keys()) + + @classmethod + def from_spec(cls, spec: 'service_spec.SkyServiceSpec') -> 'Autoscaler': + assert (spec.autoscaler is not None and spec.autoscaler in cls.REGISTRY) + return cls.REGISTRY[spec.autoscaler](spec) class RequestRateAutoscaler(Autoscaler): """RequestRateAutoscaler: Autoscale according to request rate. @@ -90,14 +108,13 @@ class RequestRateAutoscaler(Autoscaler): Scales when the number of requests in the given interval is above or below the threshold. """ + NAME: Optional[str] = 'RequestRateAutoscaler' - def __init__(self, spec: 'service_spec.SkyServiceSpec', - qps_window_size: int) -> None: + def __init__(self, spec: 'service_spec.SkyServiceSpec') -> None: """Initialize the request rate autoscaler. Variables: target_qps_per_replica: Target qps per replica for autoscaling. - qps_window_size: Window size for qps calculating. request_timestamps: All request timestamps within the window. upscale_counter: counter for upscale number of replicas. downscale_counter: counter for downscale number of replicas. @@ -107,7 +124,6 @@ def __init__(self, spec: 'service_spec.SkyServiceSpec', super().__init__(spec) self.target_qps_per_replica: Optional[ float] = spec.target_qps_per_replica - self.qps_window_size: int = qps_window_size self.request_timestamps: List[float] = [] self.upscale_counter: int = 0 self.downscale_counter: int = 0 @@ -136,7 +152,7 @@ def collect_request_information( request_aggregator_info.get('timestamps', [])) current_time = time.time() index = bisect.bisect_left(self.request_timestamps, - current_time - self.qps_window_size) + current_time - constants.AUTOSCALER_QPS_WINDOW_SIZE_SECONDS) self.request_timestamps = self.request_timestamps[index:] def _get_desired_num_replicas(self) -> int: @@ -148,7 +164,7 @@ def _get_desired_num_replicas(self) -> int: # Convert to requests per second. num_requests_per_second = len( - self.request_timestamps) / self.qps_window_size + self.request_timestamps) / constants.AUTOSCALER_QPS_WINDOW_SIZE_SECONDS target_num_replicas = math.ceil(num_requests_per_second / self.target_qps_per_replica) target_num_replicas = max(self.min_replicas, @@ -242,17 +258,17 @@ def _get_replica_ids_to_scale_down(num_limit: int) -> List[int]: return scaling_options -class HeteroGPUAutoscaler(Autoscaler): +class HeteroAccelAutoscaler(Autoscaler): """RequestRateAutoscaler: Autoscale according to request rate. Scales when the number of requests in the given interval is above or below the threshold. """ + NAME: Optional[str] = 'HeteroAccelAutoscaler' SCALE_UP_COOL_DOWN_INTERVAL_SECONDS = 300 - def __init__(self, spec: 'service_spec.SkyServiceSpec', - frequency: int, rps_window_size: int) -> None: + def __init__(self, spec: 'service_spec.SkyServiceSpec',) -> None: """Initialize the request rate autoscaler. Variables: @@ -266,16 +282,14 @@ def __init__(self, spec: 'service_spec.SkyServiceSpec', """ super().__init__(spec) self.rps_window_size: int = self.SCALE_UP_COOL_DOWN_INTERVAL_SECONDS - self.frequency = frequency + self.frequency = constants.AUTOSCALER_DEFAULT_DECISION_INTERVAL_SECONDS self.last_scale_operation: float = 0. - self.request_timestamps: List[float] = [] self.request_timestamps_distribution: List[List[float]] = [[], [], [], [], [], [], []] self.request_distribution: List[int] = [0, 0, 0, 0, 0, 0, 0] self.request_rate_dist: List[float] = [0, 0, 0, 0, 0, 0, 0] self.total_request_in_window: int = 0 - self.last_scale_time: float = 0. self.scale_down_candidates: List['replica_managers.ReplicaInfo'] = [] def collect_request_information( @@ -441,11 +455,11 @@ def evaluate_scaling( List[AutoscalerDecision]]]) = [] # Return if the cool down interval has not passed. - if (time.time()- self.last_scale_time < + if (time.time()- self.last_scale_operation < self.SCALE_UP_COOL_DOWN_INTERVAL_SECONDS): return scaling_decisions - self.last_scale_time = time.time() + self.last_scale_operation = time.time() launched_replica_infos = [ info for info in replica_infos if info.is_launched ] diff --git a/sky/serve/constants.py b/sky/serve/constants.py index 73e2e21c2ca..7e35089b7fd 100644 --- a/sky/serve/constants.py +++ b/sky/serve/constants.py @@ -49,6 +49,9 @@ # TODO(tian): We might need to be careful that service logs can take a lot of # disk space. Maybe we could use a larger disk size, migrate to cloud storage or # do some log rotation. +# Default autoscaler +DEFAULT_AUTOSCALER = 'RequestRateAutoscaler' + CONTROLLER_RESOURCES = {'cpus': '4+', 'disk_size': 200} # A default controller with 4 vCPU and 16 GB memory can run up to 16 services. diff --git a/sky/serve/controller.py b/sky/serve/controller.py index 77bc672041e..59c014f74f3 100644 --- a/sky/serve/controller.py +++ b/sky/serve/controller.py @@ -6,7 +6,7 @@ import threading import time import traceback -from typing import List +from typing import Any, Dict, List import fastapi import uvicorn @@ -47,10 +47,7 @@ def __init__(self, service_name: str, service_spec: serve.SkyServiceSpec, spec=service_spec, task_yaml_path=task_yaml)) self._autoscaler: autoscalers.Autoscaler = ( - autoscalers.HeteroGPUAutoscaler( - service_spec, - frequency=constants.AUTOSCALER_DEFAULT_DECISION_INTERVAL_SECONDS, - rps_window_size=constants.AUTOSCALER_QPS_WINDOW_SIZE_SECONDS)) + autoscalers.Autoscaler.from_spec(service_spec)) self._port = port self._app = fastapi.FastAPI() @@ -92,8 +89,8 @@ def _run_autoscaler(self): logger.error(f' Traceback: {traceback.format_exc()}') time.sleep(constants.AUTOSCALER_DEFAULT_DECISION_INTERVAL_SECONDS) - def _run_heteroGPU_autoscaler(self): - logger.info('Starting autoscaler.') + def _run_HeteroAccel_autoscaler(self): + logger.info('Starting HeteroAccel Autoscaler.') while True: try: # If the primary replica successfully launched to ReplicaStatus.READY, @@ -170,7 +167,8 @@ def run(self) -> None: @self._app.post('/controller/load_balancer_sync') async def load_balancer_sync(request: fastapi.Request): request_data = await request.json() - request_aggregator = request_data.get('request_aggregator') + request_aggregator: Dict[str, Any] = request_data.get( + 'request_aggregator', {}) logger.info( f'Received inflight request information: {request_aggregator}') self._autoscaler.collect_request_information(request_aggregator) @@ -185,8 +183,11 @@ def configure_logger(): for handler in uvicorn_access_logger.handlers: handler.setFormatter(sky_logging.FORMATTER) - #threading.Thread(target=self._run_autoscaler).start() - threading.Thread(target=self._run_heteroGPU_autoscaler).start() + if (isinstance(self._autoscaler, + autoscalers.HeteroAccelAutoscaler)): + threading.Thread(target=self._run_HeteroAccel_autoscaler).start() + else: + threading.Thread(target=self._run_autoscaler).start() logger.info('SkyServe Controller started on ' f'http://localhost:{self._port}') diff --git a/sky/serve/load_balancer.py b/sky/serve/load_balancer.py index 632ac530f37..4583380dfbd 100644 --- a/sky/serve/load_balancer.py +++ b/sky/serve/load_balancer.py @@ -111,7 +111,7 @@ def configure_logger(): uvicorn.run(self._app, host='0.0.0.0', port=self._load_balancer_port) -class HeteroGPULoadBalancer(SkyServeLoadBalancer): +class HeteroAccelLoadBalancer(SkyServeLoadBalancer): """SkyServeLoadBalancer: redirect incoming traffic. This class accept any traffic to the controller and redirect it @@ -133,7 +133,7 @@ def __init__(self, controller_url: str, load_balancer_port: int) -> None: self._load_balancing_policy: lb_policies.LoadBalancingPolicy = ( lb_policies.RoundRobinPolicy()) self._request_aggregator: serve_utils.RequestsAggregator = ( - serve_utils.RequestHeteroGPU()) + serve_utils.RequestHeteroAccel()) async def _redirect_handler(self, request: fastapi.Request): body_bytes = await request.body() @@ -164,7 +164,7 @@ async def _redirect_handler(self, request: fastapi.Request): def run_load_balancer(controller_addr: str, load_balancer_port: int): #load_balancer = SkyServeLoadBalancer(controller_url=controller_addr, # load_balancer_port=load_balancer_port) - load_balancer = HeteroGPULoadBalancer(controller_url=controller_addr, + load_balancer = HeteroAccelLoadBalancer(controller_url=controller_addr, load_balancer_port=load_balancer_port) logger.info(f'run_load_balancer(load_balancer_port): {load_balancer_port}') load_balancer.run() diff --git a/sky/serve/replica_managers.py b/sky/serve/replica_managers.py index aaa0912a65e..3db9a3a223a 100644 --- a/sky/serve/replica_managers.py +++ b/sky/serve/replica_managers.py @@ -622,7 +622,7 @@ def _launch_replica( self._service_name, replica_id) log_file_name = serve_utils.generate_replica_launch_log_file_name( self._service_name, replica_id) - ### Testing for heteroGPU + ### Testing for HeteroAccel max_retry = 30 p = multiprocessing.Process( target=ux_utils.RedirectOutputForProcess( diff --git a/sky/serve/serve_utils.py b/sky/serve/serve_utils.py index 6e87dbd0fdb..4d3d52a0ee8 100644 --- a/sky/serve/serve_utils.py +++ b/sky/serve/serve_utils.py @@ -170,10 +170,10 @@ def __repr__(self) -> str: return f'RequestTimestamp(timestamps={self.timestamps})' -class RequestHeteroGPU(RequestsAggregator): +class RequestHeteroAccel(RequestsAggregator): """RequestTimestamp: Aggregates request timestamps. - This is useful for HeteroGPU-autoscaling. + This is useful for HeteroAccel-autoscaling. """ def __init__(self) -> None: diff --git a/sky/serve/service.py b/sky/serve/service.py index f6bdee04696..9b727cbb32d 100644 --- a/sky/serve/service.py +++ b/sky/serve/service.py @@ -165,6 +165,10 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int): # Generate load balancer log file name. load_balancer_log_file = os.path.expanduser( serve_utils.generate_remote_load_balancer_log_file_name(service_name)) + + # Generate controller log file name. + controller_log_file = os.path.expanduser( + serve_utils.generate_remote_controller_log_file_name(service_name)) controller_process = None load_balancer_process = None @@ -177,8 +181,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int): #controller_process = multiprocessing.Process( # target=controller.run_controller, # args=(service_name, service_spec, task_yaml, controller_port)) - controller_log_file = os.path.expanduser( - '~/.sky/serve/test_serve/controller.log') + controller_process = multiprocessing.Process( target=ux_utils.RedirectOutputForProcess( controller.run_controller, controller_log_file).run, diff --git a/sky/serve/service_spec.py b/sky/serve/service_spec.py index d73477c4b95..91d4e6fd73a 100644 --- a/sky/serve/service_spec.py +++ b/sky/serve/service_spec.py @@ -6,6 +6,7 @@ import yaml +from sky.serve import autoscalers from sky.serve import constants from sky.utils import common_utils from sky.utils import schemas @@ -25,6 +26,7 @@ def __init__( post_data: Optional[Dict[str, Any]] = None, upscale_delay_seconds: Optional[int] = None, downscale_delay_seconds: Optional[int] = None, + autoscaler: Optional[str] = None, # The following arguments are deprecated. # TODO(ziming): remove this after 2 minor release, i.e. 0.6.0. # Deprecated: Always be True @@ -60,12 +62,20 @@ def __init__( 'Currently, SkyServe will cleanup failed replicas' 'and auto restart it to keep the service running.') + if autoscaler is None: + autoscaler = constants.DEFAULT_AUTOSCALER + + if autoscaler not in autoscalers.Autoscaler.get_autoscaler_names(): + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Unsupported autoscaler: {autoscaler}.') + self._readiness_path = readiness_path self._initial_delay_seconds = initial_delay_seconds self._min_replicas = min_replicas self._max_replicas = max_replicas self._target_qps_per_replica = target_qps_per_replica self._post_data = post_data + self._autoscaler: str = autoscaler self._upscale_delay_seconds = ( upscale_delay_seconds if upscale_delay_seconds is not None else @@ -138,6 +148,8 @@ def from_yaml_config(config: Dict[str, Any]) -> 'SkyServiceSpec': 'upscale_delay_seconds', None) service_config['downscale_delay_seconds'] = policy_section.get( 'downscale_delay_seconds', None) + service_config['autoscaler'] = policy_section.get( + 'autoscaler', None) return SkyServiceSpec(**service_config) @@ -183,6 +195,7 @@ def add_if_not_none(section, key, value, no_empty: bool = False): add_if_not_none('replica_policy', 'max_replicas', self.max_replicas) add_if_not_none('replica_policy', 'target_qps_per_replica', self.target_qps_per_replica) + add_if_not_none('replica_policy', 'autoscaler', self._autoscaler) add_if_not_none('replica_policy', 'upscale_delay_seconds', self.upscale_delay_seconds) add_if_not_none('replica_policy', 'downscale_delay_seconds', @@ -236,6 +249,10 @@ def target_qps_per_replica(self) -> Optional[float]: @property def post_data(self) -> Optional[Dict[str, Any]]: return self._post_data + + @property + def autoscaler(self) -> Optional[str]: + return self._autoscaler @property def upscale_delay_seconds(self) -> int: diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index b1f9c3d72a3..b705da120c7 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -334,6 +334,9 @@ def get_service_schema(): 'target_qps_per_replica': { 'type': 'number', }, + 'autoscaler': { + 'type': 'string', + }, 'upscale_delay_seconds': { 'type': 'number', },