diff --git a/tests/e2e_test/test_e2e.py b/tests/e2e_test/test_e2e.py
index 20e5c431..42f92512 100644
--- a/tests/e2e_test/test_e2e.py
+++ b/tests/e2e_test/test_e2e.py
@@ -20,103 +20,53 @@ from vllm import LLM, SamplingParams
-def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool = True, HEAD_NODE_IP: str = "127.0.0.1",
-                            ip: str = "127.0.0.1", port: int = 37000, instances_num = 1, dispatch_policy: str = "load",
-                            migration_backend = "gloo", model = "facebook/opt-125m", max_model_len: int = 2048,
-                            launch_mode: str = 'eief'):
+
+def parse_launch_mode(launch_mode: str):
     # 'eief' means that enable init instance by manager and enable fixed node init instance, and so on.
     if launch_mode == 'eief':
-        command = (
-            f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 "
-            f"nohup python -m llumnix.entrypoints.vllm.api_server "
-            f"--host {ip} "
-            f"--port {port} "
-            f"--initial-instances {instances_num} "
-            f"--enable-migration "
-            f"--model {model} "
-            f"--engine-use-ray "
-            f"--worker-use-ray "
-            f"--max-model-len {max_model_len} "
-            f"--dispatch-policy {dispatch_policy} "
-            f"--trust-remote-code "
-            f"--request-migration-policy LCFS "
-            f"--migration-backend {migration_backend} "
-            f"--migration-cache-blocks 32 "
-            f"--tensor-parallel-size 1 "
-            f"--request-output-queue-port {1234+port} "
-            f"{'--launch-ray-cluster ' if launch_ray_cluster else ''}"
-            f"{'> instance_'+result_filename if len(result_filename)> 0 else ''} 2>&1 &"
-        )
+        disable_init_instance_by_manager = False
+        disable_fixed_node_init_instance = False
     elif launch_mode == 'eidf':
-        command = (
-            f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 "
-            f"nohup python -m llumnix.entrypoints.vllm.api_server "
-            f"--host {ip} "
-            f"--port {port} "
-            f"--disable-fixed-node-init-instance "
-            f"--initial-instances {instances_num} "
-            f"--enable-migration "
-            f"--model {model} "
-            f"--engine-use-ray "
-            f"--worker-use-ray "
-            f"--max-model-len {max_model_len} "
-            f"--dispatch-policy {dispatch_policy} "
-            f"--trust-remote-code "
-            f"--request-migration-policy LCFS "
-            f"--migration-backend {migration_backend} "
-            f"--migration-cache-blocks 32 "
-            f"--tensor-parallel-size 1 "
-            f"--request-output-queue-port {1234+port} "
-            f"{'--launch-ray-cluster ' if launch_ray_cluster else ''}"
-            f"{'> instance_'+result_filename if len(result_filename)> 0 else ''} 2>&1 &"
-        )
+        disable_init_instance_by_manager = False
+        disable_fixed_node_init_instance = True
     elif launch_mode == 'dief':
-        command = (
-            f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 "
-            f"nohup python -m llumnix.entrypoints.vllm.api_server "
-            f"--host {ip} "
-            f"--port {port} "
-            f"--disable-init-instance-by-manager "
-            f"--initial-instances {instances_num} "
-            f"--enable-migration "
-            f"--model {model} "
-            f"--engine-use-ray "
-            f"--worker-use-ray "
-            f"--max-model-len {max_model_len} "
-            f"--dispatch-policy {dispatch_policy} "
-            f"--trust-remote-code "
-            f"--request-migration-policy LCFS "
-            f"--migration-backend {migration_backend} "
-            f"--migration-cache-blocks 32 "
-            f"--tensor-parallel-size 1 "
-            f"--request-output-queue-port {1234+port} "
-            f"{'--launch-ray-cluster ' if launch_ray_cluster else ''}"
-            f"{'> instance_'+result_filename if len(result_filename)> 0 else ''} 2>&1 &"
-        )
-    else: # launch_mode == 'didf':
-        command = (
-            f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 "
-            f"nohup python -m llumnix.entrypoints.vllm.api_server "
-            f"--host {ip} "
-            f"--port {port} "
-            f"--disable-init-instance-by-manager "
-            f"--disable-fixed-node-init-instance "
-            f"--initial-instances {instances_num} "
-            f"--enable-migration "
-            f"--model {model} "
-            f"--engine-use-ray "
-            f"--worker-use-ray "
-            f"--max-model-len {max_model_len} "
-            f"--dispatch-policy {dispatch_policy} "
-            f"--trust-remote-code "
-            f"--request-migration-policy LCFS "
-            f"--migration-backend {migration_backend} "
-            f"--migration-cache-blocks 32 "
-            f"--tensor-parallel-size 1 "
-            f"--request-output-queue-port {1234+port} "
-            f"{'--launch-ray-cluster ' if launch_ray_cluster else ''}"
-            f"{'> instance_'+result_filename if len(result_filename)> 0 else ''} 2>&1 &"
-        )
+        disable_init_instance_by_manager = True
+        disable_fixed_node_init_instance = False
+    else:
+        disable_init_instance_by_manager = True
+        disable_fixed_node_init_instance = True
+    return disable_init_instance_by_manager, disable_fixed_node_init_instance
+
+def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool = True, HEAD_NODE_IP: str = "127.0.0.1",
+                            ip: str = "127.0.0.1", port: int = 37000, instances_num = 1, dispatch_policy: str = "load",
+                            migration_backend = "gloo", model = "facebook/opt-125m", max_model_len: int = 2048,
+                            launch_mode: str = 'eief', log_instance_info: bool = False):
+    disable_init_instance_by_manager, disable_fixed_node_init_instance = parse_launch_mode(launch_mode)
+    command = (
+        f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 "
+        f"nohup python -m llumnix.entrypoints.vllm.api_server "
+        f"--host {ip} "
+        f"--port {port} "
+        f"{'--disable-init-instance-by-manager ' if disable_init_instance_by_manager else ''}"
+        f"{'--disable-fixed-node-init-instance ' if disable_fixed_node_init_instance else ''}"
+        f"--initial-instances {instances_num} "
+        f"{'--log-filename manager ' if log_instance_info else ''}"
+        f"{'--log-instance-info ' if log_instance_info else ''}"
+        f"--enable-migration "
+        f"--model {model} "
+        f"--engine-use-ray "
+        f"--worker-use-ray "
+        f"--max-model-len {max_model_len} "
+        f"--dispatch-policy {dispatch_policy} "
+        f"--trust-remote-code "
+        f"--request-migration-policy LCFS "
+        f"--migration-backend {migration_backend} "
+        f"--migration-cache-blocks 32 "
+        f"--tensor-parallel-size 1 "
+        f"--request-output-queue-port {1234+port} "
+        f"{'--launch-ray-cluster ' if launch_ray_cluster else ''}"
+        f"{'> instance_'+result_filename if len(result_filename)> 0 else ''} 2>&1 &"
+    )
     return command
 
 def launch_llumnix_service(model: str, max_model_len: int, port: int, migration_backend: str, launch_mode: str):
diff --git a/tests/e2e_test/test_migration.py b/tests/e2e_test/test_migration.py
index 50f993d2..7fe167bb 100644
--- a/tests/e2e_test/test_migration.py
+++ b/tests/e2e_test/test_migration.py
@@ -19,39 +19,13 @@ import torch
 import pandas as pd
 
+from .test_e2e import generate_launch_command
 from .test_bench import generate_bench_command, clear_ray_state, shutdown_llumnix_service
 from .utils import to_markdown_table
 
 size_pattern = re.compile(r'total_kv_cache_size:\s*([\d.]+)\s*(B|KB|MB|GB|KB|TB)')
 speed_pattern = re.compile(r'speed:\s*([\d.]+)GB/s')
 
-def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool = True, HEAD_NODE_IP: str = "127.0.0.1",
-                            ip: str = "127.0.0.1", port: int = 37000, instances_num = 1, dispatch_policy: str = "load",
-                            migration_backend = "rpc", model = "facebook/opt-125m", max_model_len: int = 2048):
-    command = (
-        f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 "
-        f"nohup python -m llumnix.entrypoints.vllm.api_server "
-        f"--host {ip} "
-        f"--port {port} "
-        f"--initial-instances {instances_num} "
-        f"--log-filename manager "
-        f"--log-instance-info "
-        f"--enable-migration "
-        f"--model {model} "
-        f"--engine-use-ray "
-        f"--worker-use-ray "
-        f"--max-model-len {max_model_len} "
-        f"--dispatch-policy {dispatch_policy} "
-        f"--trust-remote-code "
-        f"--request-migration-policy LCFS "
-        f"--migration-backend {migration_backend} "
-        f"--migration-cache-blocks 32 "
-        f"--tensor-parallel-size 1 "
-        f"--request-output-queue-port {1234+port} "
-        f"{'--launch-ray-cluster ' if launch_ray_cluster else ''}"
-        f"{'> instance_'+result_filename if len(result_filename)> 0 else ''} 2>&1 &"
-    )
-    return command
 
 def parse_instance_log_file(log_files):
     speed_dict = defaultdict(list)
@@ -101,7 +75,8 @@ async def test_migration_benchmark(model, migration_backend):
         output_log = f"{base_port+i}.out"
         instance_output_logs.append("instance_"+output_log)
         launch_command = generate_launch_command(result_filename=output_log, launch_ray_cluster=False, port=base_port+i,
-                                                 model=model, dispatch_policy="flood", migration_backend=migration_backend)
+                                                 model=model, dispatch_policy="flood", migration_backend=migration_backend,
+                                                 log_instance_info=True)
         subprocess.run(launch_command, shell=True, check=True)
         await asyncio.sleep(60)