Skip to content

Commit

Permalink
add safety net
Browse files Browse the repository at this point in the history
  • Loading branch information
MaoZiming committed Dec 13, 2023
1 parent 8aa6357 commit 5877001
Showing 1 changed file with 58 additions and 26 deletions.
84 changes: 58 additions & 26 deletions sky/serve/autoscalers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
# TODO(tian): Expose this to config.
# NOTE(review): upscale/downscale delays, presumably in seconds (`_S` suffix);
# the code consuming them is not visible here -- confirm units at the use site.
_UPSCALE_DELAY_S = 300
_DOWNSCALE_DELAY_S = 1200
# Safety-net SLO threshold: when the safety net is enabled, on-demand fallback
# is turned on once the fraction of evaluations in which the ready replicas
# (spot + on-demand) met the provisioning target drops below this value.
_DEFAULT_SLO_THRESHOLD = 0.99
# The threshold check above only takes effect after strictly more than this
# many evaluations (met + missed) have been counted.
_DEFAULT_SLO_COUNT_START = 100


class AutoscalerDecisionOperator(enum.Enum):
Expand Down Expand Up @@ -290,15 +292,16 @@ def evaluate_scaling(
f'Downscale counter: {self.downscale_counter}/'
f'{self.scale_down_consecutive_periods}')

num_on_demand = 0
num_alive_on_demand = 0
for info in alive_replica_infos:
if info.is_spot:
assert False, ('OnDemandRateAutoscaler',
'should not have spot instances.')
else:
num_on_demand += 1
num_alive_on_demand += 1

logger.info(f'Number of alive on-demand instances: {num_on_demand}')
logger.info(
f'Number of alive on-demand instances: {num_alive_on_demand}')

scaling_options = []
all_replica_ids_to_scale_down: List[int] = []
Expand Down Expand Up @@ -326,24 +329,24 @@ def _get_replica_ids_to_scale_down(
if self.overprovision:
num_to_provision += num_extra

if num_on_demand < num_to_provision:
num_on_demand_to_scale_up = num_to_provision - num_on_demand
if num_alive_on_demand < num_to_provision:
num_demand_to_scale_up = num_to_provision - num_alive_on_demand

for _ in range(num_on_demand_to_scale_up):
for _ in range(num_demand_to_scale_up):
scaling_options.append(
AutoscalerDecision(
AutoscalerDecisionOperator.SCALE_UP,
target=self._get_on_demand_resources_override_dict()))

elif num_on_demand > num_to_provision:
elif num_alive_on_demand > num_to_provision:

num_on_demand_to_scale_down = num_on_demand - num_to_provision
num_demand_to_scale_down = num_alive_on_demand - num_to_provision
all_replica_ids_to_scale_down.extend(
_get_replica_ids_to_scale_down(
info_filter=lambda info: not info.is_spot,
status_order=serve_state.ReplicaStatus.
scale_down_decision_order(),
num_limit=num_on_demand_to_scale_down,
num_limit=num_demand_to_scale_down,
))

for replica_id in all_replica_ids_to_scale_down:
Expand All @@ -369,7 +372,8 @@ def __init__(self,
cooldown: int,
rps_window_size: int,
overprovision: bool = False,
static_spot_provision: bool = False) -> None:
static_spot_provision: bool = False,
use_safety_net: bool = False) -> None:
super().__init__(spec, frequency, cooldown, rps_window_size)
assert (spec.spot_placer is not None and spec.spot_mixer is not None and
spec.spot_zones is not None and spec.num_extra is not None and
Expand All @@ -390,6 +394,9 @@ def __init__(self,
self.frequency)
self.static_spot_provision = static_spot_provision
self.overprovision = overprovision
self.use_safety_net = use_safety_net
self.meet_safety_net_count = 0
self.miss_safety_net_count = 0

def _get_spot_resources_override_dict(self) -> Dict[str, Any]:
return {'use_spot': True, 'spot_recovery': None}
Expand Down Expand Up @@ -449,17 +456,22 @@ def evaluate_scaling(
f'Downscale counter: {self.downscale_counter}/'
f'{self.scale_down_consecutive_periods}')

num_alive_spot, num_ready_spot, num_on_demand = 0, 0, 0
num_alive_spot, num_ready_spot = 0, 0
num_alive_on_demand, num_ready_on_demand = 0, 0
for info in alive_replica_infos:
if info.is_spot:
if info.status == serve_state.ReplicaStatus.READY:
num_ready_spot += 1
num_alive_spot += 1
else:
num_on_demand += 1
logger.info(f'Number of alive spot instances: {num_alive_spot}, '
f'Number of ready spot instances: {num_ready_spot}, '
f'Number of alive on-demand instances: {num_on_demand}')
if info.status == serve_state.ReplicaStatus.READY:
num_ready_on_demand += 1
num_alive_on_demand += 1
logger.info(
f'Number of alive spot instances: {num_alive_spot}, '
f'Number of ready spot instances: {num_ready_spot}, '
f'Number of alive on-demand instances: {num_alive_on_demand}, '
f'Number of ready on-demand instances: {num_ready_on_demand}')

if isinstance(self.spot_placer, spot_policy.HistoricalSpotPlacer):
log_zone_to_type = {
Expand Down Expand Up @@ -517,31 +529,51 @@ def _get_replica_ids_to_scale_down(

# OnDemand fallback.
if not self.static_spot_provision:
if num_ready_spot + num_on_demand < num_to_provision:

if self.use_safety_net:
if num_ready_spot + num_ready_on_demand >= num_to_provision:
self.meet_safety_net_count += 1
else:
self.miss_safety_net_count += 1

num_demand_to_scale_up, num_demand_to_scale_down = 0, 0
if (self.use_safety_net and self.meet_safety_net_count /
(self.meet_safety_net_count + self.miss_safety_net_count) <
_DEFAULT_SLO_THRESHOLD and
self.meet_safety_net_count + self.miss_safety_net_count >
_DEFAULT_SLO_COUNT_START):
# Enable OnDemand fallback.
num_demand_to_scale_up = (self.target_num_replicas -
num_alive_on_demand)

elif num_ready_spot + num_alive_on_demand < num_to_provision:
# Enable OnDemand fallback.
num_on_demand_to_scale_up = min(
num_demand_to_scale_up = min(
self.target_num_replicas,
num_to_provision - num_ready_spot) - num_on_demand
for _ in range(num_on_demand_to_scale_up):
num_to_provision - num_ready_spot) - num_alive_on_demand

elif num_ready_spot + num_alive_on_demand > num_to_provision:
# OnDemand fallback is not needed.
num_demand_to_scale_down = (num_ready_spot +
num_alive_on_demand -
num_to_provision)

if num_demand_to_scale_up > 0:
for _ in range(num_demand_to_scale_up):
scaling_options.append(
AutoscalerDecision(
AutoscalerDecisionOperator.SCALE_UP,
target=self._get_on_demand_resources_override_dict(
)))
elif num_ready_spot + num_on_demand > num_to_provision:
# OnDemand fallback is not needed.
num_on_demand_to_scale_down = (num_ready_spot + num_on_demand -
num_to_provision)
elif num_demand_to_scale_down > 0:
all_replica_ids_to_scale_down.extend(
_get_replica_ids_to_scale_down(
info_filter=lambda info: not info.is_spot,
status_order=serve_state.ReplicaStatus.
scale_down_decision_order(),
num_limit=num_on_demand_to_scale_down,
num_limit=num_demand_to_scale_down,
))

# TODO(tian): Safety Net.

for replica_id in all_replica_ids_to_scale_down:
scaling_options.append(
AutoscalerDecision(AutoscalerDecisionOperator.SCALE_DOWN,
Expand Down

0 comments on commit 5877001

Please sign in to comment.