
Commit

fix
KuilongCui committed Nov 6, 2024
1 parent d91085a commit bd57e3d
Showing 4 changed files with 18 additions and 33 deletions.
2 changes: 1 addition & 1 deletion llumnix/backends/vllm/migration_backend.py
@@ -221,7 +221,7 @@ def destory_backend(self) -> None:
     def warmup(self) -> bool:
         if self.global_world_size > 1:
             try:
-                col.allreduce(self.dummy_buffer[0], self.group_name)
+                col.allreduce(self.dummy_buffer[0][0], self.group_name)
                 # pylint: disable=W0703
             except Exception as e:
                 logger.info("warmup migration backend failed (group_name: {}, world_size: {}, rank: {}, backbend: {}), err: {}."
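The one-line change above shrinks the tensor passed to the warmup allreduce from a whole dummy cache buffer to a single slice of it; the call only needs to exercise the collective group once before real cache transfers happen. Below is a minimal sketch of that warmup pattern, not taken from this repository: it assumes ray and pygloo are installed, and the Worker class, group name, and buffer size are illustrative only.

import ray
import ray.util.collective as col
import torch

@ray.remote
class Worker:
    def join_group(self, world_size: int, rank: int, group_name: str) -> None:
        # "gloo" keeps this sketch on CPU tensors; the real backend may be nccl.
        col.init_collective_group(world_size, rank, backend="gloo", group_name=group_name)

    def warmup(self, group_name: str) -> bool:
        try:
            # A tiny buffer is enough: the goal is only to touch the group once.
            col.allreduce(torch.ones(1), group_name)
            return True
        # pylint: disable=W0703
        except Exception:
            return False

ray.init()
workers = [Worker.remote() for _ in range(2)]
ray.get([w.join_group.remote(2, rank, "migration_warmup") for rank, w in enumerate(workers)])
print(ray.get([w.warmup.remote("migration_warmup") for w in workers]))
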
17 changes: 8 additions & 9 deletions tests/unit_test/backends/vllm/test_migration_backend.py
@@ -30,13 +30,12 @@ def get_ready_workers(num_worker, num_gpu_blocks, engine_config, migraiton_confi
     workers = []
     worker_ids = []
 
-    for _ in range(num_worker):
+    for i in range(num_worker):
         worker_id = random_uuid()
-        worker = create_worker(rank=0, local_rank=0, engine_config=engine_config,
+        worker = create_worker(rank=0, local_rank=i, engine_config=engine_config,
                                worker_module_name="tests.unit_test.backends.vllm.test_migration_backend",
                                worker_class_name="MockMigrationWorker")
 
-        ray.get(worker.execute_method.remote('init_device'))
         ray.get(worker.execute_method.remote('initialize_cache', num_gpu_blocks=num_gpu_blocks, num_cpu_blocks=0))
         ray.get(worker.execute_method.remote(
             'init_migration',
@@ -76,7 +75,7 @@ def get_gpu_cache(self):
         torch.cuda.synchronize()
         return self.gpu_cache
 
-@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
+@pytest.mark.skipif(torch.cuda.device_count() < 3, reason="Need at least 3 GPU to run the test.")
 @pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
 def test_one_to_many_migrate_cache(setup_ray_env, backend):
     engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
@@ -118,11 +117,11 @@ def test_one_to_many_migrate_cache(setup_ray_env, backend):
         dst_worker_data = ray.get(workers[worker_idx].execute_method.remote('get_gpu_cache'))
         for layer_idx in range(num_layers):
             for src_idx, dst_idx in src_to_dst.items():
-                assert torch.allclose(worker0_data[layer_idx][0][src_idx], dst_worker_data[layer_idx][0][dst_idx])
-                assert torch.allclose(worker0_data[layer_idx][1][src_idx], dst_worker_data[layer_idx][1][dst_idx])
+                assert torch.allclose(worker0_data[layer_idx][0][src_idx].cpu(), dst_worker_data[layer_idx][0][dst_idx].cpu())
+                assert torch.allclose(worker0_data[layer_idx][1][src_idx].cpu(), dst_worker_data[layer_idx][1][dst_idx].cpu())
         worker_idx += 1
 
-@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
+@pytest.mark.skipif(torch.cuda.device_count() < 3, reason="Need at least 3 GPU to run the test.")
 @pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
 def test_many_to_one_migrate_cache(setup_ray_env, backend):
     engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
@@ -169,6 +168,6 @@ def test_many_to_one_migrate_cache(setup_ray_env, backend):
 
         for layer_idx in range(num_layers):
             for src_idx, dst_idx in src_to_dst.items():
-                assert torch.allclose(worker_datas[worker_idx][layer_idx][0][src_idx], dst_worker_data[layer_idx][0][dst_idx])
-                assert torch.allclose(worker_datas[worker_idx][layer_idx][1][src_idx], dst_worker_data[layer_idx][1][dst_idx])
+                assert torch.allclose(worker_datas[worker_idx][layer_idx][0][src_idx].cpu(), dst_worker_data[layer_idx][0][dst_idx].cpu())
+                assert torch.allclose(worker_datas[worker_idx][layer_idx][1][src_idx].cpu(), dst_worker_data[layer_idx][1][dst_idx].cpu())
         worker_idx += 1
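
The assertion changes above move both KV-cache blocks to host memory before comparing them. A small illustration of why, not from the repository: torch.allclose refuses to compare tensors that live on different CUDA devices, which is exactly the situation once a cache block has been migrated from one worker's GPU to another's. The tensor shapes here are arbitrary.

import torch

if torch.cuda.device_count() >= 2:
    src = torch.randn(4, 16, device="cuda:0")   # block still on the source GPU
    dst = src.to("cuda:1")                      # same block after "migration"
    # torch.allclose(src, dst) would raise a device-mismatch error here,
    # so both sides are copied to the CPU before the comparison.
    assert torch.allclose(src.cpu(), dst.cpu())
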
8 changes: 5 additions & 3 deletions tests/unit_test/backends/vllm/test_worker.py
@@ -38,8 +38,9 @@ def create_worker(rank: int, local_rank: int, engine_config: EngineConfig,
         worker_class_name=worker_class_name,
         trust_remote_code=True
     )
-
-    worker.init_worker.remote(
+    cuda_env = {'CUDA_VISIBLE_DEVICES': ",".join([str(i) for i in range(torch.cuda.device_count())])}
+    ray.get(worker.update_environment_variables.remote(cuda_env))
+    ray.get(worker.init_worker.remote(
         model_config=engine_config.model_config,
         parallel_config=engine_config.parallel_config,
         scheduler_config=engine_config.scheduler_config,
@@ -52,7 +53,8 @@
         lora_config=engine_config.lora_config,
         vision_language_config=engine_config.vision_language_config,
         is_driver_worker = False
-    )
+    ))
+    ray.get(worker.execute_method.remote('init_device'))
 
     return worker

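The test helper now pushes a CUDA_VISIBLE_DEVICES list into the remote worker before initializing it, and only then calls init_device explicitly; together with the local_rank=i change in the migration test, each worker can address its own GPU. The toy actor below mimics that ordering; it is not the vLLM RayWorkerWrapper, and the class and method bodies are illustrative only.

import os
import ray
import torch

@ray.remote
class ToyWorker:
    def update_environment_variables(self, env: dict) -> None:
        # Must happen before any CUDA call in this process, or the mask is ignored.
        os.environ.update(env)

    def init_device(self, local_rank: int) -> str:
        visible = os.environ.get("CUDA_VISIBLE_DEVICES", "")
        # With the mask set, local_rank indexes into the visible GPU list.
        return f"worker would use cuda:{local_rank} of [{visible}]"

worker = ToyWorker.remote()
cuda_env = {"CUDA_VISIBLE_DEVICES": ",".join(str(i) for i in range(torch.cuda.device_count()))}
ray.get(worker.update_environment_variables.remote(cuda_env))
print(ray.get(worker.init_device.remote(0)))
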
24 changes: 4 additions & 20 deletions tests/unit_test/llumlet/test_engine_step_exception.py
@@ -11,7 +11,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import asyncio
 import time
 import ray
 import torch
@@ -21,7 +20,7 @@
 
 from vllm.engine.arg_utils import EngineArgs
 
-from llumnix.backends.backend_interface import BackendType, EngineState
+from llumnix.backends.backend_interface import BackendType
 from llumnix.llumlet.llumlet import Llumlet
 from llumnix.internal_config import MigrationConfig
 from llumnix.queue.queue_type import QueueType
@@ -30,27 +29,12 @@
 
 @ray.remote(num_cpus=1, max_concurrency=4)
 class MockLlumlet(Llumlet):
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
         self.origin_step = self.backend_engine.engine.step_async
 
-    def set_error_step(self, broken: bool):
-        self.backend_engine._stop_event.set()
-
-        time.sleep(30)
-
-        assert self.backend_engine.state == EngineState.STOPPED
-
+    def set_error_step(self):
         async def raise_error_step():
             await self.origin_step()
             raise ValueError("Mock engine step error")
 
-        if broken:
-            self.backend_engine.engine.step_async = raise_error_step
-        else:
-            self.backend_engine.engine.step_async = self.origin_step
-
-        asyncio.create_task(self.backend_engine._start_engine_step_loop())
+        self.backend_engine.engine.step_async = raise_error_step
 
 @pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Need at least 1 GPU to run the test.")
 def test_engine_step_exception(setup_ray_env):
@@ -80,7 +64,7 @@ def test_engine_step_exception(setup_ray_env):
     cur_free_memory, _ = torch.cuda.mem_get_info()
     assert cur_free_memory < origin_free_memory
 
-    ray.get(llumlet.set_error_step.remote(True))
+    ray.get(llumlet.set_error_step.remote())
     time.sleep(3)
 
     all_actors = ray.util.list_named_actors(True)
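set_error_step is now a plain monkeypatch: it reuses the reference to the original async step saved in __init__ and swaps in a wrapper that raises after delegating once, leaving loop management and state checks to the test itself. A toy, self-contained version of that pattern follows; the ToyEngine and ToyLlumlet names are made up, and only the shape of the trick matches the diff.

import asyncio

class ToyEngine:
    async def step_async(self) -> str:
        return "stepped"

class ToyLlumlet:
    def __init__(self) -> None:
        self.engine = ToyEngine()
        self.origin_step = self.engine.step_async   # keep the real coroutine

    def set_error_step(self) -> None:
        async def raise_error_step():
            await self.origin_step()                # run one genuine step first
            raise ValueError("Mock engine step error")
        self.engine.step_async = raise_error_step

async def main() -> None:
    llumlet = ToyLlumlet()
    llumlet.set_error_step()
    try:
        await llumlet.engine.step_async()
    except ValueError as err:
        print(f"engine step loop would exit here: {err}")

asyncio.run(main())
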
