diff --git a/llumnix/backends/vllm/migration_backend.py b/llumnix/backends/vllm/migration_backend.py
index 80823463..3cb5b950 100644
--- a/llumnix/backends/vllm/migration_backend.py
+++ b/llumnix/backends/vllm/migration_backend.py
@@ -221,7 +221,7 @@ def destory_backend(self) -> None:
     def warmup(self) -> bool:
         if self.global_world_size > 1:
             try:
-                col.allreduce(self.dummy_buffer[0], self.group_name)
+                col.allreduce(self.dummy_buffer[0][0], self.group_name)
             # pylint: disable=W0703
             except Exception as e:
                 logger.info("warmup migration backend failed (group_name: {}, world_size: {}, rank: {}, backbend: {}), err: {}."
diff --git a/tests/unit_test/backends/vllm/test_migration_backend.py b/tests/unit_test/backends/vllm/test_migration_backend.py
index 31749016..ff1e7e7f 100644
--- a/tests/unit_test/backends/vllm/test_migration_backend.py
+++ b/tests/unit_test/backends/vllm/test_migration_backend.py
@@ -30,13 +30,12 @@ def get_ready_workers(num_worker, num_gpu_blocks, engine_config, migraiton_confi
     workers = []
     worker_ids = []
 
-    for _ in range(num_worker):
+    for i in range(num_worker):
         worker_id = random_uuid()
-        worker = create_worker(rank=0, local_rank=0, engine_config=engine_config,
+        worker = create_worker(rank=0, local_rank=i, engine_config=engine_config,
                                worker_module_name="tests.unit_test.backends.vllm.test_migration_backend",
                                worker_class_name="MockMigrationWorker")
 
-        ray.get(worker.execute_method.remote('init_device'))
         ray.get(worker.execute_method.remote('initialize_cache', num_gpu_blocks=num_gpu_blocks, num_cpu_blocks=0))
         ray.get(worker.execute_method.remote(
             'init_migration',
@@ -76,7 +75,7 @@ def get_gpu_cache(self):
         torch.cuda.synchronize()
         return self.gpu_cache
 
-@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
+@pytest.mark.skipif(torch.cuda.device_count() < 3, reason="Need at least 3 GPU to run the test.")
 @pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
 def test_one_to_many_migrate_cache(setup_ray_env, backend):
     engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
@@ -118,11 +117,11 @@ def test_one_to_many_migrate_cache(setup_ray_env, backend):
         dst_worker_data = ray.get(workers[worker_idx].execute_method.remote('get_gpu_cache'))
         for layer_idx in range(num_layers):
             for src_idx, dst_idx in src_to_dst.items():
-                assert torch.allclose(worker0_data[layer_idx][0][src_idx], dst_worker_data[layer_idx][0][dst_idx])
-                assert torch.allclose(worker0_data[layer_idx][1][src_idx], dst_worker_data[layer_idx][1][dst_idx])
+                assert torch.allclose(worker0_data[layer_idx][0][src_idx].cpu(), dst_worker_data[layer_idx][0][dst_idx].cpu())
+                assert torch.allclose(worker0_data[layer_idx][1][src_idx].cpu(), dst_worker_data[layer_idx][1][dst_idx].cpu())
         worker_idx += 1
 
-@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
+@pytest.mark.skipif(torch.cuda.device_count() < 3, reason="Need at least 3 GPU to run the test.")
 @pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
 def test_many_to_one_migrate_cache(setup_ray_env, backend):
     engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
@@ -169,6 +168,6 @@
 
         for layer_idx in range(num_layers):
             for src_idx, dst_idx in src_to_dst.items():
-                assert torch.allclose(worker_datas[worker_idx][layer_idx][0][src_idx], dst_worker_data[layer_idx][0][dst_idx])
-                assert torch.allclose(worker_datas[worker_idx][layer_idx][1][src_idx], dst_worker_data[layer_idx][1][dst_idx])
+                assert torch.allclose(worker_datas[worker_idx][layer_idx][0][src_idx].cpu(), dst_worker_data[layer_idx][0][dst_idx].cpu())
+                assert torch.allclose(worker_datas[worker_idx][layer_idx][1][src_idx].cpu(), dst_worker_data[layer_idx][1][dst_idx].cpu())
         worker_idx += 1
diff --git a/tests/unit_test/backends/vllm/test_worker.py b/tests/unit_test/backends/vllm/test_worker.py
index 3bbd99bf..d5fdcb8b 100644
--- a/tests/unit_test/backends/vllm/test_worker.py
+++ b/tests/unit_test/backends/vllm/test_worker.py
@@ -38,8 +38,9 @@ def create_worker(rank: int, local_rank: int, engine_config: EngineConfig,
         worker_class_name=worker_class_name,
         trust_remote_code=True
     )
-
-    worker.init_worker.remote(
+    cuda_env = {'CUDA_VISIBLE_DEVICES': ",".join([str(i) for i in range(torch.cuda.device_count())])}
+    ray.get(worker.update_environment_variables.remote(cuda_env))
+    ray.get(worker.init_worker.remote(
         model_config=engine_config.model_config,
         parallel_config=engine_config.parallel_config,
         scheduler_config=engine_config.scheduler_config,
@@ -52,7 +53,8 @@ def create_worker(rank: int, local_rank: int, engine_config: EngineConfig,
         lora_config=engine_config.lora_config,
         vision_language_config=engine_config.vision_language_config,
         is_driver_worker = False
-    )
+    ))
+    ray.get(worker.execute_method.remote('init_device'))
 
     return worker
 
diff --git a/tests/unit_test/llumlet/test_engine_step_exception.py b/tests/unit_test/llumlet/test_engine_step_exception.py
index b07c6b5e..c630a04f 100644
--- a/tests/unit_test/llumlet/test_engine_step_exception.py
+++ b/tests/unit_test/llumlet/test_engine_step_exception.py
@@ -11,7 +11,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import asyncio
 import time
 import ray
 import torch
@@ -21,7 +20,7 @@
 
 from vllm.engine.arg_utils import EngineArgs
 
-from llumnix.backends.backend_interface import BackendType, EngineState
+from llumnix.backends.backend_interface import BackendType
 from llumnix.llumlet.llumlet import Llumlet
 from llumnix.internal_config import MigrationConfig
 from llumnix.queue.queue_type import QueueType
@@ -30,27 +29,12 @@
 
 @ray.remote(num_cpus=1, max_concurrency=4)
 class MockLlumlet(Llumlet):
-    def __init__(self, *args, **kwargs) -> None:
-        super().__init__(*args, **kwargs)
-        self.origin_step = self.backend_engine.engine.step_async
-
-    def set_error_step(self, broken: bool):
-        self.backend_engine._stop_event.set()
-
-        time.sleep(30)
-
-        assert self.backend_engine.state == EngineState.STOPPED
-
+    def set_error_step(self):
         async def raise_error_step():
             await self.origin_step()
             raise ValueError("Mock engine step error")
 
-        if broken:
-            self.backend_engine.engine.step_async = raise_error_step
-        else:
-            self.backend_engine.engine.step_async = self.origin_step
-
-        asyncio.create_task(self.backend_engine._start_engine_step_loop())
+        self.backend_engine.engine.step_async = raise_error_step
 
 @pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Need at least 1 GPU to run the test.")
 def test_engine_step_exception(setup_ray_env):
@@ -80,7 +64,7 @@ def test_engine_step_exception(setup_ray_env):
 
     cur_free_memory, _ = torch.cuda.mem_get_info()
     assert cur_free_memory < origin_free_memory
-    ray.get(llumlet.set_error_step.remote(True))
+    ray.get(llumlet.set_error_step.remote())
     time.sleep(3)
 
     all_actors = ray.util.list_named_actors(True)