[CI] Add launch modes and available blocks tests in e2e test (#57)
s5u13b authored Oct 17, 2024
1 parent dc98279 commit 8d32ee9
Showing 6 changed files with 73 additions and 34 deletions.
5 changes: 5 additions & 0 deletions Makefile
@@ -32,6 +32,7 @@ test: check_pytest_installed
@pytest -x -v --ignore=third_party/ --ignore=tests/e2e_test --disable-warnings
@python examlpes/offline_inference.py
@pytest -v tests/e2e_test/test_e2e.py
@pytest -v -x ./tests/e2e_test/test_migration.py

.PHONY: unit_test
unit_test: check_pytest_installed
@@ -49,6 +50,10 @@ e2e_test:
bench_test:
@pytest -v ./tests/e2e_test/test_bench.py

.PHONY: migration_test
migration_test:
@pytest -v -x ./tests/e2e_test/test_migration.py

#################### pygloo install for gloo migration backend begin ####################

BAZEL_CMD = bazel
16 changes: 13 additions & 3 deletions llumnix/llm_engine_manager.py
@@ -77,8 +77,6 @@ def __init__(self,
self.instance_migrating: Dict[str, bool] = {}
self.pending_rebuild_migration_instances = 0
self.global_scheduler = GlobalScheduler(global_scheduler_config)
# When manager starts, it automatically connects to all existing instances.
self._connect_to_instances()

self.polling_interval = engine_manager_args.polling_interval
asyncio.create_task(self._update_instance_info_loop(self.polling_interval))
@@ -106,6 +104,10 @@ def __init__(self,
self.log_instance_info = engine_manager_args.log_instance_info
if self.log_instance_info:
self._init_instance_info_csv(engine_manager_args)
self.instance_last_logged_empty = {}

# When manager starts, it automatically connects to all existing instances.
self._connect_to_instances()

async def generate(
self,
@@ -352,6 +354,8 @@ def scale_up(self, instance_id: Union[str, Iterable[str]], llumlet_actor_handles
indeed_update = True
self.instances[ins_id] = llumlet_actor_handles[idx]
self.instance_migrating[ins_id] = False
if self.log_instance_info:
self.instance_last_logged_empty[ins_id] = False
self.pending_rebuild_migration_instances += 1
self.global_scheduler.scale_up(instance_ids)
self.num_instances = len(self.instances)
@@ -378,6 +382,8 @@ def scale_down(self, instance_id: Union[str, Iterable[str]], rebuild_migrate_bac
indeed_update = True
del self.instances[ins_id]
del self.instance_migrating[ins_id]
if self.log_instance_info:
del self.instance_last_logged_empty[ins_id]
self.pending_rebuild_migration_instances += 1
self.global_scheduler.scale_down(instance_ids)
self.num_instances = len(self.instances)
@@ -521,7 +527,11 @@ def _init_instance_info_csv(self, engine_manager_args: EngineManagerArgs) -> Non

def _log_instance_infos_to_csv(self, instance_infos: List[InstanceInfo]) -> None:
for instance_info in instance_infos:
if instance_info.gpu_cache_usage > 0:
instance_id = instance_info.instance_id
gpu_cache_usage = instance_info.gpu_cache_usage
should_log = (gpu_cache_usage > 0) or (gpu_cache_usage == 0 and not self.instance_last_logged_empty[instance_id])
if should_log:
self.instance_last_logged_empty[instance_id] = (gpu_cache_usage == 0)
self.instance_info_csv.writerow([
instance_info.timestamp,
instance_info.instance_id,
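
Note: the hunk above changes when the manager writes instance rows to the CSV. Below is a minimal sketch of that filter, assuming a simplified InstanceInfo stand-in; it is illustrative only and not the repository's code. Rows are written whenever GPU cache usage is non-zero, plus one final row when an instance's usage returns to zero, tracked per instance via instance_last_logged_empty.

# Sketch (not repository code): the CSV-logging filter introduced above.
# FakeInstanceInfo is a simplified stand-in with only the two fields used here.
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class FakeInstanceInfo:
    instance_id: str
    gpu_cache_usage: float

def rows_to_log(infos: List[FakeInstanceInfo], last_logged_empty: Dict[str, bool]) -> List[FakeInstanceInfo]:
    logged = []
    for info in infos:
        usage = info.gpu_cache_usage
        # Log while the cache is in use, and log the first sample after usage
        # drops back to zero so the series always ends at the idle value.
        should_log = (usage > 0) or (usage == 0 and not last_logged_empty[info.instance_id])
        if should_log:
            last_logged_empty[info.instance_id] = (usage == 0)
            logged.append(info)
    return logged

samples = [FakeInstanceInfo("i0", 0.0), FakeInstanceInfo("i0", 0.3),
           FakeInstanceInfo("i0", 0.0), FakeInstanceInfo("i0", 0.0)]
state = {"i0": False}  # scale_up() initializes the per-instance flag to False
print([s.gpu_cache_usage for s in rows_to_log(samples, state)])  # -> [0.0, 0.3, 0.0]
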
19 changes: 1 addition & 18 deletions tests/e2e_test/test_bench.py
@@ -16,11 +16,10 @@
import os
import subprocess
import pytest
import ray
import torch
import numpy as np

from .test_e2e import generate_launch_command
from .test_e2e import generate_launch_command, clear_ray_state
from .utils import to_markdown_table

def launch_llumnix_service(command):
@@ -56,22 +55,6 @@ def shutdown_llumnix_service():
except Exception:
pass

def clear_ray_state():
named_actors = ray.util.list_named_actors(True)
for actor in named_actors:
try:
actor_handle = ray.get_actor(actor['name'], namespace=actor['namespace'])
# pylint: disable=bare-except
except:
continue

try:
ray.kill(actor_handle)
# pylint: disable=bare-except
except:
continue
ray.shutdown()

def parse_log_file():
json_files = [f for f in os.listdir('.') if f.endswith('_latency_info.json')]

39 changes: 33 additions & 6 deletions tests/e2e_test/test_e2e.py
@@ -20,15 +20,38 @@

from vllm import LLM, SamplingParams


def parse_launch_mode(launch_mode: str):
# 'eief' means that enable init instance by manager and enable fixed node init instance, and so on.
if launch_mode == 'eief':
disable_init_instance_by_manager = False
disable_fixed_node_init_instance = False
elif launch_mode == 'eidf':
disable_init_instance_by_manager = False
disable_fixed_node_init_instance = True
elif launch_mode == 'dief':
disable_init_instance_by_manager = True
disable_fixed_node_init_instance = False
else:
disable_init_instance_by_manager = True
disable_fixed_node_init_instance = True
return disable_init_instance_by_manager, disable_fixed_node_init_instance

def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool = True, HEAD_NODE_IP: str = "127.0.0.1",
ip: str = "127.0.0.1", port: int = 1234, instances_num = 1, dispatch_policy: str = "load",
migration_backend = "rpc", model = "facebook/opt-125m", max_model_len: int = 2048):
ip: str = "127.0.0.1", port: int = 37000, instances_num = 1, dispatch_policy: str = "load",
migration_backend = "gloo", model = "facebook/opt-125m", max_model_len: int = 2048,
launch_mode: str = 'eief', log_instance_info: bool = False):
disable_init_instance_by_manager, disable_fixed_node_init_instance = parse_launch_mode(launch_mode)
command = (
f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 "
f"nohup python -m llumnix.entrypoints.vllm.api_server "
f"--host {ip} "
f"--port {port} "
f"{'--disable-init-instance-by-manager ' if disable_init_instance_by_manager else ''}"
f"{'--disable-fixed-node-init-instance ' if disable_fixed_node_init_instance else ''}"
f"--initial-instances {instances_num} "
f"{'--log-filename manager ' if log_instance_info else ''}"
f"{'--log-instance-info ' if log_instance_info else ''}"
f"--enable-migration "
f"--model {model} "
f"--engine-use-ray "
@@ -46,9 +69,10 @@ def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool
)
return command

def launch_llumnix_service(model: str, max_model_len: int, port: int, migration_backend: str):
def launch_llumnix_service(model: str, max_model_len: int, port: int, migration_backend: str, launch_mode: str):
command = generate_launch_command(model=model, max_model_len=max_model_len,
port=port, migration_backend=migration_backend)
port=port, migration_backend=migration_backend,
launch_mode=launch_mode)
subprocess.run(command, shell=True, check=True)

def shutdown_llumnix_service():
@@ -110,7 +134,10 @@ def run_vllm(model, max_model_len, sampling_params)
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="at least 1 gpus required for e2e test")
@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl'])
async def test_e2e(model, migration_backend):
@pytest.mark.parametrize("launch_mode", ['eief', 'eidf', 'dief', 'didf'])
async def test_e2e(model, migration_backend, launch_mode):
if migration_backend == 'gloo' and launch_mode != 'eief':
pytest.skip("When the migration backend is gloo, the launch mode of llumnix can only be eief")
max_model_len = 370
sampling_params = {
"n": 1,
@@ -123,7 +150,7 @@ async def test_e2e(model, migration_backend):

# generate llumnix outputs
base_port = 37037
launch_llumnix_service(model, max_model_len, base_port, migration_backend)
launch_llumnix_service(model, max_model_len, base_port, migration_backend, launch_mode)
await asyncio.sleep(60)

llumnix_output = {}
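
As a reading aid (not part of the commit), the four two-letter launch-mode codes used in the new parametrization map to CLI flags as sketched below; this restates parse_launch_mode and generate_launch_command from the hunks above.

# Sketch: launch-mode codes and the flags generate_launch_command() appends.
LAUNCH_MODES = {
    # mode: (disable_init_instance_by_manager, disable_fixed_node_init_instance)
    "eief": (False, False),
    "eidf": (False, True),
    "dief": (True, False),
    "didf": (True, True),
}

def flags_for(mode: str) -> str:
    disable_init_by_manager, disable_fixed_node = LAUNCH_MODES[mode]
    flags = []
    if disable_init_by_manager:
        flags.append("--disable-init-instance-by-manager")
    if disable_fixed_node:
        flags.append("--disable-fixed-node-init-instance")
    return " ".join(flags)

print(flags_for("didf"))
# --disable-init-instance-by-manager --disable-fixed-node-init-instance
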
26 changes: 20 additions & 6 deletions tests/e2e_test/test_migration.py
@@ -17,6 +17,7 @@
import subprocess
import pytest
import torch
import pandas as pd

from .test_e2e import generate_launch_command
from .test_bench import generate_bench_command, clear_ray_state, shutdown_llumnix_service
@@ -25,7 +26,8 @@
size_pattern = re.compile(r'total_kv_cache_size:\s*([\d.]+)\s*(B|KB|MB|GB|KB|TB)')
speed_pattern = re.compile(r'speed:\s*([\d.]+)GB/s')

def parse_log_file(log_files):

def parse_instance_log_file(log_files):
speed_dict = defaultdict(list)

for log_file in log_files:
@@ -52,6 +54,14 @@ def parse_log_file(log_files):

return averger_speed

def parse_manager_log_file(log_file):
df = pd.read_csv(log_file)
instance_id_set = set(df["instance_id"])
for instance_id in instance_id_set:
df_instance = df[df["instance_id"] == instance_id]
num_available_gpu_blocks_list = df_instance["num_available_gpu_blocks"].to_numpy().tolist()
assert num_available_gpu_blocks_list[0] == num_available_gpu_blocks_list[-1]

@pytest.mark.asyncio
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="at least 2 gpus required for migration bench")
@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
@@ -65,7 +75,8 @@ async def test_migration_benchmark(model, migration_backend):
output_log = f"{base_port+i}.out"
instance_output_logs.append("instance_"+output_log)
launch_command = generate_launch_command(result_filename=output_log, launch_ray_cluster=False, port=base_port+i,
model=model, dispatch_policy="flood", migration_backend=migration_backend)
model=model, dispatch_policy="flood", migration_backend=migration_backend,
log_instance_info=True)
subprocess.run(launch_command, shell=True, check=True)
await asyncio.sleep(60)

@@ -76,12 +87,15 @@ async def run_bench_command(command):

for i in range(device_count//2):
bench_command = generate_bench_command(ip_ports=f"127.0.0.1:{base_port+i}", model=model, num_prompts=300,
dataset_type="sharegpt",
dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" ,
qps=10)
dataset_type="sharegpt",
dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" ,
qps=10)
await asyncio.wait_for(run_bench_command(bench_command), timeout=60*30)
await asyncio.sleep(30)

parse_manager_log_file("manager_instance.csv")

averger_speed = parse_log_file(instance_output_logs)
averger_speed = parse_instance_log_file(instance_output_logs)

sorted_keys = sorted(averger_speed.keys(), key=lambda x: float(x.split()[0]))

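
For illustration only (not repository code): the available-blocks check added above asserts that each instance's num_available_gpu_blocks returns to its initial value once the migration benchmark finishes. The synthetic CSV below (columns reduced, values made up) shows the passing case.

# Sketch of the parse_manager_log_file check on a synthetic manager CSV.
import io
import pandas as pd

csv_text = """timestamp,instance_id,num_available_gpu_blocks
1.0,instance_0,800
2.0,instance_0,512
3.0,instance_0,800
1.0,instance_1,800
3.0,instance_1,800
"""

def check_available_blocks(csv_file) -> None:
    # Mirrors the assertion above: every instance must end the benchmark with
    # the same number of available GPU blocks it started with (no leaked blocks).
    df = pd.read_csv(csv_file)
    for instance_id in set(df["instance_id"]):
        blocks = df[df["instance_id"] == instance_id]["num_available_gpu_blocks"].tolist()
        assert blocks[0] == blocks[-1], f"{instance_id}: {blocks[0]} -> {blocks[-1]}"

check_available_blocks(io.StringIO(csv_text))  # passes: both instances return to 800
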
2 changes: 1 addition & 1 deletion tools/migration_test.sh
@@ -3,4 +3,4 @@ set -ex

nvidia-docker run --rm -t --net host --ipc host -v ${PWD}:/workspace -v /mnt:/mnt -w /workspace \
registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \
bash -c "pip install -e . > /dev/null && pytest -v ./tests/e2e_test/test_migration.py"
bash -c "pip install -e . > /dev/null && make migration_test"
