diff --git a/Makefile b/Makefile
index 945991b2..6bc87a9b 100644
--- a/Makefile
+++ b/Makefile
@@ -32,6 +32,7 @@ test: check_pytest_installed
 	@pytest -x -v --ignore=third_party/ --ignore=tests/e2e_test --disable-warnings
 	@python examlpes/offline_inference.py
 	@pytest -v tests/e2e_test/test_e2e.py
+	@pytest -v -x ./tests/e2e_test/test_migration.py
 
 .PHONY: unit_test
 unit_test: check_pytest_installed
@@ -49,6 +50,10 @@ e2e_test:
 bench_test:
 	@pytest -v ./tests/e2e_test/test_bench.py
 
+.PHONY: migration_test
+migration_test:
+	@pytest -v -x ./tests/e2e_test/test_migration.py
+
 #################### pygloo install for gloo migration backend begin ####################
 
 BAZEL_CMD = bazel
diff --git a/llumnix/llm_engine_manager.py b/llumnix/llm_engine_manager.py
index 7a972b7a..7b47728b 100644
--- a/llumnix/llm_engine_manager.py
+++ b/llumnix/llm_engine_manager.py
@@ -77,8 +77,6 @@ def __init__(self,
         self.instance_migrating: Dict[str, bool] = {}
         self.pending_rebuild_migration_instances = 0
         self.global_scheduler = GlobalScheduler(global_scheduler_config)
-        # When manager starts, it automatically connects to all existing instances.
-        self._connect_to_instances()
 
         self.polling_interval = engine_manager_args.polling_interval
         asyncio.create_task(self._update_instance_info_loop(self.polling_interval))
@@ -106,6 +104,10 @@ def __init__(self,
         self.log_instance_info = engine_manager_args.log_instance_info
         if self.log_instance_info:
             self._init_instance_info_csv(engine_manager_args)
+            self.instance_last_logged_empty = {}
+
+        # When manager starts, it automatically connects to all existing instances.
+        self._connect_to_instances()
 
     async def generate(
             self,
@@ -352,6 +354,8 @@ def scale_up(self, instance_id: Union[str, Iterable[str]], llumlet_actor_handles
                 indeed_update = True
                 self.instances[ins_id] = llumlet_actor_handles[idx]
                 self.instance_migrating[ins_id] = False
+                if self.log_instance_info:
+                    self.instance_last_logged_empty[ins_id] = False
                 self.pending_rebuild_migration_instances += 1
         self.global_scheduler.scale_up(instance_ids)
         self.num_instances = len(self.instances)
@@ -378,6 +382,8 @@ def scale_down(self, instance_id: Union[str, Iterable[str]], rebuild_migrate_bac
                 indeed_update = True
                 del self.instances[ins_id]
                 del self.instance_migrating[ins_id]
+                if self.log_instance_info:
+                    del self.instance_last_logged_empty[ins_id]
                 self.pending_rebuild_migration_instances += 1
         self.global_scheduler.scale_down(instance_ids)
         self.num_instances = len(self.instances)
@@ -521,7 +527,11 @@ def _init_instance_info_csv(self, engine_manager_args: EngineManagerArgs) -> Non
 
     def _log_instance_infos_to_csv(self, instance_infos: List[InstanceInfo]) -> None:
         for instance_info in instance_infos:
-            if instance_info.gpu_cache_usage > 0:
+            instance_id = instance_info.instance_id
+            gpu_cache_usage = instance_info.gpu_cache_usage
+            should_log = (gpu_cache_usage > 0) or (gpu_cache_usage == 0 and not self.instance_last_logged_empty[instance_id])
+            if should_log:
+                self.instance_last_logged_empty[instance_id] = (gpu_cache_usage == 0)
                 self.instance_info_csv.writerow([
                     instance_info.timestamp,
                     instance_info.instance_id,
diff --git a/tests/e2e_test/test_bench.py b/tests/e2e_test/test_bench.py
index 7bc9d092..b6d70d8f 100644
--- a/tests/e2e_test/test_bench.py
+++ b/tests/e2e_test/test_bench.py
@@ -16,11 +16,10 @@
 import os
 import subprocess
 import pytest
-import ray
 import torch
 import numpy as np
 
-from .test_e2e import generate_launch_command
+from .test_e2e import generate_launch_command, clear_ray_state
 from .utils import to_markdown_table
 
 def launch_llumnix_service(command):
@@ -56,22 +55,6 @@ def shutdown_llumnix_service():
     except Exception:
         pass
 
-def clear_ray_state():
-    named_actors = ray.util.list_named_actors(True)
-    for actor in named_actors:
-        try:
-            actor_handle = ray.get_actor(actor['name'], namespace=actor['namespace'])
-        # pylint: disable=bare-except
-        except:
-            continue
-
-        try:
-            ray.kill(actor_handle)
-        # pylint: disable=bare-except
-        except:
-            continue
-    ray.shutdown()
-
 def parse_log_file():
     json_files = [f for f in os.listdir('.') if f.endswith('_latency_info.json')]
diff --git a/tests/e2e_test/test_e2e.py b/tests/e2e_test/test_e2e.py
index 280b8d75..42f92512 100644
--- a/tests/e2e_test/test_e2e.py
+++ b/tests/e2e_test/test_e2e.py
@@ -20,15 +20,38 @@
 from vllm import LLM, SamplingParams
 
+
+def parse_launch_mode(launch_mode: str):
+    # 'eief' means that enable init instance by manager and enable fixed node init instance, and so on.
+    if launch_mode == 'eief':
+        disable_init_instance_by_manager = False
+        disable_fixed_node_init_instance = False
+    elif launch_mode == 'eidf':
+        disable_init_instance_by_manager = False
+        disable_fixed_node_init_instance = True
+    elif launch_mode == 'dief':
+        disable_init_instance_by_manager = True
+        disable_fixed_node_init_instance = False
+    else:
+        disable_init_instance_by_manager = True
+        disable_fixed_node_init_instance = True
+    return disable_init_instance_by_manager, disable_fixed_node_init_instance
+
 def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool = True, HEAD_NODE_IP: str = "127.0.0.1",
-                            ip: str = "127.0.0.1", port: int = 1234, instances_num = 1, dispatch_policy: str = "load",
-                            migration_backend = "rpc", model = "facebook/opt-125m", max_model_len: int = 2048):
+                            ip: str = "127.0.0.1", port: int = 37000, instances_num = 1, dispatch_policy: str = "load",
+                            migration_backend = "gloo", model = "facebook/opt-125m", max_model_len: int = 2048,
+                            launch_mode: str = 'eief', log_instance_info: bool = False):
+    disable_init_instance_by_manager, disable_fixed_node_init_instance = parse_launch_mode(launch_mode)
     command = (
         f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 "
         f"nohup python -m llumnix.entrypoints.vllm.api_server "
         f"--host {ip} "
         f"--port {port} "
+        f"{'--disable-init-instance-by-manager ' if disable_init_instance_by_manager else ''}"
+        f"{'--disable-fixed-node-init-instance ' if disable_fixed_node_init_instance else ''}"
         f"--initial-instances {instances_num} "
+        f"{'--log-filename manager ' if log_instance_info else ''}"
+        f"{'--log-instance-info ' if log_instance_info else ''}"
         f"--enable-migration "
         f"--model {model} "
         f"--engine-use-ray "
@@ -46,9 +69,10 @@ def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool
     )
     return command
 
-def launch_llumnix_service(model: str, max_model_len: int, port: int, migration_backend: str):
+def launch_llumnix_service(model: str, max_model_len: int, port: int, migration_backend: str, launch_mode: str):
     command = generate_launch_command(model=model, max_model_len=max_model_len,
-                                      port=port, migration_backend=migration_backend)
+                                      port=port, migration_backend=migration_backend,
+                                      launch_mode=launch_mode)
     subprocess.run(command, shell=True, check=True)
 
 def shutdown_llumnix_service():
@@ -110,7 +134,10 @@ def run_vllm(model, max_model_len, sampling_params):
 @pytest.mark.skipif(torch.cuda.device_count() < 1, reason="at least 1 gpus required for e2e test")
 @pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
 @pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl'])
-async def test_e2e(model, migration_backend):
+@pytest.mark.parametrize("launch_mode", ['eief', 'eidf', 'dief', 'didf'])
+async def test_e2e(model, migration_backend, launch_mode):
+    if migration_backend == 'gloo' and launch_mode != 'eief':
+        pytest.skip("When the migration backend is gloo, the launch mode of llumnix can only be eief")
     max_model_len = 370
     sampling_params = {
         "n": 1,
@@ -123,7 +150,7 @@ async def test_e2e(model, migration_backend):
 
     # generate llumnix outputs
     base_port = 37037
-    launch_llumnix_service(model, max_model_len, base_port, migration_backend)
+    launch_llumnix_service(model, max_model_len, base_port, migration_backend, launch_mode)
     await asyncio.sleep(60)
 
     llumnix_output = {}
diff --git a/tests/e2e_test/test_migration.py b/tests/e2e_test/test_migration.py
index 9b3e2237..7fe167bb 100644
--- a/tests/e2e_test/test_migration.py
+++ b/tests/e2e_test/test_migration.py
@@ -17,6 +17,7 @@
 import subprocess
 import pytest
 import torch
+import pandas as pd
 
 from .test_e2e import generate_launch_command
 from .test_bench import generate_bench_command, clear_ray_state, shutdown_llumnix_service
@@ -25,7 +26,8 @@
 size_pattern = re.compile(r'total_kv_cache_size:\s*([\d.]+)\s*(B|KB|MB|GB|KB|TB)')
 speed_pattern = re.compile(r'speed:\s*([\d.]+)GB/s')
 
-def parse_log_file(log_files):
+
+def parse_instance_log_file(log_files):
     speed_dict = defaultdict(list)
 
     for log_file in log_files:
@@ -52,6 +54,14 @@
 
     return averger_speed
 
+def parse_manager_log_file(log_file):
+    df = pd.read_csv(log_file)
+    instance_id_set = set(df["instance_id"])
+    for instance_id in instance_id_set:
+        df_instance = df[df["instance_id"] == instance_id]
+        num_available_gpu_blocks_list = df_instance["num_available_gpu_blocks"].to_numpy().tolist()
+        assert num_available_gpu_blocks_list[0] == num_available_gpu_blocks_list[-1]
+
 @pytest.mark.asyncio
 @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="at least 2 gpus required for migration bench")
 @pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
@@ -65,7 +75,8 @@ async def test_migration_benchmark(model, migration_backend):
         output_log = f"{base_port+i}.out"
         instance_output_logs.append("instance_"+output_log)
         launch_command = generate_launch_command(result_filename=output_log, launch_ray_cluster=False, port=base_port+i,
-                                                 model=model, dispatch_policy="flood", migration_backend=migration_backend)
+                                                 model=model, dispatch_policy="flood", migration_backend=migration_backend,
+                                                 log_instance_info=True)
         subprocess.run(launch_command, shell=True, check=True)
     await asyncio.sleep(60)
 
@@ -76,12 +87,15 @@ async def run_bench_command(command):
 
     for i in range(device_count//2):
         bench_command = generate_bench_command(ip_ports=f"127.0.0.1:{base_port+i}", model=model, num_prompts=300,
-                                                dataset_type="sharegpt",
-                                                dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" ,
-                                                qps=10)
+                                               dataset_type="sharegpt",
+                                               dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" ,
+                                               qps=10)
         await asyncio.wait_for(run_bench_command(bench_command), timeout=60*30)
 
+    await asyncio.sleep(30)
+
+    parse_manager_log_file("manager_instance.csv")
 
-    averger_speed = parse_log_file(instance_output_logs)
+    averger_speed = parse_instance_log_file(instance_output_logs)
 
     sorted_keys = sorted(averger_speed.keys(), key=lambda x: float(x.split()[0]))
diff --git a/tools/migration_test.sh b/tools/migration_test.sh
index 244ac3cd..3e13ce55 100755
--- a/tools/migration_test.sh
+++ b/tools/migration_test.sh
@@ -3,4 +3,4 @@ set -ex
 
 nvidia-docker run --rm -t --net host --ipc host -v ${PWD}:/workspace -v /mnt:/mnt -w /workspace \
 	registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \
-	bash -c "pip install -e . > /dev/null && pytest -v ./tests/e2e_test/test_migration.py"
+	bash -c "pip install -e . > /dev/null && make migration_test"