Skip to content

Commit

Permalink
update autoscaler name and user interface
Browse files Browse the repository at this point in the history
  • Loading branch information
landscapepainter committed Jan 7, 2024
1 parent f24415d commit a7a2ad7
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 33 deletions.
44 changes: 29 additions & 15 deletions sky/serve/autoscalers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import math
import time
import typing
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Type, Union

from sky import sky_logging
from sky.serve import constants
Expand Down Expand Up @@ -58,6 +58,9 @@ def __repr__(self) -> str:
class Autoscaler:
"""Abstract class for autoscalers."""

NAME: Optional[str] = None
REGISTRY: Dict[str, Type['Autoscaler']] = dict()

def __init__(self, spec: 'service_spec.SkyServiceSpec') -> None:
"""Initialize the autoscaler.
Expand All @@ -83,21 +86,35 @@ def evaluate_scaling(
"""Evaluate autoscale options based on replica information."""
raise NotImplementedError

def __init_subclass__(cls) -> None:
if cls.NAME is None:
# This is an abstract class, don't put it in the registry.
return
assert cls.NAME not in cls.REGISTRY, f'Name {cls.NAME} already exists'
cls.REGISTRY[cls.NAME] = cls

@classmethod
def get_autoscaler_names(cls) -> List[str]:
return list(cls.REGISTRY.keys())

@classmethod
def from_spec(cls, spec: 'service_spec.SkyServiceSpec') -> 'Autoscaler':
assert (spec.autoscaler is not None and spec.autoscaler in cls.REGISTRY)
return cls.REGISTRY[spec.autoscaler](spec)

class RequestRateAutoscaler(Autoscaler):
"""RequestRateAutoscaler: Autoscale according to request rate.
Scales when the number of requests in the given interval is above or below
the threshold.
"""
NAME: Optional[str] = 'RequestRateAutoscaler'

def __init__(self, spec: 'service_spec.SkyServiceSpec',
qps_window_size: int) -> None:
def __init__(self, spec: 'service_spec.SkyServiceSpec') -> None:
"""Initialize the request rate autoscaler.
Variables:
target_qps_per_replica: Target qps per replica for autoscaling.
qps_window_size: Window size for qps calculating.
request_timestamps: All request timestamps within the window.
upscale_counter: counter for upscale number of replicas.
downscale_counter: counter for downscale number of replicas.
Expand All @@ -107,7 +124,6 @@ def __init__(self, spec: 'service_spec.SkyServiceSpec',
super().__init__(spec)
self.target_qps_per_replica: Optional[
float] = spec.target_qps_per_replica
self.qps_window_size: int = qps_window_size
self.request_timestamps: List[float] = []
self.upscale_counter: int = 0
self.downscale_counter: int = 0
Expand Down Expand Up @@ -136,7 +152,7 @@ def collect_request_information(
request_aggregator_info.get('timestamps', []))
current_time = time.time()
index = bisect.bisect_left(self.request_timestamps,
current_time - self.qps_window_size)
current_time - constants.AUTOSCALER_QPS_WINDOW_SIZE_SECONDS)
self.request_timestamps = self.request_timestamps[index:]

def _get_desired_num_replicas(self) -> int:
Expand All @@ -148,7 +164,7 @@ def _get_desired_num_replicas(self) -> int:

# Convert to requests per second.
num_requests_per_second = len(
self.request_timestamps) / self.qps_window_size
self.request_timestamps) / constants.AUTOSCALER_QPS_WINDOW_SIZE_SECONDS
target_num_replicas = math.ceil(num_requests_per_second /
self.target_qps_per_replica)
target_num_replicas = max(self.min_replicas,
Expand Down Expand Up @@ -242,17 +258,17 @@ def _get_replica_ids_to_scale_down(num_limit: int) -> List[int]:
return scaling_options


class HeteroGPUAutoscaler(Autoscaler):
class HeteroAccelAutoscaler(Autoscaler):
"""RequestRateAutoscaler: Autoscale according to request rate.
Scales when the number of requests in the given interval is above or below
the threshold.
"""
NAME: Optional[str] = 'HeteroAccelAutoscaler'

SCALE_UP_COOL_DOWN_INTERVAL_SECONDS = 300

def __init__(self, spec: 'service_spec.SkyServiceSpec',
frequency: int, rps_window_size: int) -> None:
def __init__(self, spec: 'service_spec.SkyServiceSpec',) -> None:
"""Initialize the request rate autoscaler.
Variables:
Expand All @@ -266,16 +282,14 @@ def __init__(self, spec: 'service_spec.SkyServiceSpec',
"""
super().__init__(spec)
self.rps_window_size: int = self.SCALE_UP_COOL_DOWN_INTERVAL_SECONDS
self.frequency = frequency
self.frequency = constants.AUTOSCALER_DEFAULT_DECISION_INTERVAL_SECONDS
self.last_scale_operation: float = 0.
self.request_timestamps: List[float] = []
self.request_timestamps_distribution: List[List[float]] = [[], [], [],
[], [], [],
[]]
self.request_distribution: List[int] = [0, 0, 0, 0, 0, 0, 0]
self.request_rate_dist: List[float] = [0, 0, 0, 0, 0, 0, 0]
self.total_request_in_window: int = 0
self.last_scale_time: float = 0.
self.scale_down_candidates: List['replica_managers.ReplicaInfo'] = []

def collect_request_information(
Expand Down Expand Up @@ -441,11 +455,11 @@ def evaluate_scaling(
List[AutoscalerDecision]]]) = []

# Return if the cool down interval has not passed.
if (time.time()- self.last_scale_time <
if (time.time()- self.last_scale_operation <
self.SCALE_UP_COOL_DOWN_INTERVAL_SECONDS):
return scaling_decisions

self.last_scale_time = time.time()
self.last_scale_operation = time.time()
launched_replica_infos = [
info for info in replica_infos if info.is_launched
]
Expand Down
3 changes: 3 additions & 0 deletions sky/serve/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
# TODO(tian): We might need to be careful that service logs can take a lot of
# disk space. Maybe we could use a larger disk size, migrate to cloud storage or
# do some log rotation.
# Default autoscaler
DEFAULT_AUTOSCALER = 'RequestRateAutoscaler'

CONTROLLER_RESOURCES = {'cpus': '4+', 'disk_size': 200}

# A default controller with 4 vCPU and 16 GB memory can run up to 16 services.
Expand Down
21 changes: 11 additions & 10 deletions sky/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import threading
import time
import traceback
from typing import List
from typing import Any, Dict, List

import fastapi
import uvicorn
Expand Down Expand Up @@ -47,10 +47,7 @@ def __init__(self, service_name: str, service_spec: serve.SkyServiceSpec,
spec=service_spec,
task_yaml_path=task_yaml))
self._autoscaler: autoscalers.Autoscaler = (
autoscalers.HeteroGPUAutoscaler(
service_spec,
frequency=constants.AUTOSCALER_DEFAULT_DECISION_INTERVAL_SECONDS,
rps_window_size=constants.AUTOSCALER_QPS_WINDOW_SIZE_SECONDS))
autoscalers.Autoscaler.from_spec(service_spec))
self._port = port
self._app = fastapi.FastAPI()

Expand Down Expand Up @@ -92,8 +89,8 @@ def _run_autoscaler(self):
logger.error(f' Traceback: {traceback.format_exc()}')
time.sleep(constants.AUTOSCALER_DEFAULT_DECISION_INTERVAL_SECONDS)

def _run_heteroGPU_autoscaler(self):
logger.info('Starting autoscaler.')
def _run_HeteroAccel_autoscaler(self):
logger.info('Starting HeteroAccel Autoscaler.')
while True:
try:
# If the primary replica successfully launched to ReplicaStatus.READY,
Expand Down Expand Up @@ -170,7 +167,8 @@ def run(self) -> None:
@self._app.post('/controller/load_balancer_sync')
async def load_balancer_sync(request: fastapi.Request):
request_data = await request.json()
request_aggregator = request_data.get('request_aggregator')
request_aggregator: Dict[str, Any] = request_data.get(
'request_aggregator', {})
logger.info(
f'Received inflight request information: {request_aggregator}')
self._autoscaler.collect_request_information(request_aggregator)
Expand All @@ -185,8 +183,11 @@ def configure_logger():
for handler in uvicorn_access_logger.handlers:
handler.setFormatter(sky_logging.FORMATTER)

#threading.Thread(target=self._run_autoscaler).start()
threading.Thread(target=self._run_heteroGPU_autoscaler).start()
if (isinstance(self._autoscaler,
autoscalers.HeteroAccelAutoscaler)):
threading.Thread(target=self._run_HeteroAccel_autoscaler).start()
else:
threading.Thread(target=self._run_autoscaler).start()

logger.info('SkyServe Controller started on '
f'http://localhost:{self._port}')
Expand Down
6 changes: 3 additions & 3 deletions sky/serve/load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def configure_logger():
uvicorn.run(self._app, host='0.0.0.0', port=self._load_balancer_port)


class HeteroGPULoadBalancer(SkyServeLoadBalancer):
class HeteroAccelLoadBalancer(SkyServeLoadBalancer):
"""SkyServeLoadBalancer: redirect incoming traffic.
This class accept any traffic to the controller and redirect it
Expand All @@ -133,7 +133,7 @@ def __init__(self, controller_url: str, load_balancer_port: int) -> None:
self._load_balancing_policy: lb_policies.LoadBalancingPolicy = (
lb_policies.RoundRobinPolicy())
self._request_aggregator: serve_utils.RequestsAggregator = (
serve_utils.RequestHeteroGPU())
serve_utils.RequestHeteroAccel())

async def _redirect_handler(self, request: fastapi.Request):
body_bytes = await request.body()
Expand Down Expand Up @@ -164,7 +164,7 @@ async def _redirect_handler(self, request: fastapi.Request):
def run_load_balancer(controller_addr: str, load_balancer_port: int):
#load_balancer = SkyServeLoadBalancer(controller_url=controller_addr,
# load_balancer_port=load_balancer_port)
load_balancer = HeteroGPULoadBalancer(controller_url=controller_addr,
load_balancer = HeteroAccelLoadBalancer(controller_url=controller_addr,
load_balancer_port=load_balancer_port)
logger.info(f'run_load_balancer(load_balancer_port): {load_balancer_port}')
load_balancer.run()
2 changes: 1 addition & 1 deletion sky/serve/replica_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ def _launch_replica(
self._service_name, replica_id)
log_file_name = serve_utils.generate_replica_launch_log_file_name(
self._service_name, replica_id)
### Testing for heteroGPU
### Testing for HeteroAccel
max_retry = 30
p = multiprocessing.Process(
target=ux_utils.RedirectOutputForProcess(
Expand Down
4 changes: 2 additions & 2 deletions sky/serve/serve_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ def __repr__(self) -> str:
return f'RequestTimestamp(timestamps={self.timestamps})'


class RequestHeteroGPU(RequestsAggregator):
class RequestHeteroAccel(RequestsAggregator):
"""RequestTimestamp: Aggregates request timestamps.
This is useful for HeteroGPU-autoscaling.
This is useful for HeteroAccel-autoscaling.
"""

def __init__(self) -> None:
Expand Down
7 changes: 5 additions & 2 deletions sky/serve/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int):
# Generate load balancer log file name.
load_balancer_log_file = os.path.expanduser(
serve_utils.generate_remote_load_balancer_log_file_name(service_name))

# Generate controller log file name.
controller_log_file = os.path.expanduser(
serve_utils.generate_remote_controller_log_file_name(service_name))

controller_process = None
load_balancer_process = None
Expand All @@ -177,8 +181,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int):
#controller_process = multiprocessing.Process(
# target=controller.run_controller,
# args=(service_name, service_spec, task_yaml, controller_port))
controller_log_file = os.path.expanduser(
'~/.sky/serve/test_serve/controller.log')

controller_process = multiprocessing.Process(
target=ux_utils.RedirectOutputForProcess(
controller.run_controller, controller_log_file).run,
Expand Down
17 changes: 17 additions & 0 deletions sky/serve/service_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import yaml

from sky.serve import autoscalers
from sky.serve import constants
from sky.utils import common_utils
from sky.utils import schemas
Expand All @@ -25,6 +26,7 @@ def __init__(
post_data: Optional[Dict[str, Any]] = None,
upscale_delay_seconds: Optional[int] = None,
downscale_delay_seconds: Optional[int] = None,
autoscaler: Optional[str] = None,
# The following arguments are deprecated.
# TODO(ziming): remove this after 2 minor release, i.e. 0.6.0.
# Deprecated: Always be True
Expand Down Expand Up @@ -60,12 +62,20 @@ def __init__(
'Currently, SkyServe will cleanup failed replicas'
'and auto restart it to keep the service running.')

if autoscaler is None:
autoscaler = constants.DEFAULT_AUTOSCALER

if autoscaler not in autoscalers.Autoscaler.get_autoscaler_names():
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Unsupported autoscaler: {autoscaler}.')

self._readiness_path = readiness_path
self._initial_delay_seconds = initial_delay_seconds
self._min_replicas = min_replicas
self._max_replicas = max_replicas
self._target_qps_per_replica = target_qps_per_replica
self._post_data = post_data
self._autoscaler: str = autoscaler

self._upscale_delay_seconds = (
upscale_delay_seconds if upscale_delay_seconds is not None else
Expand Down Expand Up @@ -138,6 +148,8 @@ def from_yaml_config(config: Dict[str, Any]) -> 'SkyServiceSpec':
'upscale_delay_seconds', None)
service_config['downscale_delay_seconds'] = policy_section.get(
'downscale_delay_seconds', None)
service_config['autoscaler'] = policy_section.get(
'autoscaler', None)

return SkyServiceSpec(**service_config)

Expand Down Expand Up @@ -183,6 +195,7 @@ def add_if_not_none(section, key, value, no_empty: bool = False):
add_if_not_none('replica_policy', 'max_replicas', self.max_replicas)
add_if_not_none('replica_policy', 'target_qps_per_replica',
self.target_qps_per_replica)
add_if_not_none('replica_policy', 'autoscaler', self._autoscaler)
add_if_not_none('replica_policy', 'upscale_delay_seconds',
self.upscale_delay_seconds)
add_if_not_none('replica_policy', 'downscale_delay_seconds',
Expand Down Expand Up @@ -236,6 +249,10 @@ def target_qps_per_replica(self) -> Optional[float]:
@property
def post_data(self) -> Optional[Dict[str, Any]]:
return self._post_data

@property
def autoscaler(self) -> Optional[str]:
return self._autoscaler

@property
def upscale_delay_seconds(self) -> int:
Expand Down
3 changes: 3 additions & 0 deletions sky/utils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ def get_service_schema():
'target_qps_per_replica': {
'type': 'number',
},
'autoscaler': {
'type': 'string',
},
'upscale_delay_seconds': {
'type': 'number',
},
Expand Down

0 comments on commit a7a2ad7

Please sign in to comment.