Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Xinyi-ECNU committed Sep 23, 2024
1 parent add0c7b commit 749a93f
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions llumnix/llm_engine_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async def _push_migrations(self) -> None:
else:
asyncio.create_task(self._migrate(PairMigrationConstraints.NO_CONSTRAINTS, 1))

async def _migrate(self) -> None:
async def _migrate(self, pair_migration_type:str, migrate_in_num_requests:int) -> None:
async def migrate_done_callback(ret, migrate_instance_pair: Tuple[str, str]) -> None:
if migrate_instance_pair[0] in self.instance_migrating:
self.instance_migrating[migrate_instance_pair[0]] = False
Expand Down Expand Up @@ -252,8 +252,7 @@ def migrate_done_callback_wrapper(migrate_instance_pair: Tuple[str, str], fut) -
ret = fut.result()
loop = asyncio.get_event_loop()
loop.create_task(migrate_done_callback(ret, migrate_instance_pair))

migrate_instance_pairs = self.global_scheduler.pair_migration()
migrate_instance_pairs = self.global_scheduler.pair_migration(pair_migration_type)
try:
migration_tasks = []
for _, migrate_instance_pair in enumerate(migrate_instance_pairs):
Expand All @@ -263,9 +262,8 @@ def migrate_done_callback_wrapper(migrate_instance_pair: Tuple[str, str], fut) -
self.instance_migrating[migrate_out_instance_id] = True
self.instance_migrating[migrate_in_instance_id] = True
migrate_in_instance_name = "instance_{}".format(migrate_in_instance_id)
logger.info("{}->{} begin migrate out".format(migrate_out_instance_id, migrate_in_instance_id))
# Use asyncio.gather to wrap ray remote call to add done callback.
task = asyncio.gather(self.instances[migrate_out_instance_id].migrate_out.remote(migrate_in_instance_name),
task = asyncio.gather(self.instances[migrate_out_instance_id].migrate_out.remote(migrate_in_instance_name, migrate_in_num_requests),
return_exceptions=True)
task.add_done_callback(partial(migrate_done_callback_wrapper, migrate_instance_pair))
migration_tasks.append(task)
Expand Down

0 comments on commit 749a93f

Please sign in to comment.