Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
KuilongCui committed Nov 5, 2024
1 parent 188b08e commit e0b0836
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions llumnix/llm_engine_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,27 +346,32 @@ def scale_up(self, instance_id: Union[str, Iterable[str]], llumlet_actor_handles
instance_id = [instance_id,]
instance_ids = list(instance_id)

indeed_update = False
no_pending_instance = (self.pending_rebuild_migration_instances == 0)
indeed_update = False

for idx, ins_id in enumerate(instance_ids):
new_instance_ids = []
for ins_id in instance_ids:
if ins_id not in self.instances:
new_instance_ids.append(ins_id)
indeed_update = True
self.instances[ins_id] = llumlet_actor_handles[idx]
self.instance_migrating[ins_id] = False
if self.log_instance_info:
self.instance_last_logged_empty[ins_id] = False
self.pending_rebuild_migration_instances += 1
self.global_scheduler.scale_up(instance_ids)
self.num_instances = len(self.instances)

# When scaling up, we need to rebuild the migration backend. But if initially self.pending_rebuild_migration_instances != 0,
# a coroutine is already handling the changes in the number of instances in the cluster and it will account for the changes
# caused by this scale-up (see rebuild_migrate_backend for details). Therefore, we simply return in this case. Specifically,
# for RPC, the Ray actor handle is used for the migration cache, so there is no need to rebuild the group.
if self.engine_manager_args.migration_backend != 'rpc' and indeed_update and no_pending_instance:
if self.enable_migration and self.engine_manager_args.migration_backend != 'rpc' \
and indeed_update and no_pending_instance:
asyncio.create_task(self.rebuild_migrate_backend())

for idx, ins_id in enumerate(new_instance_ids):
self.instances[ins_id] = llumlet_actor_handles[idx]
self.instance_migrating[ins_id] = False
if self.log_instance_info:
self.instance_last_logged_empty[ins_id] = False
self.global_scheduler.scale_up(new_instance_ids)
self.num_instances = len(self.instances)

return self.num_instances

def scale_down(self, instance_id: Union[str, Iterable[str]], rebuild_migrate_backend: bool = True) -> None:
Expand All @@ -388,7 +393,7 @@ def scale_down(self, instance_id: Union[str, Iterable[str]], rebuild_migrate_bac
self.global_scheduler.scale_down(instance_ids)
self.num_instances = len(self.instances)

if self.engine_manager_args.migration_backend != 'rpc':
if self.enable_migration and self.engine_manager_args.migration_backend != 'rpc':
if len(self.instances) == 0:
self.pending_rebuild_migration_instances = 0

Expand Down

0 comments on commit e0b0836

Please sign in to comment.