[BugFix][CI] Fix unittest errors caused by incorrect ray actor management (#23)
s5u13b authored Aug 30, 2024
1 parent d4cd8fa commit d05adc8
Showing 10 changed files with 79 additions and 57 deletions.
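
The change repeated across these files is the replacement of per-test ray.init()/ray.shutdown() calls with a shared setup_ray_env pytest fixture (added in tests/utils.py at the end of this diff), which also kills any leftover named actors before shutting Ray down. A minimal before/after sketch of the pattern; the test name and body are illustrative placeholders, not code from this repository:

# Before: each test managed the Ray runtime itself.
def test_something():
    ray.init(namespace="llumnix", ignore_reinit_error=True)
    ...  # test body
    ray.shutdown()

# After: the test requests the fixture, so setup and teardown live in one place.
def test_something(setup_ray_env):
    ...  # test body
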
3 changes: 3 additions & 0 deletions tests/backends/vllm/test_llm_engine.py
@@ -37,6 +37,9 @@ def __init__(self, executor_class=None, *args, **kwargs):
self.executor_class = executor_class

self.output_processor = SingleStepOutputProcessor(self.scheduler.scheduler_config,detokenizer, self.scheduler, seq_counter, stop_checker)

def update_instance_info(self, instance_info):
pass


def test_llm_engine_process_model_outputs():
6 changes: 3 additions & 3 deletions tests/backends/vllm/test_migration.py
@@ -28,6 +28,8 @@

from .test_llm_engine import MockEngine
from .utils import create_dummy_prompt
from tests.utils import setup_ray_env


TEST_PROMPTS = ["hello world, ",
"Briefly describe the major milestones in the development of artificial intelligence from 1950 to 2020.\n",
@@ -46,8 +48,7 @@ def __init__(self):

@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason="Need at least 2 GPUs to run the test.")
def test_migration_correctness():
ray.init(namespace="llumnix", ignore_reinit_error=True)
def test_migration_correctness(setup_ray_env):
engine_args = EngineArgs(model="facebook/opt-125m",worker_use_ray=True)
id_rank_map = {"0":0,"1":1}
migration_config = MigrationConfig("LCFS", "gloo",16,1,4,5,20)
@@ -127,7 +128,6 @@ def test_correctness(prompt):
assert output.cumulative_logprob == origin_output.cumulative_logprob
for prompt in TEST_PROMPTS:
test_correctness(prompt)
ray.shutdown()

def test_clear_migration_states():
llumlet = MockLlumlet()
8 changes: 3 additions & 5 deletions tests/backends/vllm/test_migration_backend.py
@@ -23,6 +23,8 @@
from llumnix.utils import random_uuid

from tests.backends.vllm.test_worker import create_worker
from tests.utils import setup_ray_env


class MockMigrationWorker(MigrationWorker):
def set_gpu_cache(self, data):
@@ -36,9 +38,7 @@ def get_gpu_cache(self):

@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
@pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
def test_migrate_cache(backend):
ray.init(namespace="llumnix", ignore_reinit_error=True)

def test_migrate_cache(setup_ray_env, backend):
engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
migraiton_config = EngineManagerArgs(migration_cache_blocks=3, migration_num_layers=5).create_migration_config()
migraiton_config.migration_backend = backend
@@ -106,5 +106,3 @@ def test_migrate_cache(backend):
for src_idx, dst_idx in src_to_dst.items():
assert torch.allclose(worker0_data[layer_idx][0][src_idx], worker1_data[layer_idx][0][dst_idx])
assert torch.allclose(worker0_data[layer_idx][1][src_idx], worker1_data[layer_idx][1][dst_idx])

ray.shutdown()
13 changes: 3 additions & 10 deletions tests/backends/vllm/test_worker.py
@@ -25,6 +25,7 @@
from llumnix.arg_utils import EngineManagerArgs
from llumnix.utils import random_uuid

from tests.utils import setup_ray_env

def create_worker(rank: int, local_rank: int, engine_config: EngineConfig,
worker_module_name="llumnix.backends.vllm.worker",
@@ -62,9 +63,7 @@ def create_worker(rank: int, local_rank: int, engine_config: EngineConfig,
return worker

@pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
def test_reserve_memory_for_migration(backend):
ray.init(namespace="llumnix", ignore_reinit_error=True)

def test_reserve_memory_for_migration(setup_ray_env, backend):
engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
migraiton_config = EngineManagerArgs(migration_cache_blocks=1).create_migration_config()
migraiton_config.migration_backend = backend
@@ -83,13 +82,9 @@ def test_reserve_memory_for_migration(backend):
parallel_config=engine_config.parallel_config))
assert migration_cache_size == occupy_memory

ray.shutdown()

@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
@pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
def test_rebuild_migration_backend(backend):
ray.init(namespace="llumnix", ignore_reinit_error=True)

def test_rebuild_migration_backend(setup_ray_env, backend):
engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
migraiton_config = EngineManagerArgs(migration_cache_blocks=1).create_migration_config()
migraiton_config.migration_backend = backend
@@ -135,5 +130,3 @@ def test_rebuild_migration_backend(backend):
assert ray.get(worker0.execute_method.remote('rebuild_migration_backend', instance_rank=instance_rank,
group_name=random_uuid()))
assert ray.get(worker0.execute_method.remote('warmup'))

ray.shutdown()
3 changes: 3 additions & 0 deletions tests/backends/vllm/utils.py
@@ -33,6 +33,9 @@ def initialize_scheduler(*,
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
scheduler = SchedulerLlumnix(scheduler_config, cache_config, lora_config)
def update_instance_info(instance_info):
pass
scheduler.add_update_instance_info_callback(update_instance_info)
return scheduler

def create_dummy_prompt(
17 changes: 6 additions & 11 deletions tests/entrypoints/test_llumnix_utils.py
@@ -24,6 +24,8 @@
retry_manager_method_async)
from llumnix.llm_engine_manager import MANAGER_ACTOR_NAME

from tests.utils import setup_ray_env


def test_launch_ray_cluster():
ip_address = get_ip_address()
@@ -32,34 +34,27 @@ def test_launch_ray_cluster():
result = launch_ray_cluster(30050)
assert result.returncode == 0

def test_init_manager():
def test_init_manager(setup_ray_env):
engine_manager_args = EngineManagerArgs()
engine_manager = init_manager(engine_manager_args)
assert engine_manager is not None
engine_manager_actor_handle = ray.get_actor(MANAGER_ACTOR_NAME, namespace='llumnix')
assert engine_manager_actor_handle is not None
assert engine_manager == engine_manager_actor_handle
ray.kill(engine_manager)
ray.shutdown()

def test_init_request_output_queue():
def test_init_request_output_queue(setup_ray_env):
request_output_queue = init_request_output_queue()
assert request_output_queue is not None
ray.shutdown()

def test_retry_manager_method_sync():
def test_retry_manager_method_sync(setup_ray_env):
engine_manager_args = EngineManagerArgs()
engine_manager = init_manager(engine_manager_args)
ret = retry_manager_method_sync(engine_manager.is_ready.remote, 'is_ready')
assert ret is True
ray.kill(engine_manager)
ray.shutdown()

@pytest.mark.asyncio
async def test_retry_manager_method_async():
async def test_retry_manager_method_async(setup_ray_env):
engine_manager_args = EngineManagerArgs()
engine_manager = init_manager(engine_manager_args)
ret = await retry_manager_method_async(engine_manager.is_ready.remote, 'is_ready')
assert ret is True
ray.kill(engine_manager)
ray.shutdown()
2 changes: 1 addition & 1 deletion tests/entrypoints/vllm/test_api_server.py
@@ -54,7 +54,7 @@ def api_server():
uvicorn_process = subprocess.Popen(commands)
yield
uvicorn_process.terminate()
ray.shutdown()
# Waiting for api server subprocess to terminate.
time.sleep(1.0)

@pytest.mark.parametrize("interface", ['generate', 'generate_benchmark'])
34 changes: 18 additions & 16 deletions tests/global_scheduler/test_llm_engine_manager.py
@@ -18,11 +18,14 @@
import numpy as np

from vllm.utils import random_uuid
from vllm import EngineArgs

from llumnix.arg_utils import EngineManagerArgs
from llumnix.llm_engine_manager import LLMEngineManager, MANAGER_ACTOR_NAME
from llumnix.instance_info import InstanceInfo

from tests.utils import setup_ray_env


@ray.remote(num_cpus=1, max_concurrency=4)
class MockLlumlet:
@@ -75,7 +78,6 @@ def get_num_migrate_out(self):


def init_manager():
ray.init(ignore_reinit_error=True, namespace='llumnix')
try:
engine_manager_args = EngineManagerArgs()
engine_manager_args.log_instance_info = False
@@ -103,8 +105,6 @@ def engine_manager():
engine_manager = init_manager()
ray.get(engine_manager.is_ready.remote())
yield engine_manager
ray.kill(engine_manager)
ray.shutdown()

@pytest.fixture
def llumlet():
@@ -115,19 +115,25 @@ def llumlet():
ray.get(llumlet.is_ready.remote())
return llumlet

def test_init_manager(engine_manager):
def test_init_manager(setup_ray_env, engine_manager):
assert engine_manager is not None
engine_manager_actor_handle = ray.get_actor(MANAGER_ACTOR_NAME, namespace='llumnix')
assert engine_manager_actor_handle is not None
assert engine_manager == engine_manager_actor_handle

def test_init_llumlet(llumlet):
def test_init_llumlet(setup_ray_env, llumlet):
assert llumlet is not None
ray.get(llumlet.is_ready.remote())

# TODO(s5u13b): Add init_llumlets test.
def test_init_llumlets(setup_ray_env, engine_manager):
engine_args = EngineArgs(model="facebook/opt-125m", worker_use_ray=True)
node_id = ray.get_runtime_context().get_node_id()
instance_ids, llumlets = ray.get(engine_manager.init_llumlets.remote(engine_args, node_id))
num_instances = ray.get(engine_manager.scale_up.remote(instance_ids, llumlets))
engine_manager_args = EngineManagerArgs()
assert num_instances == engine_manager_args.initial_instances

def test_scale_up_and_down(engine_manager):
def test_scale_up_and_down(setup_ray_env, engine_manager):
initial_instances = 4
instance_ids, llumlets = init_llumlets(initial_instances)
num_instances = ray.get(engine_manager.scale_up.remote(instance_ids, llumlets))
@@ -142,7 +148,7 @@ def test_scale_up_and_down(engine_manager):
num_instances = ray.get(engine_manager.scale_down.remote(instance_ids_1))
assert num_instances == 0

def test_connect_to_instances():
def test_connect_to_instances(setup_ray_env):
initial_instances = 4
instance_ids, llumlets = init_llumlets(initial_instances)
ray.get([llumlet.is_ready.remote() for llumlet in llumlets])
@@ -152,10 +158,8 @@ def test_connect_to_instances():
assert num_instances == initial_instances * 2
num_instances = ray.get(engine_manager.scale_down.remote(instance_ids))
assert num_instances == initial_instances
ray.kill(engine_manager)
ray.shutdown()

def test_generate_and_abort(engine_manager, llumlet):
def test_generate_and_abort(setup_ray_env, engine_manager, llumlet):
instance_id = ray.get(llumlet.get_instance_id.remote())
ray.get(engine_manager.scale_up.remote(instance_id, [llumlet]))
request_id = random_uuid()
@@ -174,7 +178,7 @@ def test_generate_and_abort(engine_manager, llumlet):
num_requests = ray.get(llumlet.get_num_requests.remote())
assert num_requests == 0

def test_get_request_instance():
def test_get_request_instance(setup_ray_env):
_, llumlets = init_llumlets(2)
llumlet, llumlet_1 = llumlets[0], llumlets[1]
request_id = random_uuid()
@@ -192,8 +196,6 @@ def test_get_request_instance():
num_requests_1 = ray.get(llumlet_1.get_num_requests.remote())
assert num_requests == 0
assert num_requests_1 == 0
ray.kill(engine_manager)
ray.shutdown()

def get_instance_info_migrate_in(instance_id):
instance_info = InstanceInfo()
@@ -211,7 +213,7 @@ def get_instance_info_migrate_out(instance_id):
instance_info.num_blocks_first_waiting_request = np.inf
return instance_info

def test_update_instance_info_loop_and_migrate(engine_manager):
def test_update_instance_info_loop_and_migrate(setup_ray_env, engine_manager):
instance_ids, llumlets = init_llumlets(2)
instance_id, instance_id_1 = instance_ids[0], instance_ids[1]
llumlet, llumlet_1 = llumlets[0], llumlets[1]
@@ -226,6 +228,6 @@ def test_update_instance_info_loop_and_migrate(engine_manager):
num_migrate_out = ray.get(llumlet.get_num_migrate_out.remote())
assert num_migrate_out == 0
ray.get(engine_manager.scale_up.remote(instance_ids, llumlets))
time.sleep(0.2)
time.sleep(0.5)
num_migrate_out = ray.get(llumlet.get_num_migrate_out.remote())
assert num_migrate_out != 0
17 changes: 6 additions & 11 deletions tests/llumlet/test_migration_coordinator.py
@@ -20,14 +20,14 @@
from llumnix.backends.backend_interface import BackendInterface
from llumnix.llumlet.llumlet import MigrationStatus

from tests.utils import setup_ray_env


@ray.remote
def ray_remote_call(ret):
return ret

def test_migrate_out_onestage():
# Initialize Ray
ray.init(ignore_reinit_error=True)

def test_migrate_out_onestage(setup_ray_env):
# Create mock objects
backend_engine = MagicMock(spec=BackendInterface)
migrate_in_ray_actor = MagicMock()
@@ -74,13 +74,10 @@ def test_migrate_out_onestage():
migrate_in_ray_actor.execute_migration_method.remote.return_value = ray_remote_call.remote(dst_blocks)
status = coordinator.migrate_out_onestage(migrate_in_ray_actor, migrate_out_request)
assert status == MigrationStatus.FINISHED_ABORTED
ray.shutdown()

# setup_ray_env should be passed after migrate_out_onestage
@patch.object(MigrationCoordinator, 'migrate_out_onestage')
def test_migrate_out_multistage(migrate_out_onestage):
# Initialize Ray
ray.init(ignore_reinit_error=True)

def test_migrate_out_multistage(migrate_out_onestage, setup_ray_env):
# Create mock objects
backend_engine = MagicMock(spec=BackendInterface)
migrate_in_ray_actor = MagicMock()
@@ -107,5 +104,3 @@ def test_migrate_out_multistage(migrate_out_onestage):
status = coordinator.migrate_out_multistage(migrate_in_ray_actor, migrate_out_request)
assert coordinator.migrate_out_onestage.call_count == max_stages + 1
assert status == MigrationStatus.FINISHED_ABORTED

ray.shutdown()
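
The comment added above ("setup_ray_env should be passed after migrate_out_onestage") reflects how unittest.mock and pytest interact: a @patch decorator injects its mock as a positional argument, while pytest fills fixtures by parameter name, so mock parameters have to come before fixture parameters in the test signature. A small sketch of the ordering rule; the patch target and test name are stand-ins chosen only to keep the example self-contained:

import os
from unittest.mock import patch

from tests.utils import setup_ray_env  # fixture must be importable in this module

# os.getcwd is only a stand-in patch target for illustration.
@patch('os.getcwd')
def test_patch_and_fixture_ordering(mock_getcwd, setup_ray_env):
    # The mock is injected positionally by @patch, so its parameter comes first;
    # setup_ray_env is supplied by pytest, matched by parameter name.
    mock_getcwd.return_value = '/tmp'
    assert os.getcwd() == '/tmp'
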
33 changes: 33 additions & 0 deletions tests/utils.py
@@ -0,0 +1,33 @@
# Copyright (c) 2024, Alibaba Group;
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ray
import pytest


@pytest.fixture
def setup_ray_env():
ray.init(namespace="llumnix", ignore_reinit_error=True)
yield
named_actors = ray.util.list_named_actors(True)
for actor in named_actors:
try:
actor_handle = ray.get_actor(actor['name'], namespace=actor['namespace'])
except:
continue
try:
ray.kill(actor_handle)
except:
continue
# Should be placed after killing actors; otherwise unexpected errors may occur when re-initializing Ray.
ray.shutdown()
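
Because setup_ray_env lives in tests/utils.py rather than a conftest.py, each test module imports it explicitly (as the diffs above do with "from tests.utils import setup_ray_env"); having the fixture in the module namespace is enough for pytest to resolve it by name. A minimal consuming module, with an illustrative test name, might look like this:

import ray

from tests.utils import setup_ray_env  # imported so pytest can resolve the fixture here


def test_named_actor_is_cleaned_up(setup_ray_env):
    # Any named actor created here is killed in the fixture's teardown,
    # so the next test starts from a clean "llumnix" namespace.
    @ray.remote
    class Dummy:
        def ping(self):
            return "pong"

    actor = Dummy.options(name="dummy_actor").remote()
    assert ray.get(actor.ping.remote()) == "pong"
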
