diff --git a/llumnix/backends/vllm/migrate_backend.py b/llumnix/backends/vllm/migrate_backend.py index a39e7209..4beb79b5 100644 --- a/llumnix/backends/vllm/migrate_backend.py +++ b/llumnix/backends/vllm/migrate_backend.py @@ -176,6 +176,8 @@ def init_col(self, name, world_size, rank) -> None: self.ray_world_size = world_size self.ray_rank = rank + logger.info("begin to init ray collective group (group_name:{}, world_size: {}, rank: {}, backend: {})." + .format(self.group_name, self.ray_world_size, self.ray_rank, self.backend)) col.init_collective_group(world_size=self.ray_world_size, rank=self.ray_rank, backend=self.backend, group_name=self.group_name) @@ -197,6 +199,7 @@ def destory_col(self) -> None: col.destroy_collective_group(self.group_name) logger.info("destory ray collective group success (group_name:{}, backbend: {})." .format(self.group_name, self.backend)) + self.group_name = None def migrate_cache(self, src_handle, src_blocks: List[int], dst_blocks: List[int]) -> None: tot_blocks = len(src_blocks) diff --git a/llumnix/backends/vllm/worker.py b/llumnix/backends/vllm/worker.py index 1521d239..4771663b 100644 --- a/llumnix/backends/vllm/worker.py +++ b/llumnix/backends/vllm/worker.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Dict, List import math import os @@ -101,6 +102,7 @@ def init_migration(self, instance_id: str, migration_config: MigrationConfig, sr logger.warning("Detecting numpy unsupported dtype: {}. 
Using torch.float32.".format(self.cache_engine.dtype)) self.instance_id = instance_id + self.migration_config = migration_config num_instance = len(migration_config.instance_rank_map) self.ray_world_size = num_instance * self.parallel_config.world_size self.ray_rank = self.rank + migration_config.instance_rank_map[self.instance_id] * self.parallel_config.world_size @@ -118,8 +120,14 @@ def init_migration(self, instance_id: str, migration_config: MigrationConfig, sr def migrate_cache(self, src_worker_handle_list, src_blocks: List[int], dst_blocks: List[int]): try: + t0 = time.time() src_worker_handle = src_worker_handle_list[self.rank] self.migrate_backend.migrate_cache(src_worker_handle, src_blocks, dst_blocks) + t1 = time.time() + migrate_block_size = self.cache_engine.block_size * self.cache_engine.num_heads * self.cache_engine.head_size + data_size = self.cache_engine.num_layers * 2 * len(src_blocks) * migrate_block_size * 2.0 / 1024 / 1024 / 1024 + logger.info(f"[Ray {self.migration_config.migration_backend}] data size: {data_size * 1024}MB, " + f"e2e_speed: {(data_size/(t1-t0)):.2f}GB/s, all_time: {t1-t0}") except ray.exceptions.RayActorError: logger.info("[migrate_cache] self.rank: {}, src_worker_handle {} is dead".format(self.rank, src_worker_handle)) @@ -138,6 +146,7 @@ def rebuild_migrate_backend(self, id_rank_map: Dict[str, int], group_name: str) def warmup(self): self.migrate_backend.warmup() + logger.info("migrate backend warmup done.") def shutdown(self) -> None: # self.migrate_backend.destory_col() diff --git a/llumnix/entrypoints/vllm/api_server.py b/llumnix/entrypoints/vllm/api_server.py index 4e2a7a9b..0d37aac3 100644 --- a/llumnix/entrypoints/vllm/api_server.py +++ b/llumnix/entrypoints/vllm/api_server.py @@ -249,9 +249,9 @@ def set_runtime_env(manager_args: EngineManagerArgs): else: # Connect to a ray cluster. 
head_node_ip = os.getenv('HEAD_NODE_IP') + ray.init(address=f"{head_node_ip}:{args.ray_cluster_port}", ignore_reinit_error=True, namespace="llumnix") - # if gpu is not available, it means that this node is head pod without any llumnix components if is_gpu_available(): # Launch the Llumnix componets on current node. diff --git a/llumnix/llm_engine_manager.py b/llumnix/llm_engine_manager.py index 0032e635..9248c24e 100644 --- a/llumnix/llm_engine_manager.py +++ b/llumnix/llm_engine_manager.py @@ -281,6 +281,8 @@ async def run_task(alive_instances: List[str], task: str, *args, **kwargs) -> Li pending_task = self.pending_rebuild_migrate_instances while len(alive_instances) > 0 and self.pending_rebuild_migrate_instances > 0: + logger.info(f"rebuild migrate backend doing, pending_rebuild_migrate_instances: {self.pending_rebuild_migrate_instances}") + group_name = random_uuid() id_rank_map = {instance_id: index for index, instance_id in enumerate(alive_instances)} @@ -296,7 +298,7 @@ async def run_task(alive_instances: List[str], task: str, *args, **kwargs) -> Li pending_task = self.pending_rebuild_migrate_instances if len(alive_instances) > 0: - logger.info(f"rebuild migrate backend done, group_name: {group_name}, alive instance: {alive_instances}") + logger.info(f"rebuild migrate backend done, group_name: {group_name}, alive instance ({len(alive_instances)}): {alive_instances}") # restore migrate config self.enable_migrate = origin_config @@ -319,6 +321,7 @@ def scale_up(self, instance_id: Union[str, Iterable[str]], llumlet_actor_handles # When scaling up, we need to rebuild the migration backend. But if self.pending_rebuild_migrate_instances > 1, # a coroutine is already handling the membership change. And the coroutine will account for the membership changes # caused by this scale-up (see rebuild_migrate_backend for details). Therefore, we simply return in this case. 
+ logger.info(f"current pending_rebuild_migrate_instances: {self.pending_rebuild_migrate_instances}") if indeed_update and self.engine_manager_args.migration_backend != "rpc" and \ self.pending_rebuild_migrate_instances == 1: asyncio.create_task(self.rebuild_migrate_backend()) @@ -376,10 +379,11 @@ def from_args(cls, global_scheduler_config = engine_manager_args.create_engine_manager_configs() # Init manager actor in 'llumnix' namespace to ensure that only one manager can be created. manager_class = ray.remote(num_cpus=0, - max_restarts=-1, + # max_restarts=-1, name=MANAGER_ACTOR_NAME, namespace='llumnix', - lifetime="detached")(cls) + # lifetime="detached" + )(cls) engine_manager = manager_class.remote(engine_manager_args, global_scheduler_config, os.getcwd(),