diff --git a/llumnix/llm_engine_manager.py b/llumnix/llm_engine_manager.py index 7b47728b..d55b4005 100644 --- a/llumnix/llm_engine_manager.py +++ b/llumnix/llm_engine_manager.py @@ -346,27 +346,32 @@ def scale_up(self, instance_id: Union[str, Iterable[str]], llumlet_actor_handles instance_id = [instance_id,] instance_ids = list(instance_id) - indeed_update = False no_pending_instance = (self.pending_rebuild_migration_instances == 0) + indeed_update = False - for idx, ins_id in enumerate(instance_ids): + new_instance_ids = [] + for ins_id in instance_ids: if ins_id not in self.instances: + new_instance_ids.append(ins_id) indeed_update = True - self.instances[ins_id] = llumlet_actor_handles[idx] - self.instance_migrating[ins_id] = False - if self.log_instance_info: - self.instance_last_logged_empty[ins_id] = False self.pending_rebuild_migration_instances += 1 - self.global_scheduler.scale_up(instance_ids) - self.num_instances = len(self.instances) # When scaling up, we need to rebuild the migration backend. But if initially self.pending_rebuild_migration_instances != 0, # a coroutine is already handling the changes in the number of instances in the cluster and it will account for the changes # caused by this scale-up (see rebuild_migrate_backend for details). Therefore, we simply return in this case. Specifically, # for RPC, the Ray actor handle is used for the migration cache, so there is no need to rebuild the group. - if self.engine_manager_args.migration_backend != 'rpc' and indeed_update and no_pending_instance: + if self.enable_migration and self.engine_manager_args.migration_backend != 'rpc' \ + and indeed_update and no_pending_instance: asyncio.create_task(self.rebuild_migrate_backend()) + for idx, ins_id in enumerate(new_instance_ids): + self.instances[ins_id] = llumlet_actor_handles[idx] + self.instance_migrating[ins_id] = False + if self.log_instance_info: + self.instance_last_logged_empty[ins_id] = False + self.global_scheduler.scale_up(new_instance_ids) + self.num_instances = len(self.instances) + return self.num_instances def scale_down(self, instance_id: Union[str, Iterable[str]], rebuild_migrate_backend: bool = True) -> None: @@ -388,7 +393,7 @@ def scale_down(self, instance_id: Union[str, Iterable[str]], rebuild_migrate_bac self.global_scheduler.scale_down(instance_ids) self.num_instances = len(self.instances) - if self.engine_manager_args.migration_backend != 'rpc': + if self.enable_migration and self.engine_manager_args.migration_backend != 'rpc': if len(self.instances) == 0: self.pending_rebuild_migration_instances = 0